<a href="https://colab.research.google.com/github/kunalsonalkar/transformers-nlp/blob/main/Transformers4Rec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [50]:
import numpy as np
import pandas as pd
# Load the event log from a Parquet file; the path is relative to the
# notebook's working directory.
df = pd.read_parquet('Oct-2019.parquet')

In [51]:
# Display the loaded DataFrame to inspect its columns and values.
df

Unnamed: 0,event_type,product_id,category_id,category_code,brand,price,user_id,event_time_ts,user_session,prod_first_event_time_ts
0,view,12712529,2053013553559896355,,hankook,70.79,513605798,1569900208000000000,3,1569900208000000000
1,view,12702204,2053013553559896355,,bridgestone,72.07,513605798,1569900523000000000,3,1569900523000000000
2,view,12718922,2053013553559896355,,dunlop,72.59,513605798,1569900702000000000,3,1569900686000000000
3,view,12711730,2053013553559896355,,goodride,45.45,513605798,1569902687000000000,3,1569902687000000000
4,view,12708497,2053013553559896355,,nitto,68.21,513605798,1569902730000000000,3,1569902730000000000
...,...,...,...,...,...,...,...,...,...,...
3632734,view,1003815,2053013555631882655,electronics.smartphone,huawei,772.19,556219359,1570072482000000000,1072337,1569897178000000000
3632735,view,24000062,2053013563332625239,,cafemimi,2.06,525006580,1570010103000000000,1072338,1570010103000000000
3632736,view,1307366,2053013558920217191,computers.notebook,lenovo,248.63,551857167,1569994602000000000,1072339,1569896704000000000
3632737,view,2501614,2053013564003713919,appliances.kitchen.oven,redmond,164.71,306441847,1570073293000000000,1072340,1569893529000000000


In [53]:
import nvtabular as nvt
# Tag product_id as the item-id column.
item_id = ['product_id'] >> nvt.ops.TagAsItemID()

# Send the item id plus the remaining discrete columns through Categorify.
# The parentheses make explicit that `+` binds before `>>`.
cat_feats = (
    item_id
    + ['category_code', 'brand', 'user_id', 'category_id', 'event_type']
) >> nvt.ops.Categorify()

In [62]:
# Request a 64-bit integer explicitly: a bare `int` resolves to the platform
# default integer, which can be 32-bit (e.g. Windows with older NumPy) and
# would overflow on nanosecond epoch values such as 1569900208000000000.
df['event_time_ts'] = df['event_time_ts'].astype('int64')

# The integers are nanoseconds since the Unix epoch; state the unit instead of
# relying on the implicit default so the assumption is visible to readers.
df['event_time_dt'] = pd.to_datetime(df['event_time_ts'], unit='ns')

In [122]:
# `event_time_dt` is added to the DataFrame with pandas before the graph is
# built, so the graph selects it directly instead of converting the raw
# timestamp inside the workflow.
session_time = ['event_time_dt']

# Derive the day of the week from the datetime column.
sessiontime_weekday = (
    session_time
    >> nvt.ops.LambdaOp(lambda col: col.dt.weekday)
    >> nvt.ops.Rename(name='et_dayofweek')
)

In [123]:
def get_cycled_feature_value_sin(col, max_value):
    """Return the sine half of a cyclical encoding of ``col``.

    ``col`` is offset by 1e-6, divided by ``max_value`` to express it as a
    fraction of one full cycle, and mapped to ``sin(2 * pi * fraction)``.
    """
    cycle_fraction = (col + 0.000001) / max_value
    return np.sin(2*np.pi*cycle_fraction)

def get_cycled_feature_value_cos(col, max_value):
    """Return the cosine half of a cyclical encoding of ``col``.

    ``col`` is offset by 1e-6, divided by ``max_value`` to express it as a
    fraction of one full cycle, and mapped to ``cos(2 * pi * fraction)``.
    """
    cycle_fraction = (col + 0.000001) / max_value
    return np.cos(2*np.pi*cycle_fraction)

In [124]:
from nvtabular.ops import Operator
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags

In [125]:
weekday_sin = (sessiontime_weekday >>
               (lambda col: get_cycled_feature_value_sin(col+1, 7)) >>
               nvt.ops.Rename(name = 'et_dayofweek_sin') >>
               nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
              )

weekday_cos= (sessiontime_weekday >>
              (lambda col: get_cycled_feature_value_cos(col+1, 7)) >>
              nvt.ops.Rename(name = 'et_dayofweek_cos') >>
              nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
             )

In [126]:
class ItemRecency(nvt.ops.Operator):
    """Custom NVTabular operator computing an item's age at event time.

    For every selected timestamp column it adds a ``<column>_age_days`` column
    holding the time elapsed since the item's first recorded event
    (``prod_first_event_time_ts``), expressed in days and floored at zero.

    The timestamps in this notebook's data are integer nanoseconds since the
    Unix epoch, so the difference is divided by the number of nanoseconds in a
    day. Dividing by ``60 * 60 * 24`` (seconds per day) would inflate the
    result by a factor of 1e9. Override ``TIMESTAMP_UNITS_PER_DAY`` in a
    subclass if the timestamps use another resolution (e.g. ``60 * 60 * 24``
    for second-resolution timestamps).
    """

    # One day expressed in the unit of the timestamp columns (nanoseconds).
    TIMESTAMP_UNITS_PER_DAY = 24 * 60 * 60 * 10**9

    def transform(self, columns, gdf):
        """Add a ``<column>_age_days`` column for each selected column.

        Args:
            columns: selector whose ``names`` lists the timestamp columns.
            gdf: dataframe holding those columns and
                ``prod_first_event_time_ts``.

        Returns:
            The same dataframe object, with the new columns added to it.
        """
        item_first_timestamp = gdf['prod_first_event_time_ts']
        for column in columns.names:
            elapsed = gdf[column] - item_first_timestamp
            delta_days = elapsed / self.TIMESTAMP_UNITS_PER_DAY
            # Multiplying by the boolean mask zeroes out negative ages
            # (events stamped before the item's first recorded event).
            gdf[column + "_age_days"] = delta_days * (delta_days >= 0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        """Return the parents' selector after validating it against the schema.

        Validation is delegated to ``_validate_matching_cols``; ``selector``
        and ``dependencies_selector`` are not used here.
        """
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        """Map each output name ``<col>_age_days`` to its single input column."""
        return {col_name + "_age_days": [col_name] for col_name in col_selector.names}

    @property
    def dependencies(self):
        """Extra column that ``transform`` reads in addition to the selected ones."""
        return ["prod_first_event_time_ts"]

    @property
    def output_dtype(self):
        """Dtype declared for the generated columns."""
        return np.float64

In [127]:
# Item age at event time, produced by the custom ItemRecency operator.
recency_features = ['event_time_ts'] >> ItemRecency()

# Log-transform, normalize as float32, then rename the result.
recency_features_norm = (
    recency_features
    >> nvt.ops.LogOp()
    >> nvt.ops.Normalize(out_dtype=np.float32)
    >> nvt.ops.Rename(name='product_recency_days_log_norm')
)

In [128]:
# Combine every time-derived branch of the graph: the raw datetime column, the
# weekday, its sine/cosine encodings and the normalized item recency.
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    weekday_cos +
    recency_features_norm
)

In [129]:
# Log-transform the price, normalize it as float32 and rename the output.
price_log = (
    ['price']
    >> nvt.ops.LogOp()
    >> nvt.ops.Normalize(out_dtype=np.float32)
    >> nvt.ops.Rename(name='price_log_norm')
)

In [130]:
def relative_price_to_avg_categ(col, gdf):
    """Return the price's relative difference to the reference value in ``col``.

    Computes ``(price - col) / (col + 1e-5)`` using ``gdf['price']`` and
    multiplies the result by an integer mask that is 1 where ``col > 0`` and
    0 elsewhere, so non-positive reference values yield 0.
    """
    epsilon = 1e-5
    positive_mask = (col > 0).astype(int)
    relative_diff = (gdf['price'] - col) / (col + epsilon)
    return relative_diff * positive_mask

# Mean price per category_id, joined back onto each row and renamed.
avg_category_id_pr = (
    ['category_id']
    >> nvt.ops.JoinGroupby(cont_cols=['price'], stats=["mean"])
    >> nvt.ops.Rename(name='avg_category_id_price')
)

# Relative difference between each row's price and its category average;
# `price` is declared as a dependency so the lambda can read it from the frame.
relative_price_to_avg_category = (
    avg_category_id_pr
    >> nvt.ops.LambdaOp(relative_price_to_avg_categ, dependency=['price'])
    >> nvt.ops.Rename(name="relative_price_to_avg_categ_id")
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

In [131]:
# Everything that feeds the session-level Groupby: the raw sort/group keys plus
# the categorical, time and price feature branches.
groupby_feats = (
    ['event_time_ts', 'user_session']
    + cat_feats
    + time_features
    + price_log
    + relative_price_to_avg_category
)

In [132]:
# Aggregate the interaction rows into one row per user_session, ordered by
# event_time_ts. Output columns are named "<column><name_sep><agg>", e.g.
# "product_id-list" and "product_id-count", as used by the cells below.
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["user_session"],
    sort_cols=["event_time_ts"],
    aggs={
        # Session-level scalars.
        'user_id': ['first'],
        # Item sequence plus its length (used later to filter short sessions).
        'product_id': ["list", "count"],
        'category_code': ["list"],
        'brand': ["list"],
        'category_id': ["list"],
        # First timestamp of the session, as integer and as datetime.
        'event_time_ts': ["first"],
        'event_time_dt': ["first"],
        # Per-interaction continuous features, kept as sequences.
        'et_dayofweek_sin': ["list"],
        'et_dayofweek_cos': ["list"],
        'price_log_norm': ["list"],
        'relative_price_to_avg_categ_id': ["list"],
        'product_recency_days_log_norm': ["list"]
        },
    name_sep="-")

In [133]:
# Select only the list-valued (sequence) outputs of the Groupby; these are the
# columns that get trimmed to a maximum session length afterwards.
groupby_features_list = groupby_features['product_id-list',
        'category_code-list',
        'brand-list',
        'category_id-list',
        'et_dayofweek_sin-list',
        'et_dayofweek_cos-list',
        'price_log_norm-list',
        'relative_price_to_avg_categ_id-list',
        'product_recency_days_log_norm-list']

In [134]:
# Passed (negated) to ListSlice when trimming the per-session sequences.
SESSIONS_MAX_LENGTH = 20
# Threshold for the Filter predicate: "product_id-count" >= this value.
MINIMUM_SESSION_LENGTH = 2

In [135]:
# Slice every sequence column with a negative start of SESSIONS_MAX_LENGTH and
# padding enabled.
# NOTE(review): presumably this keeps the last SESSIONS_MAX_LENGTH items of each
# session and pads shorter ones — confirm against the installed ListSlice version.
groupby_features_trim = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH, pad=True)

In [136]:
# 1-based day number of each session, counted from the earliest session start
# seen by the lambda, exposed as a categorical column named "day_index".
day_index = (
    groupby_features['event_time_dt-first']
    >> nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days + 1)
    >> nvt.ops.Rename(f=lambda col: "day_index")
    >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
)

In [137]:
# Session identifier, tagged as categorical.
sess_id = groupby_features['user_session'] >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])

# Final column set: session id, session length, trimmed sequences and day index.
selected_features = sess_id + groupby_features['product_id-count'] + groupby_features_trim + day_index

In [138]:
# Filter on session length: the predicate is true for sessions with at least
# MINIMUM_SESSION_LENGTH interactions.
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["product_id-count"] >= MINIMUM_SESSION_LENGTH)

In [140]:
import os

In [141]:
# Build the workflow from the final graph node, wrap the pandas DataFrame in a
# Dataset, then fit + transform and write the result as Parquet.
# NOTE(review): os.path.join with a single argument returns it unchanged, so the
# output directory is simply "processed_nvt" relative to the working directory.
# NOTE(review): the recorded run of this cell ended with
# "TypeError: object of type 'NoneType' has no len()"; the saved traceback is
# truncated, so the root cause cannot be determined from the notebook alone.
workflow = nvt.Workflow(filtered_sessions)
dataset = nvt.Dataset(df)
workflow.fit_transform(dataset).to_parquet(os.path.join("processed_nvt"))

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-141-3a6587d3bddb>", line 3, in <cell line: 3>
    workflow.fit_transform(dataset).to_parquet(os.path.join("processed_nvt"))
  File "/usr/local/lib/python3.10/dist-packages/merlin/io/dataset.py", line 982, in to_parquet
    _ddf_to_dataset(
  File "/usr/local/lib/python3.10/dist-packages/merlin/io/dask.py", line 420, in _ddf_to_dataset
    out = dask.compute(out, scheduler="synchronous")[0]
  File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 628, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py", line 102, in transform
    transformed_data = self._execute_node(node, transformable, capture_dtypes, strict)
  File "/usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py", lin

TypeError: object of type 'NoneType' has no len()