In [43]:
from pathlib import Path
import yaml
import boto3
from datetime import datetime
from time import strftime, gmtime
import pandas as pd
import warnings

import sagemaker
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup

pd.set_option('display.max_columns', 30)
warnings.simplefilter('ignore')

## データ準備

In [3]:
SETTING_FILE_PATH = "../config/settings.yaml"
DATASET_FOLDER_PATH = "./avazu-ctr-prediction"

In [4]:
with open(SETTING_FILE_PATH) as file:
    aws_info = yaml.safe_load(file)
    
sess = sagemaker.Session()
role = aws_info['aws']['sagemaker']['role']
bucket = aws_info['aws']['sagemaker']['s3bucket']
region = boto3.Session().region_name

sm = boto3.client('sagemaker')
featurestore_runtime = boto3.client("sagemaker-featurestore-runtime")
s3 = boto3.client('s3')

In [5]:
dataset_folder = Path(DATASET_FOLDER_PATH)
# df_train = pd.read_csv(dataset_folder / "train")
# df_train_partial = df_train[df_train.index % 100 == 0]
# df_train_partial.to_csv(dataset_folder / "train_partial", index=False)

df_train_partial = pd.read_csv(dataset_folder / "train_partial")

In [9]:
df_train_partial.head()

Unnamed: 0,id,click,hour,C1,banner_pos,site_id,site_domain,site_category,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21
0,1.000009e+18,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,ecad2386,7801e8d9,07d7df22,a99f214a,ddd2926e,44956a24,1,2,15706,320,50,1722,0,35,-1,79
1,1.001579e+19,0,14102100,1005,1,856e6d3f,58a89a43,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,4375586d,5ec45883,1,0,19772,320,50,2227,0,687,100075,48
2,1.002948e+18,0,14102100,1005,0,85f751fd,c4e18dd6,50e219e0,1779deee,2347f47a,f95efa07,a99f214a,ab9a5222,2ee63ff8,1,0,20596,320,50,2161,0,35,-1,157
3,1.004511e+19,0,14102100,1005,0,85f751fd,c4e18dd6,50e219e0,51cedd4e,aefc06bd,0f2161f8,a99f214a,bbe53381,542422a7,1,0,19743,320,50,2264,3,427,100000,61
4,1.00599e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,ecad2386,7801e8d9,07d7df22,a99f214a,8a014cbb,04f5b394,1,0,15702,320,50,1722,0,35,-1,79


In [25]:
# レコードの生成時刻を表す列を追加
output_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
df_train_partial['event_time'] = output_date

In [26]:
record_identifier_name = "id" # レコードの識別子
event_time_feature_name = "event_time" # レコードの生成時刻

feature_names = ['click', 'hour', 'C1', 'banner_pos', 'site_id', 'site_domain',
                            'site_category', 'app_id', 'app_domain', 'app_category', 'device_id',
                            'device_ip', 'device_model', 'device_type', 'device_conn_type', 'C14',
                            'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21'] + [record_identifier_name, event_time_feature_name]
print(feature_names)

['click', 'hour', 'C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_id', 'device_ip', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'id', 'event_time']


## Feature Group の作成

In [100]:
prefix = "ctr-prediction-feature-store"
feature_group_name = "ctr-prediction-group" + f"-{strftime('%d-%H-%M-%S', gmtime())}"
print(feature_group_name)

ctr-prediction-group-11-00-53-30


### load_feature_definitions を使用して Feature Definition のスキーマを自動で識別する場合

In [68]:
feature_group_auto = FeatureGroup(name=feature_group_name, sagemaker_session=sess)

In [69]:
# pandas の DataFrameを直接入力すると ValueError がでる
try:
    feature_group_auto.load_feature_definitions(data_frame=df_train_partial)
except ValueError as e:
    print(e)

Failed to infer Feature type based on dtype object for column site_id.


In [57]:
df_train_partial.dtypes

id                  float64
click                 int64
hour                  int64
C1                    int64
banner_pos            int64
site_id              object
site_domain          object
site_category        object
app_id               object
app_domain           object
app_category         object
device_id            object
device_ip            object
device_model         object
device_type           int64
device_conn_type      int64
C14                   int64
C15                   int64
C16                   int64
C17                   int64
C18                   int64
C19                   int64
C20                   int64
C21                   int64
event_time           object
dtype: object

In [58]:
# Pandas の object 型を Feature Group が認識できる string 型に変換する
def cast_object_to_string(df: pd.DataFrame) -> pd.DataFrame:
    df_tmp = df.copy()
    for label in df_tmp.columns:
        if df_tmp.dtypes[label] == 'object':
            df_tmp[label] = df_tmp[label].astype("str").astype("string")
    return df_tmp

In [33]:
cast_object_to_string(df_train_partial).dtypes

id                  float64
click                 int64
hour                  int64
C1                    int64
banner_pos            int64
site_id              string
site_domain          string
site_category        string
app_id               string
app_domain           string
app_category         string
device_id            string
device_ip            string
device_model         string
device_type           int64
device_conn_type      int64
C14                   int64
C15                   int64
C16                   int64
C17                   int64
C18                   int64
C19                   int64
C20                   int64
C21                   int64
event_time           string
dtype: object

In [70]:
feature_group_auto.load_feature_definitions(data_frame=cast_object_to_string(df_train_partial))

[FeatureDefinition(feature_name='id', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='click', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='hour', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='C1', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='banner_pos', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='site_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_category'

In [71]:
del feature_group_auto

### 自分で定義したFeature Definition のスキーマを使用する場合

In [101]:
feature_definitions = [
    FeatureDefinition(feature_name=feature_name, feature_type=FeatureTypeEnum.STRING)
    for feature_name in feature_names
]

feature_group_original = FeatureGroup(name=feature_group_name, feature_definitions=feature_definitions, sagemaker_session=sess)
feature_group_original.feature_definitions

[FeatureDefinition(feature_name='click', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='hour', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='C1', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='banner_pos', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='site_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_id', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_domain', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='app_category', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='device_id', feature_type=<F

In [102]:
feature_group_original.create(
    s3_uri=f"s3://{bucket}/{prefix}", # offline feature store でデータを保存する S3 URI
    record_identifier_name = record_identifier_name, # レコード識別子のカラム名
    event_time_feature_name=event_time_feature_name, # レコード生成時刻のカラム名
    role_arn=role, 
    enable_online_store=True, # online feature store を作成するか. defualt = False
    description = "Feature Group For CTR Prediciton",
    tags = [{"Key":"author", "Value": "satsuki"},{"Key":"target", "Value": "click"} ]
) 

{'FeatureGroupArn': 'arn:aws:sagemaker:ap-northeast-1:547760918250:feature-group/ctr-prediction-group-11-00-53-30',
 'ResponseMetadata': {'RequestId': 'ec3991ad-98e5-4f4d-8896-e2ceb0f0f1b0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ec3991ad-98e5-4f4d-8896-e2ceb0f0f1b0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '114',
   'date': 'Wed, 11 May 2022 00:53:39 GMT'},
  'RetryAttempts': 0}}

In [None]:
feature_group_original.describe()

In [None]:
sm.list_feature_groups()['FeatureGroupSummaries'][0]

## データの登録

In [None]:
def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        data_frame[label] = data_frame[label].astype("str").astype("string")
    return data_frame

In [103]:
df_train_tmp = df_train_partial[:20]
feature_group.ingest(data_frame=df_train_tmp, max_workers=4, wait=True)


IngestionManagerPandas(feature_group_name='ctr-prediction-group-11-00-34-02', sagemaker_session=<sagemaker.session.Session object at 0x11dd35690>, data_frame=              id  click      hour    C1  banner_pos   site_id site_domain  \
0   1.000009e+18      0  14102100  1005           0  1fbe01fe    f3845767   
1   1.001579e+19      0  14102100  1005           1  856e6d3f    58a89a43   
2   1.002948e+18      0  14102100  1005           0  85f751fd    c4e18dd6   
3   1.004511e+19      0  14102100  1005           0  85f751fd    c4e18dd6   
4   1.005990e+19      0  14102100  1005           0  1fbe01fe    f3845767   
5   1.007253e+19      0  14102100  1005           1  85f751fd    c4e18dd6   
6   1.008682e+19      0  14102100  1005           0  85f751fd    c4e18dd6   
7   1.009971e+19      0  14102100  1002           0  34d1d55f    97df357a   
8   1.011290e+19      1  14102100  1005           0  1fbe01fe    f3845767   
9   1.012868e+19      0  14102100  1005           0  85f751fd    c4e18dd

## データ取り出し

### Online Feature Store から取り出し

In [156]:

record_identifier_value = str(1.0203931153220458e+19)

r = featurestore_runtime.get_record(FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=record_identifier_value)


### Offline Feature Store から取り出し

In [125]:
feature_store_query = feature_group.athena_query()
feature_store_table = feature_store_query.table_name

In [126]:
print(feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.ctr-prediction-group-11-00-34-02 (
  click STRING
  hour STRING
  C1 STRING
  banner_pos STRING
  site_id STRING
  site_domain STRING
  site_category STRING
  app_id STRING
  app_domain STRING
  app_category STRING
  device_id STRING
  device_ip STRING
  device_model STRING
  device_type STRING
  device_conn_type STRING
  C14 STRING
  C15 STRING
  C16 STRING
  C17 STRING
  C18 STRING
  C19 STRING
  C20 STRING
  C21 STRING
  id STRING
  event_time STRING
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://ctr-prediction/ctr-prediction-feature-store/547760918250/sagemaker/ap-northeast-1/offline-store/ctr-prediction-group-11-00-34-02'


In [127]:
query_string = """
SELECT *
FROM "{}" LIMIT 5
""".format(
    feature_store_table
)


In [146]:
feature_store_query.run(query_string=query_string, output_location="s3://" + bucket + "/" + prefix + "/query_results/")

feature_store_query.wait()

In [147]:
dataset = pd.DataFrame()

dataset = feature_store_query.as_dataframe()

dataset

Unnamed: 0,click,hour,c1,banner_pos,site_id,site_domain,site_category,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,c14,c15,c16,c17,c18,c19,c20,c21,id,event_time,write_time,api_invocation_time,is_deleted
0,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,ecad2386,7801e8d9,07d7df22,a99f214a,ceffea69,8a4875bd,1,0,15707,320,50,1722,0,35,-1,79,1.020393e+19,2022-05-11T09:22:24Z,2022-05-11 00:59:48.649,2022-05-11 00:54:53.000,False
1,0,14102100,1005,0,85f751fd,c4e18dd6,50e219e0,1779deee,2347f47a,f95efa07,a99f214a,ab9a5222,2ee63ff8,1,0,20596,320,50,2161,0,35,-1,157,1.002948e+18,2022-05-11T09:22:24Z,2022-05-11 00:59:56.789,2022-05-11 00:54:53.000,False
2,0,14102100,1005,0,6c5b482c,7687a86e,3e814130,ecad2386,7801e8d9,07d7df22,a99f214a,8a014cbb,8b1aa260,1,0,20016,300,250,2285,2,39,-1,23,1.02752e+19,2022-05-11T09:22:24Z,2022-05-11 00:59:56.789,2022-05-11 00:54:54.000,False
3,0,14102100,1005,1,856e6d3f,58a89a43,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,4375586d,5ec45883,1,0,19772,320,50,2227,0,687,100075,48,1.001579e+19,2022-05-11T09:22:24Z,2022-05-11 00:59:56.612,2022-05-11 00:54:53.000,False
4,0,14102100,1005,1,44637516,27e3c518,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,02b89493,6332421a,1,0,15708,320,50,1722,0,35,-1,79,1.018874e+19,2022-05-11T09:22:24Z,2022-05-11 00:59:59.771,2022-05-11 00:54:53.000,False


In [150]:
dataset['id'][0]

1.0203931153220458e+19

In [151]:
dataset['id'] == 1.0203931153220458e+19

0     True
1    False
2    False
3    False
4    False
Name: id, dtype: bool