In [259]:
import pandas as pd
import numpy as np 
%matplotlib inline
import time
from datetime import datetime
from typing import List, Any
import yaml
import boto3

import sagemaker
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.feature_extraction import FeatureHasher
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split

https://www.alpha-quantum.com/blog/ctr-prediction/ctr-prediction-using-hashing-trick-logistic-regression-sgd-and-only-simple-python/

In [253]:
SETTING_FILE_PATH = '../settings.yaml'

In [239]:
# df_train = pd.read_csv('../avazu-ctr-prediction/train')
df_train = pd.read_csv('../avazu-ctr-prediction/train_partial')

In [32]:
df_train_partial = df_train[df_train.index % 100 == 0]

In [35]:
df_train_partial.to_csv('../avazu-ctr-prediction/train_partial', index=False)

In [111]:
df_test = pd.read_csv('../avazu-ctr-prediction/test')

In [74]:
pd.concat([df_train_partial.nunique().to_frame(), df_train_partial.dtypes.to_frame(), df_train_partial.isna().any().to_frame()], axis=1)

Unnamed: 0,0,0.1,0.2
id,404290,float64,False
click,2,int64,False
hour,240,int64,False
C1,7,int64,False
banner_pos,7,int64,False
site_id,2171,object,False
site_domain,2147,object,False
site_category,20,object,False
app_id,2245,object,False
app_domain,132,object,False


In [73]:
## hasing trick, train valid test split

In [115]:
df_train, df_valid = train_test_split(df_train_partial, train_size=0.8)

In [145]:
def preprocess(df: pd.DataFrame):
    df['hour'] = df['hour'].map(lambda x: datetime.strptime(str(x), "%y%m%d%H"))
    df['day_of_week'] = df['hour'].map(lambda x: x.hour)
    
    feature_hasher = FeatureHasher(n_features=2**24, input_type='string')
    hashed_feature = feature_hasher.fit_transform(np.asanyarray(df.astype(str)))
    
    return hashed_feature

In [240]:
current_time_sec = int(round(time.time()))
df_train['event_time'] = pd.Series([current_time_sec]*len(df_train), dtype="float64")


In [241]:
feature_cols = ['id', 'event_time', 'click', 'hour', 'C1', 'banner_pos', 'site_id', 'site_domain',
                            'site_category', 'app_id', 'app_domain', 'app_category', 'device_id',
                            'device_ip', 'device_model', 'device_type', 'device_conn_type', 'C14',
                            'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']

target = 'click'


In [147]:
y_train = df_train[target].values
y_train = np.asarray(y_train).ravel()

X_train = df_train[feature_cols]
X_train_hashed = preprocess(X_train)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [148]:
model = SGDClassifier(loss='log', alpha=0.00001, penalty='l2', eta0=2.0, n_jobs=-1, random_state=42)
# model = LogisticRegression()
model.partial_fit(X_train_hashed, y_train, classes=[0, 1])


SGDClassifier(alpha=1e-05, eta0=2.0, loss='log', n_jobs=-1, random_state=42)

In [149]:
y_valid = df_valid[target]
y_valid = np.asarray(y_valid).ravel()

X_valid = df_valid[feature_cols]
X_valid_hashed = preprocess(X_valid)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [156]:
y_pred = model.predict_proba(X_valid_hashed)

In [228]:
with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)

In [254]:
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']
region = boto3.Session().region_name

sm = boto3.client('sagemaker')
featurestore_runtime = boto3.client("sagemaker-featurestore-runtime")
s3 = boto3.client('s3')

In [242]:
prefix = "ctr-prediction-feature-store"
feature_group_name = "ctr-prediction-group"

feature_definitions = [
    FeatureDefinition(feature_name=feature_name, feature_type=FeatureTypeEnum.STRING)
    for feature_name in feature_cols
]

feature_group = FeatureGroup(name=feature_group_name, feature_definitions=feature_definitions, sagemaker_session=sess)


In [243]:
record_identifier_name = "id"
event_time_feature_name = "event_time"

In [247]:
feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name = record_identifier_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=False)

{'FeatureGroupArn': 'arn:aws:sagemaker:ap-northeast-1:547760918250:feature-group/ctr-prediction-group',
 'ResponseMetadata': {'RequestId': '38cc9a61-2b99-4ad5-a6af-069e44d781fd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '38cc9a61-2b99-4ad5-a6af-069e44d781fd',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Thu, 21 Apr 2022 16:16:21 GMT'},
  'RetryAttempts': 0}}

In [248]:
import time


def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")
    
wait_for_feature_group_creation_complete(feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup ctr-prediction-group successfully created.


In [249]:
def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        data_frame[label] = data_frame[label].astype("str").astype("string")
    return data_frame

In [251]:
output_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
df_train['event_time'] = output_date

feature_group.ingest(data_frame=cast_object_to_string(df_train), max_workers=3, wait=True)


KeyboardInterrupt: 

In [218]:
feature_store_query = feature_group.athena_query()

In [220]:
feature_store_table = feature_store_query.table_name

In [221]:
print(feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.ctr-prediction-group (
  id STRING
  event_time STRING
  click STRING
  hour STRING
  C1 STRING
  banner_pos STRING
  site_id STRING
  site_domain STRING
  site_category STRING
  app_id STRING
  app_domain STRING
  app_category STRING
  device_id STRING
  device_ip STRING
  device_model STRING
  device_type STRING
  device_conn_type STRING
  C14 STRING
  C15 STRING
  C16 STRING
  C17 STRING
  C18 STRING
  C19 STRING
  C20 STRING
  C21 STRING
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://ctr-prediction/ctr-prediction-feature-store/547760918250/sagemaker/ap-northeast-1/offline-store/ctr-prediction-group'


In [222]:
query_string = """
SELECT id, hour, click 
FROM "{}" LIMIT 5
""".format(
    feature_store_table
)

In [223]:
feature_store_query.run(query_string=query_string, output_location="s3://" + bucket + "/" + prefix + "/query_results/")

feature_store_query.wait()

In [224]:
dataset = pd.DataFrame()

dataset = feature_store_query.as_dataframe()

dataset

Unnamed: 0,id,hour,click
0,1.10805e+19,1410211400,0
1,1.149118e+19,1410211400,0
2,1.164484e+19,1410211400,1
3,1.253413e+19,1410211400,0
4,1.254452e+19,1410211400,1


In [258]:
processing_instance_type = "ml.c.large"
processing_instance_count = 1
train_split_percentage = 0.90
validation_split_percentage = 0.05
test_split_percentage = 0.05
balance_dataset = True
max_seq_length = 64

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    max_runtime_in_seconds=7200,
)

In [None]:
processor.run(
    code="ctr-prediction-preprocessor.py",
    inputs=[ProcessingInput(
                    source="../avazu-ctr-prediction/train_partial",
                    destination="/opt/ml/processing/input/data/")
           ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/train"),
        ProcessingOutput(source="/opt/ml/processing/output/validation"),
        ProcessingOutput(source="/opt/ml/processing/output/test"),
    ])