# ETL with NVTabular

* following tutotial [here](https://github.com/NVIDIA-Merlin/Transformers4Rec/blob/main/examples/end-to-end-session-based/01-ETL-with-NVTabular.ipynb)

In [None]:
# !gsutil cp gs://kaggle-yoochoose-data/yoochoose-clicks.dat ./data

In [3]:
import os
import glob

import numpy as np
import pandas as pd
import gc

import cudf
import cupy

import nvtabular as nvt
from nvtabular.ops import *
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags

In [4]:
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
PROJECT_NUM = !gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)"
PROJECT_NUM = PROJECT_NUM[0]
LOCATION = 'us-central1'
REGION = "us-central1"

# VERTEX_SA = '934903580331-compute@developer.gserviceaccount.com'
VERTEX_SA = 'jt-vertex-sa@hybrid-vertex.iam.gserviceaccount.com'

print(f"PROJECT_ID: {PROJECT_ID}")
print(f"PROJECT_NUM: {PROJECT_NUM}")
print(f"LOCATION: {LOCATION}")
print(f"REGION: {REGION}")
print(f"VERTEX_SA: {VERTEX_SA}")

PROJECT_ID: hybrid-vertex
PROJECT_NUM: 934903580331
LOCATION: us-central1
REGION: us-central1
VERTEX_SA: jt-vertex-sa@hybrid-vertex.iam.gserviceaccount.com


In [35]:
DATA_FOLDER='data'
FILENAME_PATTERN = 'yoochoose-clicks.dat'
DATA_PATH=f'{DATA_FOLDER}/{FILENAME_PATTERN}'

WORKFLOW_DIR='workflows'

TRANSFORMED_WORKFLOW=f'{WORKFLOW_DIR}/processed_nvt'
OUTPUT_WORKFLOW_DIR=f'{WORKFLOW_DIR}/workflow_etl'

OUTPUT_DIR=f'{DATA_FOLDER}/sessions_by_day'
OVERWRITE = False

print(f"DATA_FOLDER: {DATA_FOLDER}")
print(f"DATA_PATH: {DATA_PATH}")
print(f"TRANSFORMED_WORKFLOW: {TRANSFORMED_WORKFLOW}")
print(f"OUTPUT_WORKFLOW_DIR: {OUTPUT_WORKFLOW_DIR}")
print(f"OUTPUT_DIR: {OUTPUT_DIR}")

DATA_FOLDER: data
DATA_PATH: data/yoochoose-clicks.dat
TRANSFORMED_WORKFLOW: workflows/processed_nvt
OUTPUT_WORKFLOW_DIR: workflows/workflow_etl
OUTPUT_DIR: data/sessions_by_day


In [36]:
# ! rm -rf {WORKFLOW_DIR}
# ! rm -rf {TRANSFORMED_WORKFLOW}
# ! rm -rf {OUTPUT_WORKFLOW_DIR}
# ! rm -rf {OUTPUT_FOLDER}

! mkdir {WORKFLOW_DIR}
! mkdir {OUTPUT_DIR}
! mkdir {TRANSFORMED_WORKFLOW}
! mkdir {OUTPUT_WORKFLOW_DIR}

mkdir: cannot create directory ‘data/sessions_by_day’: File exists


In [58]:
# !ls
! rm -rf {OUTPUT_DIR}
! mkdir {OUTPUT_DIR}

## Load and clean raw data

In [37]:
interactions_df = cudf.read_csv(
    DATA_PATH, 
    sep=',', 
    names=['session_id','timestamp', 'item_id', 'category'], 
    dtype=['int', 'datetime64[s]', 'int', 'int']
)

In [38]:
interactions_df.head()

Unnamed: 0,session_id,timestamp,item_id,category
0,1,2014-04-07 10:51:09,214536502,0
1,1,2014-04-07 10:54:09,214536500,0
2,1,2014-04-07 10:54:46,214536506,0
3,1,2014-04-07 10:57:00,214577561,0
4,2,2014-04-07 13:56:37,214662742,0


In [39]:
print("Count with in-session repeated interactions: {}".format(len(interactions_df)))

# Sorts the dataframe by session and timestamp, to remove consecutive repetitions
interactions_df.timestamp = interactions_df.timestamp.astype(int)
interactions_df = interactions_df.sort_values(['session_id', 'timestamp'])
past_ids = interactions_df['item_id'].shift(1).fillna()
session_past_ids = interactions_df['session_id'].shift(1).fillna()

# Keeping only no consecutive repeated in session interactions
interactions_df = interactions_df[~((interactions_df['session_id'] == session_past_ids) & (interactions_df['item_id'] == past_ids))]

print("Count after removed in-session repeated interactions: {}".format(len(interactions_df)))

Count with in-session repeated interactions: 33003944
Count after removed in-session repeated interactions: 28971543


In [40]:
items_first_ts_df = interactions_df.groupby('item_id').agg({'timestamp': 'min'}).reset_index().rename(columns={'timestamp': 'itemid_ts_first'})

interactions_merged_df = interactions_df.merge(items_first_ts_df, on=['item_id'], how='left')

interactions_merged_df.head()

Unnamed: 0,session_id,timestamp,item_id,category,itemid_ts_first
0,10698,1396387353,214717007,0,1396321524
1,10698,1396387465,214716928,0,1396321848
2,10698,1396388002,214717007,0,1396321524
3,10698,1396389055,214717003,0,1396322598
4,10698,1396389088,214716926,0,1396321393


In [14]:
# interactions_merged_df.to_parquet(f'{DATA_FOLDER}/interactions_merged_df.parquet')

In [41]:
# free gpu memory
del interactions_df, session_past_ids, items_first_ts_df
gc.collect()

19

## Define a preprocessing workflow with NVTabular

In [42]:
# Encodes categorical features as contiguous integers
cat_feats = ColumnSelector(['session_id', 'category', 'item_id']) >> nvt.ops.Categorify(start_index=1)

# create time features
session_ts = ColumnSelector(['timestamp'])
session_time = (
    session_ts >> 
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >> 
    nvt.ops.Rename(name = 'event_time_dt')
)
sessiontime_weekday = (
    session_time >> 
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >> 
    nvt.ops.Rename(name ='et_dayofweek')
)

# Derive cyclical features: Define a custom lambda function 
def get_cycled_feature_value_sin(col, max_value):
    value_scaled = (col + 0.000001) / max_value
    value_sin = np.sin(2*np.pi*value_scaled)
    return value_sin

weekday_sin = sessiontime_weekday >> (lambda col: get_cycled_feature_value_sin(col+1, 7)) >> nvt.ops.Rename(name = 'et_dayofweek_sin')

# Compute Item recency: Define a custom Op 
class ItemRecency(nvt.ops.Operator):
    def transform(self, columns, gdf):
        for column in columns.names:
            col = gdf[column]
            item_first_timestamp = gdf['itemid_ts_first']
            delta_days = (col - item_first_timestamp) / (60*60*24)
            gdf[column + "_age_days"] = delta_days * (delta_days >=0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        column_mapping = {}
        for col_name in col_selector.names:
            column_mapping[col_name + "_age_days"] = [col_name]
        return column_mapping

    @property
    def dependencies(self):
        return ["itemid_ts_first"]

    @property
    def output_dtype(self):
        return np.float64
    
recency_features = session_ts >> ItemRecency() 
# Apply standardization to this continuous feature
recency_features_norm = recency_features >> nvt.ops.LogOp() >> nvt.ops.Normalize(out_dtype=np.float32) >> nvt.ops.Rename(name='product_recency_days_log_norm')

time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin + 
    recency_features_norm
)

features = ColumnSelector(['timestamp', 'session_id']) + cat_feats + time_features 

### Define the preprocessing of sequential features

Once the item features are generated, the objective of this cell is to group interactions at the session level, sorting the interactions by time. We additionally truncate all sessions to first 20 interactions and filter out sessions with less than 2 interactions.

In [43]:
# Define Groupby Operator
groupby_features = features >> nvt.ops.Groupby(
    groupby_cols=["session_id"], 
    sort_cols=["timestamp"],
    aggs={
        'item_id': ["list", "count"],
        'category': ["list"],  
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'product_recency_days_log_norm': ["list"]
        },
    name_sep="-") >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])


# Truncate sequence features to first interacted 20 items 
SESSIONS_MAX_LENGTH = 20 

groupby_features_list = groupby_features['item_id-list', 'category-list', 'et_dayofweek_sin-list', 'product_recency_days_log_norm-list']
groupby_features_truncated = groupby_features_list >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH, pad=True) >> nvt.ops.Rename(postfix = '_seq')

# Calculate session day index based on 'event_time_dt-first' column
day_index = ((groupby_features['event_time_dt-first'])  >> 
    nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >> 
    nvt.ops.Rename(f = lambda col: "day_index")
)

# Select features for training 
selected_features = groupby_features['session_id', 'item_id-count'] + groupby_features_truncated + day_index

# Filter out sessions with less than 2 interactions 
MINIMUM_SESSION_LENGTH = 2
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH) 

Avoid Numba low occupancy warnings:

In [44]:
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

### Execute NVTabular workflow

* Once we have defined the general workflow (`filtered_sessions`), we provide our cudf dataset to `nvt.Dataset` class which is optimized to split data into chunks that can fit in device memory and to handle the calculation of complex global statistics 
* Then, we execute the pipeline that fits and transforms data to get the desired output features.

In [45]:
dataset = nvt.Dataset(interactions_merged_df)
workflow = nvt.Workflow(filtered_sessions)
# Learn features statistics necessary of the preprocessing workflow
workflow.fit(dataset)
# Apply the preprocessing workflow in the dataset and convert the resulting Dask cudf dataframe to a cudf dataframe
sessions_gdf = workflow.transform(dataset).compute()

In [46]:
sessions_gdf.head()

Unnamed: 0,session_id,item_id-count,item_id-list_seq,category-list_seq,et_dayofweek_sin-list_seq,product_recency_days_log_norm-list_seq,day_index
0,2,200,"[2223, 2125, 1800, 123, 3030, 1861, 1076, 1285...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[1.1285199e-06, 1.1285199e-06, 1.1285199e-06, ...","[-1.1126341, -0.9665389, -0.1350116, -0.127809...",27
1,3,200,"[34959, 24004, 32503, 39480, 28132, 47339, 351...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[0.43388295, 0.43388295, 0.43388295, 0.4338829...","[0.3110803, 0.475488, -3.0278225, -3.0278225, ...",58
2,4,200,"[23212, 30448, 16468, 2052, 22490, 31097, 6243...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[0.9749277, 0.9749277, 0.9749277, 0.9749277, 0...","[0.6801631, 0.7174695, 0.7185285, 0.7204116, 0...",71
3,5,200,"[230, 451, 732, 1268, 2014, 567, 497, 439, 338...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, ...","[0.43388295, 0.43388295, 0.43388295, 0.4338829...","[1.3680888, -0.6530481, -0.69314253, -0.590593...",149
4,6,200,"[23, 70, 160, 70, 90, 742, 851, 359, 734, 878,...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[0.43388295, 0.43388295, 0.43388295, 0.4338829...","[1.3714824, 1.3715883, 1.3715737, 1.3715955, 1...",149


In [47]:
workflow.fit_transform(dataset).to_parquet(TRANSFORMED_WORKFLOW)

In [48]:
!ls $TRANSFORMED_WORKFLOW

_file_list.txt	_metadata  _metadata.json  part_0.parquet  schema.pbtxt


In [49]:
# TRANSFORMED_WORKFLOW='workflow_etl'

workflow.save(OUTPUT_WORKFLOW_DIR)

!ls $OUTPUT_WORKFLOW_DIR

categories  metadata.json  workflow.pkl


In [50]:
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension
0,session_id,(Tags.CATEGORICAL),int64,False,False,,0.0,0.0,1.0,.//categories/unique.session_id.parquet,0.0,9249730.0,session_id,9249731.0,512.0
1,item_id-count,(Tags.CATEGORICAL),int32,False,False,,0.0,0.0,1.0,.//categories/unique.item_id.parquet,0.0,52740.0,item_id,52741.0,512.0
2,item_id-list_seq,"(Tags.CATEGORICAL, Tags.LIST)",int64,True,False,,0.0,0.0,1.0,.//categories/unique.item_id.parquet,0.0,52740.0,item_id,52741.0,512.0
3,category-list_seq,"(Tags.CATEGORICAL, Tags.LIST)",int64,True,False,,0.0,0.0,1.0,.//categories/unique.category.parquet,0.0,335.0,category,336.0,42.0
4,et_dayofweek_sin-list_seq,"(Tags.CATEGORICAL, Tags.LIST)",float32,True,False,,,,,,,,,,
5,product_recency_days_log_norm-list_seq,"(Tags.CATEGORICAL, Tags.LIST)",float32,True,False,,,,,,,,,,
6,day_index,(Tags.CATEGORICAL),int64,False,False,,,,,,,,,,


## Export pre-processed data by day

In [59]:
sessions_gdf = sessions_gdf[sessions_gdf.day_index>=178]
sessions_gdf.shape

(106246, 7)

In [60]:
from transformers4rec.data.preprocessing import save_time_based_splits

save_time_based_splits(
    data=nvt.Dataset(sessions_gdf),
    output_dir=OUTPUT_DIR, # TODO: parameterize
    partition_col='day_index',
    timestamp_col='session_id', 
)

Creating time-based splits: 100% 5/5 [00:00<00:00,  8.47it/s]


In [61]:
def list_files(startpath):
    """
    Util function to print the nested structure of a directory
    """
    for root, dirs, files in os.walk(startpath):
        level = root.replace(startpath, "").count(os.sep)
        indent = " " * 4 * (level)
        print("{}{}/".format(indent, os.path.basename(root)))
        subindent = " " * 4 * (level + 1)
        for f in files:
            print("{}{}".format(subindent, f))

In [62]:
list_files(OUTPUT_DIR)

sessions_by_day/
    179/
        valid.parquet
        train.parquet
        test.parquet
    182/
        valid.parquet
        train.parquet
        test.parquet
    180/
        valid.parquet
        train.parquet
        test.parquet
    181/
        valid.parquet
        train.parquet
        test.parquet
    178/
        valid.parquet
        train.parquet
        test.parquet


In [30]:
# free gpu memory
del  sessions_gdf
gc.collect()

579

## Checking the preprocessed outputs

In [63]:
TEST_DATA = f'{OUTPUT_DIR}/178/test.parquet'
print(f"TEST_DATA: {TEST_DATA}")

TEST_DATA: data/sessions_by_day/178/test.parquet


In [64]:
df = pd.read_parquet(TEST_DATA)
df

Unnamed: 0,session_id,item_id-count,item_id-list_seq,category-list_seq,et_dayofweek_sin-list_seq,product_recency_days_log_norm-list_seq
3,1281,71,"[3052, 2705, 1216, 1467, 4913, 4705, 8798, 132...","[2, 5, 2, 2, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 2, ...","[-0.43388462, -0.43388462, -0.43388462, -0.433...","[-0.6125705, 1.5240668, -0.5935936, 1.5236577,..."
14,1959,63,"[804, 1342, 836, 202, 2848, 10318, 9488, 10318...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[-0.43388462, -0.43388462, -0.43388462, -0.433...","[-0.7107696, -0.55946445, 1.5232942, 1.5224572..."
21,2845,56,"[2586, 429, 1945, 2365, 51, 423, 650, 11850, 1...","[5, 2, 2, 2, 2, 2, 2, 3, 3, 14, 2, 2, 2, 2, 13...","[-0.43388462, -0.43388462, -0.43388462, -0.433...","[1.5229999, -0.5597885, 1.5228726, 1.5231084, ..."
30,4621,48,"[425, 293, 2816, 736, 1889, 293, 1889, 1625, 2...","[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, ...","[-0.43388462, -0.43388462, -0.43388462, -0.433...","[1.5255986, 1.4784576, 1.5253938, 1.525775, -1..."
39,5990,44,"[597, 713, 338, 620, 1219, 4284, 12239, 14743,...","[2, 2, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 4, 4, 4, ...","[-0.43388462, -0.43388462, -0.43388462, -0.433...","[-0.5477065, -0.2365508, 0.42582276, -0.943305..."
...,...,...,...,...,...,...
35550,6756357,2,"[855, 33, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[-0.43388462, -0.43388462, 0.0, 0.0, 0.0, 0.0,...","[-0.63608223, -0.53757286, 0.0, 0.0, 0.0, 0.0,..."
35553,6756365,2,"[814, 1806, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0...","[2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[-0.43388462, -0.43388462, 0.0, 0.0, 0.0, 0.0,...","[1.5238583, 1.5231452, 0.0, 0.0, 0.0, 0.0, 0.0..."
35572,6756450,2,"[166, 1540, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0...","[2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[-0.43388462, -0.43388462, 0.0, 0.0, 0.0, 0.0,...","[-0.18613431, 1.5255576, 0.0, 0.0, 0.0, 0.0, 0..."
35575,6756457,2,"[1950, 2098, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[-0.43388462, -0.43388462, 0.0, 0.0, 0.0, 0.0,...","[-0.4119286, -0.2083148, 0.0, 0.0, 0.0, 0.0, 0..."


In [65]:
import gc
del df
gc.collect()

2541