# Preparing Rees46 eCommerce dataset

## Downloading raw datasets

The dataset description is available at
* https://rees46.com/en/datasets
* https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store

More data is available at
* https://drive.google.com/drive/folders/1Nan8X33H8xrXS5XhCKZmSpClFTCJsSpE

In [1]:
from utils import DATASETS_ROOT_DIR
# Directory where those datasets will be downloaded:
DATASETS_ROOT_DIR

2022-10-23 21:01:45.850093: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/cuda/lib:/usr/local/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:
2022-10-23 21:01:45.850173: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


'/home/jupyter/recsys-multi-atrribute-benchmark/datasets'

## Manually download archives from Google Drive to `$DATASETS_ROOT_DIR/raw`
For now, the data must be downloaded manually from https://drive.google.com/drive/folders/1Nan8X33H8xrXS5XhCKZmSpClFTCJsSpE to `$DATASETS_ROOT_DIR/raw` (the cells below read the archives from `raw/rees_ecommerce`).

TODO: automate this step.
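Until the download is automated, a small check can confirm that the expected archives are in place before the long-running cells start. The sketch below is only an illustration: `missing_archives` is a hypothetical helper (it is not part of `utils`), and the expected file names are the ones that appear in the directory listing in this notebook.

```python
import os
import tempfile

# File names as listed in this notebook; adjust if the Drive folder changes.
EXPECTED_ARCHIVES = [
    '2019-Dec.csv.gz', '2020-Jan.csv.gz', '2020-Feb.csv.gz',
    '2020-Mar.csv.gz', '2020-Apr.csv.gz',
]


def missing_archives(raw_dir, expected=EXPECTED_ARCHIVES):
    """Return the expected archive names that are absent from raw_dir (hypothetical helper)."""
    present = set(os.listdir(raw_dir)) if os.path.isdir(raw_dir) else set()
    return [name for name in expected if name not in present]


# Demo on a temporary directory that holds only the first two archives.
with tempfile.TemporaryDirectory() as demo_dir:
    for name in EXPECTED_ARCHIVES[:2]:
        open(os.path.join(demo_dir, name), 'w').close()
    demo_missing = missing_archives(demo_dir)
```

In the notebook, one would call `missing_archives(os.path.join(DATASETS_ROOT_DIR, 'raw/rees_ecommerce'))` and stop if the result is non-empty.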

In [2]:
import glob
import os

list(map(os.path.basename, glob.glob(os.path.join(DATASETS_ROOT_DIR, 'raw/rees_ecommerce/*'))))

['2020-Mar.csv.gz',
 '2020-Apr.csv.gz',
 '2020-Feb.csv.gz',
 '2020-Jan.csv.gz',
 '2019-Dec.csv.gz']

## Loading big dataframe in chunks
To keep RAM consumption reasonable, this notebook loads only the purchase and add-to-cart event types and skips pageviews.

We also replace null values with empty strings right away and reduce timestamps to day resolution.

In [3]:
%%time

import pandas as pd
import numpy as np

_chunks = []
for filename in glob.glob(os.path.join(DATASETS_ROOT_DIR, 'raw/rees_ecommerce/*')):
    with pd.read_csv(filename, chunksize=10 ** 7) as reader:
        for chunk in reader:
            # Setting date resolution to days
            chunk['date'] = chunk['event_time'].str[:10].astype('datetime64[D]')
            # one can subsample pageviews by day
            # counts = chunk.groupby(['event_type', 'user_id', 'date'])['user_id'].transform('count')
            # keep_lines = ((chunk['event_type'] != 'view') | (np.random.uniform(size=len(chunk)) < 5. / counts))
            # here we just filter them out
            keep_lines = (chunk['event_type'] != 'view')
            _chunks.append(chunk[keep_lines].fillna('').drop(['user_session', 'event_time'], axis=1))

del chunk
events = pd.concat(_chunks)
del _chunks
events = events.sort_values('date', ascending=True)

CPU times: user 19min 36s, sys: 1min 11s, total: 20min 48s
Wall time: 20min 54s
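The same filter-while-reading pattern can be tried on a few rows before committing to the full 20-minute load. This is a minimal sketch on made-up data: the toy CSV only mimics some of the real columns, and `pd.to_datetime` is used for the day resolution here as an assumption of convenience, not as the notebook's own conversion.

```python
import io

import pandas as pd

# Toy CSV mimicking a subset of the real columns (all values are made up).
toy_csv = io.StringIO(
    "event_time,event_type,product_id,brand,user_id,user_session\n"
    "2020-01-01 10:00:00 UTC,view,1,acme,10,s1\n"
    "2020-01-01 11:00:00 UTC,cart,1,acme,10,s1\n"
    "2020-01-02 09:30:00 UTC,purchase,1,,10,s2\n"
    "2020-01-02 09:45:00 UTC,view,2,zeta,11,s3\n"
)

kept = []
with pd.read_csv(toy_csv, chunksize=2) as reader:
    for chunk in reader:
        # Drop pageviews first, then replace nulls by empty strings.
        kept_chunk = chunk[chunk["event_type"] != "view"].fillna("")
        # Keep only the YYYY-MM-DD prefix of the timestamp (day resolution).
        kept_chunk = kept_chunk.assign(date=pd.to_datetime(kept_chunk["event_time"].str[:10]))
        kept.append(kept_chunk.drop(["user_session", "event_time"], axis=1))

toy_events = pd.concat(kept).sort_values("date", ascending=True)
```

Only one chunk is held in memory at a time, and only the filtered rows are accumulated, which is what keeps the peak RAM usage well below the size of the raw files.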


In [4]:
events['event_type'].value_counts(dropna=False)

cart        15158617
purchase     5189036
Name: event_type, dtype: int64

We will construct our training dataset from implicit feedback given by purchases, so we restrict all tables to users with at least one purchase.

In [5]:
users_with_purchases = events[events['event_type'] == 'purchase']['user_id'].unique()
events = events[np.isin(events['user_id'], users_with_purchases)]

In [6]:
events['event_type'].value_counts(dropna=False)

cart        11786602
purchase     5189036
Name: event_type, dtype: int64
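The effect of this restriction is easy to check on a toy example. The arrays below are made up for illustration; the logic mirrors the `np.isin` filter used above.

```python
import numpy as np

# Toy events: user 2 only ever adds to cart and never purchases.
user_id = np.array([1, 1, 2, 3, 3, 3])
event_type = np.array(['cart', 'purchase', 'cart', 'cart', 'cart', 'purchase'])

# Users with at least one purchase.
buyers = np.unique(user_id[event_type == 'purchase'])
# Keep every event (cart or purchase) made by those users.
keep = np.isin(user_id, buyers)
kept_users = user_id[keep]
```

All purchases survive the filter by construction, which is why the `purchase` count is unchanged while the `cart` count drops.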

## Choosing features

We will be using the following features:
* `product_id`
* `category_code`
* `brand`
* product's `price`

At this point they contain no nulls (nulls were replaced by empty strings):

In [7]:
events[['product_id', 'category_code', 'brand', 'price']].isnull().sum()

product_id       0
category_code    0
brand            0
price            0
dtype: int64

## Feature engineering

Here we do some simple feature engineering on product attributes:
* parsing `category_code` into 4 levels of the category hierarchy
* clustering `price` into 200 clusters with k-means

In [8]:
from toolz import memoize

@memoize
def define_categories(label):
    """
    Parsing category_code labels into 4 levels of categories' hierarchy
    """
    if label == '':
        return '', '', '', ''
    labels = tuple(label.split('.'))
    if len(labels) > 4:
        raise NotImplementedError()
    return labels + ('',) * (4 - len(labels))

events['category1'], events['category2'], events['category3'], events['category4'] = \
    zip(*events['category_code'].map(define_categories))

`category4` is almost always empty (only `piano` appears), so we won't use that feature.

In [9]:
events['category4'].value_counts(dropna=False)

         16962296
piano       13342
Name: category4, dtype: int64

In [10]:
events['category3'].value_counts(dropna=False)[:10]

light            7135027
                 4592235
massager          606955
headphone         432323
refrigerators     380170
vacuum            324322
washer            265951
printer           200670
sandals           180985
slipons           169627
Name: category3, dtype: int64

In [11]:
%%time
from sklearn.cluster import KMeans

kmeans_price = KMeans(n_clusters=200, random_state=42)
kmeans_price.fit(events['price'].sample(5 * 10 ** 5, random_state=43).values.reshape(-1, 1))
cluster_labels = np.round(kmeans_price.cluster_centers_[:, 0], 0).astype(int)
assert len(np.unique(cluster_labels)) == 200
events['priceCluster'] = cluster_labels[kmeans_price.predict(events['price'].values.reshape(-1, 1))]

CPU times: user 2min 11s, sys: 2.65 s, total: 2min 14s
Wall time: 2min 2s
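The price feature produced by this cell is the rounded center of the nearest cluster, not an arbitrary cluster id. The NumPy sketch below shows the same mapping with three made-up centers standing in for `kmeans_price.cluster_centers_`; the nearest-center assignment is written out by hand instead of calling `KMeans.predict`.

```python
import numpy as np

# Hypothetical cluster centers, standing in for kmeans_price.cluster_centers_[:, 0].
centers = np.array([1.2, 9.7, 49.6])

# Human-readable labels: each center rounded to an integer price.
cluster_labels = np.round(centers, 0).astype(int)
# Rounding must not merge two clusters into the same label.
assert len(np.unique(cluster_labels)) == len(centers)

prices = np.array([0.5, 8.0, 30.0, 100.0])
# Index of the closest center for every price (1-D nearest-center assignment).
nearest = np.abs(prices[:, None] - centers[None, :]).argmin(axis=1)
price_cluster = cluster_labels[nearest]
```

The `assert` on unique labels in the notebook cell guards against the case where two nearby centers round to the same integer, which would silently merge clusters.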


## Encoding features and converting them into `tf.Tensor`

Now let's encode categorical features as ordinal labels using `tf.keras.layers.StringLookup` and transform our dataset into a dictionary with one `tf.Tensor` per column. The result is kept in `tf_tensors`.

We also keep track of the inverse transformation, in case we want to see the value corresponding to some label. It is stored in the `inverse_lookups` dictionary.

In [12]:
%%time

from utils import get_tensorflow_dataset

item_features = ['product_id', 'category1', 'category2', 'category3', 'brand', 'priceCluster']

tf_tensors, inverse_lookups = get_tensorflow_dataset(events, item_features,
                                                     date_column='date', keep_columns=['user_id', 'event_type'])

2022-10-23 21:28:47.940226: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/cuda/lib:/usr/local/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:
2022-10-23 21:28:47.982594: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-10-23 21:28:48.005438: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (fb5e7a735510): /proc/driver/nvidia/version does not exist


Encoding product_id column
Reserving labels for 28113 categories out of 180632
Encoding category1 column
Reserving labels for 14 categories out of 14
Encoding category2 column
Reserving labels for 61 categories out of 63
Encoding category3 column
Reserving labels for 91 categories out of 92
Encoding brand column
Reserving labels for 2562 categories out of 5015
Encoding priceCluster column
Reserving labels for 200 categories out of 200
CPU times: user 1min 8s, sys: 2.13 s, total: 1min 10s
Wall time: 1min 11s


2022-10-23 21:28:48.303043: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
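The log above shows that not every distinct value gets its own label (for example 28113 out of 180632 products). How `get_tensorflow_dataset` selects the retained values is not shown here, so the plain-Python sketch below only illustrates the general idea under an assumed frequency threshold: `min_count` and the helper names are hypothetical, and index 0 is reserved for out-of-vocabulary values, as `StringLookup` does by default.

```python
from collections import Counter


def build_lookup(values, min_count=2, oov_token='[UNK]'):
    """Build a value -> label mapping; values seen fewer than min_count times share label 0."""
    counts = Counter(values)
    vocab = [oov_token] + sorted(v for v, c in counts.items() if c >= min_count)
    index = {value: label for label, value in enumerate(vocab)}
    return index, vocab


def encode(values, index):
    # Unknown or rare values fall back to the out-of-vocabulary label 0.
    return [index.get(value, 0) for value in values]


def decode(labels, vocab):
    # Inverse lookup: label -> original value (or the OOV token).
    return [vocab[label] for label in labels]


brands = ['acme', 'acme', 'zeta', 'acme', 'bolt', 'bolt']
index, vocab = build_lookup(brands)
encoded = encode(brands, index)
decoded = decode(encoded, vocab)
```

The inverse mapping is lossy for rare values: they all decode to the out-of-vocabulary token.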


In [13]:
from utils import gather_structure
gather_structure(tf_tensors, [0, 1])

{'date': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([18231, 18231], dtype=int32)>,
 'user_id': <tf.Tensor: shape=(2,), dtype=int64, numpy=array([579849385, 560376695])>,
 'event_type': <tf.Tensor: shape=(2,), dtype=string, numpy=array([b'cart', b'purchase'], dtype=object)>,
 'product_id': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([3390,   19], dtype=int32)>,
 'category1': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([10, 10], dtype=int32)>,
 'category2': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([10, 10], dtype=int32)>,
 'category3': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([10, 10], dtype=int32)>,
 'brand': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([103,  11], dtype=int32)>,
 'priceCluster': <tf.Tensor: shape=(2,), dtype=int32, numpy=array([ 61, 165], dtype=int32)>}

In [14]:
inverse_lookups['category2']

<keras.layers.preprocessing.string_lookup.StringLookup at 0x7fa64f87f110>

In [15]:
inverse_lookups['category2'](tf_tensors['category2'][:2])

<tf.Tensor: shape=(2,), dtype=string, numpy=array([b'tools', b'tools'], dtype=object)>

We save the inverse lookups to disk so that encoded labels can be mapped back to readable values later if needed:

In [16]:
from utils import save_inverse_lookups

save_inverse_lookups(inverse_lookups, os.path.join(DATASETS_ROOT_DIR, 'rees_ecommerce/inverse_lookups.pickle'))

In [17]:
del inverse_lookups

## Changing format to event sequences by user

From this point on we no longer need the `pd.DataFrame` and will work with TensorFlow objects.

In [18]:
del events

Then, for each user and for each event type (here `purchase` and `cart`, since pageviews were filtered out), let's define the sequence of corresponding events.

In [19]:
import tensorflow as tf

from utils import get_user_sequences, boolean_mask_structure

tf_tensors = {key.decode(): boolean_mask_structure(tf_tensors, tf_tensors['event_type'] == key)
              for key in tf.unique(tf_tensors['event_type'])[0].numpy()}
tf_tensors = get_user_sequences(tf_tensors, 'purchase', 'user_id')

for tensors_dict in tf_tensors.values():
    # event_type is no longer needed: it is now encoded in the dict keys
    del tensors_dict['event_type']

Our dict now contains an additional technical key, `_user_index`, which encodes which events correspond to which user. To get sequences of events, one can simply gather a feature's values using this index:

In [20]:
type(tf_tensors['purchase']['_user_index'])

tensorflow.python.ops.ragged.ragged_tensor.RaggedTensor

The first dimension of this tensor corresponds to unique users, the second to the row numbers of the events for a given user. The bounding shape therefore gives the number of unique users and the maximal number of purchases made by a single user in the dataset:

In [21]:
tf_tensors['purchase']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([1635044,    1978], dtype=int32)>

We use the same user indexing for all event types; the indexing is based on purchases:

In [22]:
tf_tensors['cart']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([1635044,    2187], dtype=int32)>

Thus we can have empty sequences:

In [23]:
tf.reduce_any(tf_tensors['cart']['_user_index'].row_lengths() == 0)

<tf.Tensor: shape=(), dtype=bool, numpy=True>

Each event type has its own local indices starting from 0 (this property will be kept for each batch):

In [24]:
tf_tensors['purchase']['_user_index'][:3, :5]

<tf.RaggedTensor [[0, 203, 148584, 627030, 696926], [1, 7751, 9146, 9192, 9353], [2]]>

In [25]:
tf_tensors['cart']['_user_index'][:3, :5]

<tf.RaggedTensor [[83, 267, 434, 831, 332907], [74, 18608, 21758, 21803, 21841],
 [159, 633, 3067810]]>

To get all categories added to the cart by each user, it is enough to gather on the corresponding tensor. In this example we limit the output to 3 users and 5 events per user:

In [26]:
tf.gather(tf_tensors['cart']['category2'], tf_tensors['cart']['_user_index'][:3, :5])

<tf.RaggedTensor [[10, 10, 10, 10, 24], [21, 10, 24, 24, 13], [12, 12, 10]]>
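The relationship between the flat feature tensors and the ragged `_user_index` can be reproduced with plain Python lists. This is a sketch on made-up values, not the implementation of `get_user_sequences`: each row of the index lists the positions of one user's events, and gathering a feature with it yields one variable-length sequence per user.

```python
from collections import defaultdict

# One entry per cart event (toy values): the user who made it and an encoded category.
event_user = [7, 9, 7, 7, 9]
event_category = [10, 21, 10, 24, 13]
# Fixed user order shared by all event types; user 8 has no cart events.
users = [7, 9, 8]

# Collect the row numbers of each user's events.
rows_by_user = defaultdict(list)
for row, user in enumerate(event_user):
    rows_by_user[user].append(row)

# Ragged index: one (possibly empty) list of row numbers per user.
user_index = [rows_by_user.get(user, []) for user in users]
# Equivalent of gathering a flat feature with the ragged index.
categories_by_user = [[event_category[row] for row in rows] for rows in user_index]
```

Because the user order is fixed across event types, row `i` of the cart index and row `i` of the purchase index always refer to the same user, even when one of them is empty.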

## Split into train/val/test

In [37]:
import importlib
import sys
importlib.reload(sys.modules['utils'])

<module 'utils' from '/home/jupyter/recsys-multi-atrribute-benchmark/dataset_preprocessing/utils.py'>

In [38]:
from functools import partial
from toolz import valmap

from utils import restrict_to_user_index_subset

permutation = tf.random.shuffle(tf.range(tf_tensors['purchase']['_user_index'].shape[0]), seed=1729)
tensors = {
    'test': valmap(partial(restrict_to_user_index_subset, indices=permutation[:200000]), tf_tensors),
    'val': valmap(partial(restrict_to_user_index_subset, indices=permutation[200000:400000]), tf_tensors),
    'train': valmap(partial(restrict_to_user_index_subset, indices=permutation[400000:]), tf_tensors)
}

del tf_tensors

CPU times: user 9.55 s, sys: 398 ms, total: 9.95 s
Wall time: 4.72 s


In [56]:
tensors['test']['cart']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([200000,   1074], dtype=int32)>

In [57]:
tensors['train']['cart']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([1235044,    2187], dtype=int32)>
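The split is done at the user level: a single shuffled permutation of user positions is cut into three disjoint slices, so no user contributes events to more than one split. A standard-library sketch of the same idea follows; `split_users` is a hypothetical helper, and since Python's `random` differs from `tf.random.shuffle`, the resulting permutation will not match the one used above.

```python
import random


def split_users(n_users, n_test, n_val, seed=1729):
    """Cut one shuffled permutation of user positions into disjoint test/val/train slices."""
    permutation = list(range(n_users))
    random.Random(seed).shuffle(permutation)
    return {
        'test': permutation[:n_test],
        'val': permutation[n_test:n_test + n_val],
        'train': permutation[n_test + n_val:],
    }


splits = split_users(n_users=10, n_test=2, n_val=2)
split_sizes = {name: len(indices) for name, indices in splits.items()}
```

With the notebook's numbers (1635044 users, 200000 each for test and validation), the training split keeps the remaining 1235044 users, which matches the bounding shape shown above.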

## Batching by users

For further processing, let's transform the dicts into `tf.data.Dataset` objects batched by users.

In [59]:
from utils import batch_by_user

datasets = {}
for split_name, tensors_dict_by_event in tensors.items():
    datasets[split_name] = batch_by_user(tensors_dict_by_event, 'purchase', 5 * 10 ** 3, seed=12345)
    
del tensors



Now we have `tf.data.Dataset`

In [60]:
type(datasets['train'])

tensorflow.python.data.ops.dataset_ops.ConcatenateDataset

where each batch contains 5000 unique users and each batch value is a dict keyed by event type:

In [61]:
first_batch = next(iter(datasets['train']))

In [62]:
first_batch.keys()

dict_keys(['cart', 'purchase'])

In [63]:
first_batch['purchase'].keys()

dict_keys(['_user_index', 'date', 'user_id', 'product_id', 'category1', 'category2', 'category3', 'brand', 'priceCluster'])

In [64]:
first_batch['purchase']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([5000,  406], dtype=int32)>

For now, the last batch contains fewer users:

In [65]:
for last_batch in datasets['train']:
    pass
last_batch['purchase']['_user_index'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([44, 10], dtype=int32)>

Note that indices are local to each batch: they start again from 0.

In [66]:
last_batch['purchase']['_user_index'][:3, :5]

<tf.RaggedTensor [[0, 1, 2, 3, 4], [5, 6, 7], [8]]>
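Local indexing means that a batch is self-contained: its `_user_index` refers to positions inside the batch's own feature tensors, not to positions in the full dataset. How `batch_by_user` achieves this is not shown here; the sketch below, with the hypothetical helper `take_batch` and made-up values, illustrates one way to re-index a subset of users.

```python
def take_batch(user_index, features, batch_users):
    """Select some users and renumber their events from 0 (hypothetical helper)."""
    local_index = []
    local_features = {name: [] for name in features}
    next_row = 0
    for user in batch_users:
        rows = user_index[user]
        # The user's events get consecutive local positions.
        local_index.append(list(range(next_row, next_row + len(rows))))
        next_row += len(rows)
        # Copy the corresponding feature values in the same order.
        for name, values in features.items():
            local_features[name].extend(values[row] for row in rows)
    return local_index, local_features


# Global ragged index for 3 users and one flat feature column (toy values).
user_index = [[0, 3], [1], [2, 4, 5]]
features = {'product_id': [11, 12, 13, 14, 15, 16]}
local_index, local_features = take_batch(user_index, features, batch_users=[2, 0])
```

Gathering `local_features` with `local_index` gives the same per-user sequences as gathering the global features with the global index, restricted to the selected users.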

## Saving raw dataset

In [67]:
%%time
for split_name, dataset in datasets.items():
    tf.data.experimental.save(dataset, os.path.join(DATASETS_ROOT_DIR, f'rees_ecommerce/raw_{split_name}_dataset.tf'),
                              compression='GZIP')

CPU times: user 42.4 s, sys: 176 ms, total: 42.6 s
Wall time: 42.5 s


## Aggregate preceding events

So far we only have features describing items. To describe users, let's consider the following features:
* for each `user_id` and `date`, we look at the events that occurred on previous dates
* independently for each item feature, we construct the list of its values over those preceding events

So if a user has purchased the following products

| **product_id** | **category** | **date** |
|----------------|--------------|----------|
| `1`            | Shoes        | `20/01`  |
| `9`            | Phone        | `25/01`  |
| `8`            | Shoes        | `25/01`  |
| `3`            | Books        | `30/01`  |

we will construct `aggregated_product_id` and `aggregated_category` features as

| **aggregated_product_id** | **aggregated_category**    | **date** |
|---------------------------|----------------------------|----------|
| []                        | []                         | `20/01`  |
| [`1`]                     | [Shoes]                    | `25/01`  |
| [`1`]                     | [Shoes]                    | `25/01`  |
| [`1`, `9`, `8`]           | [Shoes, Phone, Shoes]      | `30/01`   |

and so on for each user and for each date on which an event we want to predict (`purchase` here) occurs.
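The aggregation rule can be checked against the example tables with a few lines of plain Python. This is a sketch of the rule only, not of `aggregate_preceding_events`: `aggregate_preceding` is a hypothetical helper, and the year in the dates is an assumption added to make them comparable.

```python
from datetime import date


def aggregate_preceding(events, feature):
    """For each event, list the feature values of events with a strictly earlier date."""
    return [
        [other[feature] for other in events if other['date'] < target['date']]
        for target in events
    ]


# The four events of the example, sorted by date (the year 2020 is assumed).
events = [
    {'product_id': 1, 'category': 'Shoes', 'date': date(2020, 1, 20)},
    {'product_id': 9, 'category': 'Phone', 'date': date(2020, 1, 25)},
    {'product_id': 8, 'category': 'Shoes', 'date': date(2020, 1, 25)},
    {'product_id': 3, 'category': 'Books', 'date': date(2020, 1, 30)},
]

aggregated_product_id = aggregate_preceding(events, 'product_id')
aggregated_category = aggregate_preceding(events, 'category')
```

The strict inequality on dates is what keeps the two events of `25/01` out of each other's history: with day resolution, their relative order within the day is unknown.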

In [68]:
from functools import partial

from utils import aggregate_preceding_events

aggregated_datesets = {}
for split_name, dataset in datasets.items():
    agg_func = partial(aggregate_preceding_events, target='purchase', item_features=item_features,
                       user_id_column='user_id', date_column='date')
    aggregated_datesets[split_name] = dataset.map(agg_func, num_parallel_calls=2, deterministic=True)

In [69]:
first_batch = next(iter(aggregated_datesets['train']))
first_batch.keys()

dict_keys(['aggregated_cart_product_id', 'aggregated_cart_category1', 'aggregated_cart_category2', 'aggregated_cart_category3', 'aggregated_cart_brand', 'aggregated_cart_priceCluster', 'aggregated_purchase_product_id', 'aggregated_purchase_category1', 'aggregated_purchase_category2', 'aggregated_purchase_category3', 'aggregated_purchase_brand', 'aggregated_purchase_priceCluster', 'product_id', 'category1', 'category2', 'category3', 'brand', 'priceCluster', 'user_id', 'date'])

The resulting structure contains
* aggregated historical features: for each user (1st dim) and each target event (2nd dim), we have a list of the attributes of previous events (3rd dim)
* raw item features, user id, date: for each user (1st dim) we have a list of target events (2nd dim)

In [70]:
first_batch['aggregated_cart_category1'].bounding_shape()

<tf.Tensor: shape=(3,), dtype=int32, numpy=array([5000,  406,  100], dtype=int32)>

In [71]:
first_batch['category1'].bounding_shape()

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([5000,  406], dtype=int32)>

## Saving resulting datasets

In [72]:
%%time
for split_name, dataset in aggregated_datesets.items():
    tf.data.experimental.save(dataset, os.path.join(DATASETS_ROOT_DIR, f'rees_ecommerce/aggregated_{split_name}_dataset.tf'),
                              compression='GZIP')

CPU times: user 13min 55s, sys: 7.08 s, total: 14min 2s
Wall time: 4min 34s
