# 本ディレクトリの一連の流れを試せるノートブック

## 諸々の準備

まずはインポート

In [1]:
import os
import gc
import glob

import cudf
import numpy as np
import pandas as pd
import cloudpickle
import torch

import nvtabular as nvt
from nvtabular.ops import Operator

from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
from merlin.schema import Schema
from merlin.io import Dataset

# numba からの警告を抑制する
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

import transformers4rec.torch as tr
from transformers4rec.config.trainer import T4RecTrainingArguments
from transformers4rec.torch import Trainer
from transformers4rec.torch.ranking_metric import NDCGAt, RecallAt
from transformers4rec.torch.utils.examples_utils import wipe_memory

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
  from .autonotebook import tqdm as notebook_tqdm


各種定数を定義する

In [2]:
# Root directory for every file this notebook reads or writes.
# Override it with the INPUT_DATA_DIR environment variable.
INPUT_DATA_DIR = os.getenv("INPUT_DATA_DIR", "/workspace/data/")

# Raw Kaggle CSV and its Parquet conversion.
CSV_PATH = os.path.join(INPUT_DATA_DIR, "2019-Oct.csv")
PARQUET_PATH = os.path.join(INPUT_DATA_DIR, "Oct-2019.parquet")

# Artifacts produced by the later steps: ETL output, the saved workflow,
# the per-day session splits and the trained model.
PROCESSED_PATH = os.path.join(INPUT_DATA_DIR, "processed_nvt")
WORKFLOW_PATH = os.path.join(INPUT_DATA_DIR, "workflow_etl")
SESSIONS_PATH = os.path.join(INPUT_DATA_DIR, "sessions_by_day")
MODEL_PATH = os.path.join(INPUT_DATA_DIR, "trained_model")

# Length that per-session list features are sliced/padded to.
SESSIONS_MAX_LENGTH = 20
# Sessions with fewer interactions than this are filtered out.
MINIMUM_SESSION_LENGTH = 2

## 学習データの準備

実験のためのデータをダウンロードする。
使うデータは <https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store> の `2019-Oct.csv` で、
サイズは5.3GiBになる。

In [3]:
if not os.path.exists(CSV_PATH):
    # Download the dataset file and place it at CSV_PATH.
    import kagglehub
    import shutil

    # Log in with the KAGGLE_USERNAME and KAGGLE_KEY environment variables.
    # If they are not set, replace the "YOUR ... HERE" placeholders below
    # with your own values.
    KAGGLE_USERNAME = os.environ.get('KAGGLE_USERNAME', 'YOUR USERNAME HERE')
    KAGGLE_KEY = os.environ.get('KAGGLE_KEY', 'YOUR API KEY HERE')
    kagglehub.config.set_kaggle_credentials(KAGGLE_USERNAME, KAGGLE_KEY)
    kagglehub.whoami()

    # Download only 2019-Oct.csv from the dataset.
    path = kagglehub.dataset_download(
        'mkechinov/ecommerce-behavior-data-from-multi-category-store',
        path='2019-Oct.csv',
    )

    # Create the destination directory first; otherwise the copy fails
    # after the (large) download when INPUT_DATA_DIR does not exist yet.
    dest_dir = os.path.dirname(CSV_PATH)
    if dest_dir:
        os.makedirs(dest_dir, exist_ok=True)

    # Copy the downloaded file into the working directory.
    shutil.copy2(path, CSV_PATH)

CSVのままでは扱いにくいので Parquet 形式に変換する。
初めて変換する場合、特にDocker環境では5分ほどかかる。
変換結果はファイルに保存するので、そのファイル `Oct-2019.parquet` がある場合は自動的にこのステップをスキップする。

In [4]:
%%time

if not os.path.exists(PARQUET_PATH):
    # Load the raw CSV. Under Docker this takes roughly 3-5 minutes.
    raw_df = cudf.read_csv(CSV_PATH)

    # Parse the timestamp at second resolution and keep an integer copy.
    raw_df['event_time_dt'] = raw_df['event_time'].astype('datetime64[s]')
    raw_df['event_time_ts'] = raw_df['event_time_dt'].astype('int')

    # Drop rows whose `user_session` is null.
    raw_df = raw_df[raw_df['user_session'].notnull()]

    # The original `event_time` column is not used any more.
    raw_df = raw_df.drop(['event_time'], axis=1)

    # Run `user_session` through Categorify in a Workflow; every other
    # column is passed through unchanged. The result is materialized
    # as a dataframe.
    cols = list(raw_df.columns)
    cols.remove('user_session')
    df_event = nvt.Dataset(raw_df)
    cat_feats = ['user_session'] >> nvt.ops.Categorify()
    workflow = nvt.Workflow(cols + cat_feats)
    workflow.fit(df_event)
    df = workflow.transform(df_event).to_ddf().compute()

    # Release the memory used for loading. `df_event` was built from
    # `raw_df` and presumably keeps that data alive, so drop it as well.
    del raw_df, df_event
    gc.collect()

    # Remove consecutive repeated (session, item) interactions.
    # Sort by session and timestamp so that repetitions become adjacent.
    df = df.sort_values(['user_session', 'event_time_ts']).reset_index(drop=True)
    print("Count with in-session repeated interactions: {}".format(len(df)))
    # Product and session of the previous row (0 for the very first row).
    df['product_id_past'] = df['product_id'].shift(1).fillna(0)
    df['session_id_past'] = df['user_session'].shift(1).fillna(0)
    # Keep only rows that do not repeat the previous row's product
    # within the same session.
    df = df[~((df['user_session'] == df['session_id_past'])
              & (df['product_id'] == df['product_id_past']))]
    print("Count after removed in-session repeated interactions: {}".format(len(df)))
    del df['product_id_past']
    del df['session_id_past']
    gc.collect()

    # Add a column holding the first time each item appears. This is
    # computed before the one-week filter below, i.e. over all loaded rows.
    item_first_interaction_df = (
        df.groupby('product_id')
        .agg({'event_time_ts': 'min'})
        .reset_index()
        .rename(columns={'event_time_ts': 'prod_first_event_time_ts'})
    )
    gc.collect()
    df = df.merge(item_first_interaction_df, on=['product_id'], how='left').reset_index(drop=True)
    del item_first_interaction_df
    gc.collect()

    # Use only the first week of data.
    df = df[df['event_time_dt'] < np.datetime64('2019-10-08')].reset_index(drop=True)
    # After that filter the `event_time_dt` column is no longer needed.
    df = df.drop(['event_time_dt'], axis=1)

    # Write the result out as Parquet.
    df.to_parquet(PARQUET_PATH)

    del df
    gc.collect()

Count with in-session repeated interactions: 42448762
Count after removed in-session repeated interactions: 30733301
CPU times: user 1min 22s, sys: 19.5 s, total: 1min 41s
Wall time: 5min 49s


## Workflowを使って ETL する

以下はWorkflowを構築する手続き。

In [5]:
%%time

# Columns to be categorified. `product_id` is first passed through
# TagAsItemID, then Categorify is applied to it together with the other
# categorical columns.
item_id = ['product_id'] >> nvt.ops.TagAsItemID()
cat_feats = item_id + ['category_code', 'brand', 'user_id', 'category_id', 'event_type'] >> nvt.ops.Categorify()


# Time-related columns.

session_ts = ['event_time_ts']

# Convert the timestamp (in seconds) to a datetime column named `event_time_dt`.
session_time = (
    session_ts >> 
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >> 
    nvt.ops.Rename(name = 'event_time_dt')
)

# Derive the weekday from the datetime column and name it `et_dayofweek`.
sessiontime_weekday = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >> 
    nvt.ops.Rename(name ='et_dayofweek')
)


def get_cycled_feature_value_sin(col, max_value):
    """Return the sine component of a cyclical encoding of ``col``.

    ``col`` is offset by 1e-6, divided by ``max_value`` and treated as a
    fraction of a full turn, i.e. ``sin(2*pi*((col + 1e-6) / max_value))``.
    """
    turn_fraction = (col + 0.000001) / max_value
    return np.sin(2*np.pi*turn_fraction)

def get_cycled_feature_value_cos(col, max_value):
    """Return the cosine component of a cyclical encoding of ``col``.

    ``col`` is offset by 1e-6, divided by ``max_value`` and treated as a
    fraction of a full turn, i.e. ``cos(2*pi*((col + 1e-6) / max_value))``.
    """
    turn_fraction = (col + 0.000001) / max_value
    return np.cos(2*np.pi*turn_fraction)

# Cyclical (sine / cosine) encoding of the weekday. The weekday value is
# shifted by +1 and encoded with a period of 7; both outputs are tagged
# as continuous features.
weekday_sin = (sessiontime_weekday >> 
               (lambda col: get_cycled_feature_value_sin(col+1, 7)) >> 
               nvt.ops.Rename(name = 'et_dayofweek_sin') >>
               nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
              )
    
weekday_cos= (sessiontime_weekday >> 
              (lambda col: get_cycled_feature_value_cos(col+1, 7)) >> 
              nvt.ops.Rename(name = 'et_dayofweek_cos') >>
              nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
             )

# Custom operator that computes the recency (age) of an item.
class ItemRecency(nvt.ops.Operator):
    """Add ``<column>_age_days`` for each selected timestamp column.

    The age is the difference between the selected column and
    ``prod_first_event_time_ts`` divided by the number of seconds in a
    day; negative differences are replaced by zero.
    """

    def transform(self, columns, gdf):
        """Write the ``*_age_days`` columns into ``gdf`` and return it."""
        seconds_per_day = 60*60*24
        for name in columns.names:
            first_seen = gdf['prod_first_event_time_ts']
            age_days = (gdf[name] - first_seen) / seconds_per_day
            # Multiplying by the boolean mask zeroes out negative ages.
            gdf[name + "_age_days"] = age_days * (age_days >= 0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        """Validate the parent columns against the schema and select them."""
        self._validate_matching_cols(
            input_schema, parents_selector, "computing input selector"
        )
        return parents_selector

    def column_mapping(self, col_selector):
        """Map each output name ``<col>_age_days`` to its source column."""
        return {name + "_age_days": [name] for name in col_selector.names}

    @property
    def dependencies(self):
        """Extra column read by ``transform`` besides the selected ones."""
        return ["prod_first_event_time_ts"]

    @property
    def output_dtype(self):
        """Dtype reported for the generated columns."""
        return np.float64

# Item age in days, then LogOp, Normalize (float32 output) and a rename to
# `product_recency_days_log_norm`.
recency_features = ['event_time_ts'] >> ItemRecency() 
recency_features_norm = (recency_features >> 
                         nvt.ops.LogOp() >> 
                         nvt.ops.Normalize(out_dtype=np.float32) >> 
                         nvt.ops.Rename(name='product_recency_days_log_norm')
                        )

# All time-derived features combined.
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    weekday_cos +
    recency_features_norm
)

# Normalize the long-tailed price column: LogOp followed by Normalize.
price_log = ['price'] >> nvt.ops.LogOp() >> nvt.ops.Normalize(out_dtype=np.float32) >> nvt.ops.Rename(name='price_log_norm')

# Price expressed relative to the average price of the item's category id.

def relative_price_to_avg_categ(col, gdf):
    """Return ``(price - col) / (col + 1e-5)``, zeroed where ``col <= 0``.

    ``col`` is the average price to compare against and ``gdf`` must
    provide the ``price`` column.
    """
    eps = 1e-5
    relative = (gdf['price'] - col) / (col + eps)
    # Integer 0/1 mask: rows with a non-positive average become 0.
    positive_avg = (col > 0).astype(int)
    return relative * positive_avg
    
# Mean price per `category_id`, joined back onto each row.
avg_category_id_pr = ['category_id'] >> nvt.ops.JoinGroupby(cont_cols =['price'], stats=["mean"]) >> nvt.ops.Rename(name='avg_category_id_price')
# Price relative to that category mean, tagged as a continuous feature.
relative_price_to_avg_category = (
    avg_category_id_pr >> 
    nvt.ops.LambdaOp(relative_price_to_avg_categ, dependency=['price']) >> 
    nvt.ops.Rename(name="relative_price_to_avg_categ_id") >>
    nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

# Group the interactions by session.
groupby_feats = ['event_time_ts', 'user_session'] + cat_feats + time_features + price_log + relative_price_to_avg_category

# One row per `user_session`, rows sorted by `event_time_ts`; aggregated
# columns are named "<column>-<agg>" because of name_sep="-".
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["user_session"], 
    sort_cols=["event_time_ts"],
    aggs={
        'user_id': ['first'],
        'product_id': ["list", "count"],
        'category_code': ["list"],  
        'brand': ["list"], 
        'category_id': ["list"], 
        'event_time_ts': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'et_dayofweek_cos': ["list"],
        'price_log_norm': ["list"],
        'relative_price_to_avg_categ_id': ["list"],
        'product_recency_days_log_norm': ["list"]
        },
    name_sep="-")

# The list-valued (sequence) features.
groupby_features_list = groupby_features['product_id-list',
        'category_code-list',  
        'brand-list', 
        'category_id-list', 
        'et_dayofweek_sin-list',
        'et_dayofweek_cos-list',
        'price_log_norm-list',
        'relative_price_to_avg_categ_id-list',
        'product_recency_days_log_norm-list']

# Slice each list with start index -SESSIONS_MAX_LENGTH and pad=True
# (presumably: keep the last SESSIONS_MAX_LENGTH items and pad shorter lists).
groupby_features_trim = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)

# Calculate the 1-based session day index from the 'event_time_dt-first' column.
day_index = ((groupby_features['event_time_dt-first'])  >> 
             nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >> 
             nvt.ops.Rename(f = lambda col: "day_index") >>
             nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
            )

# Session id, tagged as categorical.
sess_id = groupby_features['user_session'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])

# Final set of output columns.
selected_features = sess_id + groupby_features['product_id-count'] + groupby_features_trim + day_index

# Load the previously converted Parquet data again.
df = cudf.read_parquet(PARQUET_PATH)

# Keep only sessions with at least MINIMUM_SESSION_LENGTH interactions.
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["product_id-count"] >= MINIMUM_SESSION_LENGTH)

workflow = nvt.Workflow(filtered_sessions)

CPU times: user 132 ms, sys: 15.2 ms, total: 147 ms
Wall time: 656 ms


作ったWorkflowで ETL (Extract, Transform, Load) する。
結果は `processed_nvt` ディレクトリに保存する。
既に `processed_nvt` ディレクトリが存在する場合は自動的にこのステップをスキップする。

In [6]:
%%time

if not os.path.exists(PROCESSED_PATH):
    dataset = nvt.Dataset(df)
    # Fit the statistics the preprocessing workflow needs, transform the
    # data and export the resulting parquet files to PROCESSED_PATH
    # (a schema.pbtxt file is generated in that folder as well).
    workflow.fit_transform(dataset).to_parquet(PROCESSED_PATH)
    # Drop the dataset wrapper now that the export is done.
    del dataset

CPU times: user 7.51 s, sys: 550 ms, total: 8.06 s
Wall time: 11.9 s


In [7]:
# Save the workflow to WORKFLOW_PATH, unless that path already exists.
if not os.path.exists(WORKFLOW_PATH):
    workflow.save(WORKFLOW_PATH)

## データを日ごとに分割する

In [8]:
%%time

if not os.path.exists(SESSIONS_PATH):
    from transformers4rec.utils.data_utils import save_time_based_splits

    PARTITION_COL = 'day_index'

    # Destination of the per-day splits; override with the OUTPUT_FOLDER
    # environment variable.
    OUTPUT_FOLDER = os.environ.get("OUTPUT_FOLDER", SESSIONS_PATH)
    # os.makedirs replaces the `!mkdir -p $OUTPUT_FOLDER` shell escape: it
    # also works outside IPython and with paths that contain spaces.
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)

    # Read the processed dataset written by the ETL step.
    sessions_gdf = cudf.read_parquet(os.path.join(PROCESSED_PATH, "part_0.parquet"))

    # NOTE(review): `user_session` (the encoded session id) is passed as the
    # timestamp column; the processed data selected above contains no real
    # timestamp column. Confirm that this ordering is what is wanted.
    save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                           output_dir=OUTPUT_FOLDER,
                           partition_col=PARTITION_COL,
                           timestamp_col='user_session',
                           )

    del sessions_gdf
    gc.collect()

Creating time-based splits: 100%|█████████████████████████████████████████████████████████████████████████| 7/7 [00:04<00:00,  1.68it/s]


CPU times: user 3.52 s, sys: 555 ms, total: 4.07 s
Wall time: 6.78 s


## Transformerモデルを作り学習する

In [9]:
%%time

if not os.path.exists(MODEL_PATH):
    # --- Build the input schema ---

    # Categorical and continuous columns fed to the model.
    x_cat_names = ['product_id-list', 'category_id-list', 'brand-list']
    x_cont_names = ['product_recency_days_log_norm-list', 'et_dayofweek_sin-list', 'et_dayofweek_cos-list',
                    'price_log_norm-list', 'relative_price_to_avg_categ_id-list']

    # Take the schema from the ETL output and keep only the columns above.
    train = Dataset(os.path.join(PROCESSED_PATH, "part_0.parquet"))
    schema = train.schema.select_by_name(x_cat_names + x_cont_names)

    # --- Build the model ---

    # The sequence length must match the length the ETL step sliced/padded
    # the list columns to, so reuse the same constant.
    sequence_length, d_model = SESSIONS_MAX_LENGTH, 192
    # Input module: processes the tabular features and prepares masked inputs.
    inputs = tr.TabularSequenceFeatures.from_schema(
        schema,
        max_sequence_length=sequence_length,
        aggregation="concat",
        d_output=d_model,
        masking="mlm",
    )

    # XLNet configuration with default parameters for the HF XLNet config.
    transformer_config = tr.XLNetConfig.build(
        d_model=d_model, n_head=4, n_layer=2, total_seq_length=sequence_length
    )
    # Model body: inputs, masking, projection and transformer block.
    body = tr.SequentialBlock(
        inputs, tr.MLPBlock([d_model]), tr.TransformerBlock(transformer_config, masking=inputs.masking)
    )

    # Head for the next-item prediction task.
    head = tr.Head(
        body,
        tr.NextItemPredictionTask(weight_tying=True,
                                  metrics=[NDCGAt(top_ks=[10, 20], labels_onehot=True),
                                           RecallAt(top_ks=[10, 20], labels_onehot=True)]),
    )

    # End-to-end model.
    model = tr.Model(head)

    # Training arguments.
    training_args = T4RecTrainingArguments(
        output_dir="./tmp",
        max_sequence_length=sequence_length,
        data_loader_engine='merlin',
        num_train_epochs=3,
        dataloader_drop_last=False,
        per_device_train_batch_size=256,
        per_device_eval_batch_size=32,
        gradient_accumulation_steps=1,
        learning_rate=0.000666,
        report_to=[],
        logging_steps=200,
    )

    # --- Train and save ---

    # The T4Rec Trainer manages training and evaluation.
    trainer = Trainer(
        model=model,
        args=training_args,
        schema=schema,
        compute_metrics=True,
    )

    # Directory holding the per-day splits. OUTPUT_DIR takes precedence;
    # OUTPUT_FOLDER (the variable the split step writes to) is honoured as
    # a fallback so both steps agree on the location.
    OUTPUT_DIR = os.environ.get(
        "OUTPUT_DIR", os.environ.get("OUTPUT_FOLDER", SESSIONS_PATH)
    )

    start_time_window_index = 1
    final_time_window_index = 4
    for time_index in range(start_time_window_index, final_time_window_index):
        # Train on day `time_index`, evaluate on the following day.
        time_index_train = time_index
        time_index_eval = time_index + 1
        train_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_train}/train.parquet"))
        eval_paths = glob.glob(os.path.join(OUTPUT_DIR, f"{time_index_eval}/valid.parquet"))

        print('*'*20)
        print("Launch training for day %s are:" %time_index)
        print('*'*20 + '\n')
        trainer.train_dataset_or_path = train_paths
        trainer.reset_lr_scheduler()
        trainer.train()
        trainer.state.global_step +=1

        # Evaluate on the following day.
        trainer.eval_dataset_or_path = eval_paths
        eval_metrics = trainer.evaluate(metric_key_prefix='eval')
        print('*'*20)
        print("Eval results for day %s are:\t" %time_index_eval)
        print('\n' + '*'*20 + '\n')
        for key in sorted(eval_metrics.keys()):
            print(" %s = %s" % (key, str(eval_metrics[key])))
        wipe_memory()

    model.save(MODEL_PATH)

    del model
    gc.collect()

Projecting inputs of NextItemPredictionTask to'64' As weight tying requires the input dimension '192' to be equal to the item-id embedding dimension '64'


********************
Launch training for day 1 are:
********************





Step,Training Loss
200,9.8705
400,9.0325
600,8.6974
800,8.6514
1000,8.4071
1200,8.4464


********************
Eval results for day 2 are:	

********************

 eval_/loss = 8.590091705322266
 eval_/next-item/ndcg_at_10 = 0.05700892582535744
 eval_/next-item/ndcg_at_20 = 0.06829071789979935
 eval_/next-item/recall_at_10 = 0.10587436705827713
 eval_/next-item/recall_at_20 = 0.15081818401813507
 eval_runtime = 6.5576
 eval_samples_per_second = 2025.129
 eval_steps_per_second = 63.285
********************
Launch training for day 2 are:
********************



Step,Training Loss
200,8.4646
400,8.2111
600,7.9142
800,7.7797
1000,7.57
1200,7.5597


********************
Eval results for day 3 are:	

********************

 eval_/loss = 7.928411960601807
 eval_/next-item/ndcg_at_10 = 0.07954906672239304
 eval_/next-item/ndcg_at_20 = 0.0967593640089035
 eval_/next-item/recall_at_10 = 0.15242211520671844
 eval_/next-item/recall_at_20 = 0.22076334059238434
 eval_runtime = 7.1194
 eval_samples_per_second = 1725.989
 eval_steps_per_second = 53.937
********************
Launch training for day 3 are:
********************



Step,Training Loss
200,7.7461
400,7.6312
600,7.3862
800,7.3115
1000,7.1579


********************
Eval results for day 4 are:	

********************

 eval_/loss = 7.517406940460205
 eval_/next-item/ndcg_at_10 = 0.09403380751609802
 eval_/next-item/ndcg_at_20 = 0.11495339125394821
 eval_/next-item/recall_at_10 = 0.17811131477355957
 eval_/next-item/recall_at_20 = 0.2610796093940735
 eval_runtime = 10.8837
 eval_samples_per_second = 1428.926
 eval_steps_per_second = 44.654
CPU times: user 3min 34s, sys: 31.2 s, total: 4min 6s
Wall time: 2min 51s


## 推論

人工的に作ったセッションデータから、推薦アイテム列を推論(作成)する。

In [10]:
# Compose a small dataframe of hand-made sessions to run inference on.

# Column order used by every row of `data` below.
cols = [
    'product_id-list',
    'brand-list',
    'category_id-list',
    'et_dayofweek_sin-list',
    'et_dayofweek_cos-list',
    'price_log_norm-list',
    'relative_price_to_avg_categ_id-list',
    'product_recency_days_log_norm-list',
]

# Empty list value, used for a session without any interaction.
emp = np.empty(0, dtype=np.int64)

data = [
    # No interactions at all (a first-time shopper).
    [emp] * len(cols),
    # Item id 1, weekday encoded as sin=0 / cos=1 (0 degrees).
    [[1], [0], [0], [0], [1], [0], [0], [0]],
    # Item id 1, weekday encoded as sin=1 / cos=0 (90 degrees).
    [[1], [0], [0], [1], [0], [0], [0], [0]],
    # Item id 9999, weekday encoded as sin=0 / cos=1 (0 degrees).
    [[9999], [0], [0], [0], [1], [0], [0], [0]],
]

df = pd.DataFrame(data=data, columns=cols)

推論で推薦アイテム一覧を計算する。

In [11]:
# Load the trained model.
# NOTE: unpickling executes arbitrary code; only load a model file that
# this notebook (or another trusted source) wrote to MODEL_PATH.
with open(os.path.join(MODEL_PATH, "t4rec_model_class.pkl"), "rb") as model_file:
    model = cloudpickle.load(model_file)

# Set up the trainer.

# Categorical and continuous columns fed to the model.
x_cat_names = ['product_id-list', 'category_id-list', 'brand-list']
x_cont_names = ['product_recency_days_log_norm-list', 'et_dayofweek_sin-list', 'et_dayofweek_cos-list',
                'price_log_norm-list', 'relative_price_to_avg_categ_id-list']

# Take the schema from the ETL output and keep only the columns above.
train = Dataset(os.path.join(PROCESSED_PATH, "part_0.parquet"))
schema = train.schema.select_by_name(x_cat_names + x_cont_names)

# Same arguments as used for training; the Trainer is only used for
# prediction in this cell.
training_args = T4RecTrainingArguments(
    output_dir="./tmp",
    max_sequence_length=SESSIONS_MAX_LENGTH,
    data_loader_engine='merlin',
    num_train_epochs=3,
    dataloader_drop_last=False,
    per_device_train_batch_size=256,
    per_device_eval_batch_size=32,
    gradient_accumulation_steps=1,
    learning_rate=0.000666,
    report_to=[],
    logging_steps=200,
)

# The T4Rec Trainer manages training and evaluation.
trainer = Trainer(
    model=model,
    args=training_args,
    schema=schema,
    compute_metrics=True,
)


# Predict.

model.eval()
with torch.no_grad():
    ds = Dataset(df)
    out = trainer.predict(ds)
    # predictions[0] holds the recommended item ids for each session
    # (predictions[1] presumably holds the matching scores - TODO confirm).
    items = out.predictions[0]
    print(items)

[[  27   12    4   11    3    5    6   30   75   10    8    9   14   44
    96  149    7  179  457   32  349   37  104  261   20   19   65   15
  1132  189  441   23  414  204  597   18   16  499  403   39   36   58
  1342  644  264  834   25  262   29  117  131  817  138   34  228   28
   997  115  680 1151 1777   21  176  270  394  150   38  447   41   13
   171   22 1011  120 1574 1123 1630  124   26   95  107  537  121  235
   867  210 1339  435  202  561   24 1219  271  363 1098 1258  967  841
   611  240]
 [   3    4   12   27    5    6   11   10    8    9   14   75    7   30
    32   15   20   96   16   29   36   19   23   13   44  349  104  149
    37  262  179  457   18  441   25   21   41   28  189  403  261   22
    65 1132  414  499   24   48  264  644   53  597   38   34   26   61
    17  108  817   58  131   39   72   84  138   31  176   88  394  115
  1342  204 1151  834  447  150  435  202 1777   62 1014   33  228   83
   125   43  270   40 1574  117 1339  611  121  997

出力は、各セッションに対する推薦アイテム100個。
Workflowで変換されたアイテムIDで、先頭のものほど推薦度合が強い。
また若い番号ほど頻出するので推薦されやすく、かつ推薦度合が強くなりやすいことに留意が必要。

細かい条件で推薦順位が変わっていることがわかる。
特に3番目のセッションでは、2番目と同じアイテムに興味があるユーザーであっても、
曜日が異なると異なるアイテム(`3` ではなく `27`)が推薦されていることが見て取れる。

## (参考)Schema情報

### Workflowの入出力のスキーマ

In [37]:
# Display the input schema of the workflow.
workflow.input_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged
0,product_id,(),"DType(name='int64', element_type=<ElementType....",False,False
1,category_code,(),"DType(name='object', element_type=<ElementType...",False,False
2,brand,(),"DType(name='object', element_type=<ElementType...",False,False
3,user_id,(),"DType(name='int64', element_type=<ElementType....",False,False
4,event_type,(),"DType(name='object', element_type=<ElementType...",False,False
5,event_time_ts,(),"DType(name='int64', element_type=<ElementType....",False,False
6,prod_first_event_time_ts,(),"DType(name='int64', element_type=<ElementType....",False,False
7,category_id,(),"DType(name='int64', element_type=<ElementType....",False,False
8,price,(),"DType(name='float64', element_type=<ElementTyp...",False,False
9,user_session,(),"DType(name='int64', element_type=<ElementType....",False,False


In [13]:
# Display the output schema of the workflow.
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.value_count.min,properties.value_count.max
0,user_session,(Tags.CATEGORICAL),"DType(name='int64', element_type=<ElementType....",False,False,,,,,,,,,,,
1,product_id-count,"(Tags.CATEGORICAL, Tags.ID, Tags.ITEM)","DType(name='int32', element_type=<ElementType....",False,False,,0.0,0.0,.//categories/unique.product_id.parquet,0.0,118335.0,product_id,118336.0,512.0,,
2,product_id-list,"(Tags.LIST, Tags.CATEGORICAL, Tags.ID, Tags.ITEM)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.product_id.parquet,0.0,118335.0,product_id,118336.0,512.0,20.0,20.0
3,category_code-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.category_code.parquet,0.0,125.0,category_code,126.0,24.0,20.0,20.0
4,brand-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.brand.parquet,0.0,2641.0,brand,2642.0,132.0,20.0,20.0
5,category_id-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.category_id.parquet,0.0,567.0,category_id,568.0,56.0,20.0,20.0
6,et_dayofweek_sin-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20.0,20.0
7,et_dayofweek_cos-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20.0,20.0
8,price_log_norm-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float32', element_type=<ElementTyp...",True,False,,,,,,,,,,20.0,20.0
9,relative_price_to_avg_categ_id-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20.0,20.0


### Transformerモデルの入出力のスキーマ

In [35]:
# Display the input schema of the model.
model.input_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.cat_path,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension,properties.domain.min,properties.domain.max,properties.domain.name,properties.value_count.min,properties.value_count.max
0,product_id-list,"(Tags.ITEM, Tags.CATEGORICAL, Tags.ID, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.product_id.parquet,118336.0,512.0,0.0,118335.0,product_id,20,20
1,category_id-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.category_id.parquet,568.0,56.0,0.0,567.0,category_id,20,20
2,brand-list,"(Tags.CATEGORICAL, Tags.LIST)","DType(name='int64', element_type=<ElementType....",True,False,,0.0,0.0,.//categories/unique.brand.parquet,2642.0,132.0,0.0,2641.0,brand,20,20
3,product_recency_days_log_norm-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float32', element_type=<ElementTyp...",True,False,,,,,,,,,,20,20
4,et_dayofweek_sin-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20,20
5,et_dayofweek_cos-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20,20
6,price_log_norm-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float32', element_type=<ElementTyp...",True,False,,,,,,,,,,20,20
7,relative_price_to_avg_categ_id-list,"(Tags.CONTINUOUS, Tags.LIST)","DType(name='float64', element_type=<ElementTyp...",True,False,,,,,,,,,,20,20


In [34]:
# Display the output schema of the model.
model.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.int_domain.min,properties.int_domain.max,properties.value_count.min,properties.value_count.max
0,next-item,(),"DType(name='float32', element_type=<ElementTyp...",True,False,118336,118336,118336,118336
