# Data Preprocessing

In [1]:
!pip install polars



In [2]:
import polars as pl

train = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/train.parquet')
validA = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/test.parquet')
validB = pl.read_parquet('../input/otto-train-and-test-data-for-local-validation/test_labels.parquet')

In [3]:
print(train.dtypes)
print(validA.dtypes)

[Int32, Int32, Int32, UInt8]
[Int32, Int32, Int32, UInt8]


In [4]:
import numpy as np

# Get subset of data 
fraction_of_sessions = 0.2

train_sessions = train['session'].sample(fraction=fraction_of_sessions, seed=42)
train = train.filter(pl.col("session").is_in(train_sessions))
train = train.sort("session")

validation_sessions = validA['session'].sample(fraction=fraction_of_sessions, seed=42)
validA = validA.filter(pl.col("session").is_in(validation_sessions))
validA = validA.sort("session")

validB = validB.filter(pl.col("session").is_in(validation_sessions))
validB = validB.sort("session")

max_ts, min_ts = max(train['ts']), min(train['ts'])

print(train.shape[0], validA.shape[0], validB.shape[0])

print(train, validA, validB)

152268691 5763556 1007030
shape: (152_268_691, 4)
┌──────────┬─────────┬────────────┬──────┐
│ session  ┆ aid     ┆ ts         ┆ type │
│ ---      ┆ ---     ┆ ---        ┆ ---  │
│ i32      ┆ i32     ┆ i32        ┆ u8   │
╞══════════╪═════════╪════════════╪══════╡
│ 0        ┆ 1517085 ┆ 1659304800 ┆ 0    │
│ 0        ┆ 1563459 ┆ 1659304904 ┆ 0    │
│ 0        ┆ 1309446 ┆ 1659367439 ┆ 0    │
│ 0        ┆ 16246   ┆ 1659367719 ┆ 0    │
│ …        ┆ …       ┆ …          ┆ …    │
│ 11098496 ┆ 219035  ┆ 1661119183 ┆ 0    │
│ 11098507 ┆ 1195266 ┆ 1661119189 ┆ 0    │
│ 11098512 ┆ 8664    ┆ 1661119192 ┆ 0    │
│ 11098522 ┆ 1524949 ┆ 1661119197 ┆ 0    │
└──────────┴─────────┴────────────┴──────┘ shape: (5_763_556, 4)
┌──────────┬─────────┬────────────┬──────┐
│ session  ┆ aid     ┆ ts         ┆ type │
│ ---      ┆ ---     ┆ ---        ┆ ---  │
│ i32      ┆ i32     ┆ i32        ┆ u8   │
╞══════════╪═════════╪════════════╪══════╡
│ 11098529 ┆ 1105029 ┆ 1661119200 ┆ 0    │
│ 11098531 ┆ 452188  ┆ 16

## Candidate and Feature Generation

In [5]:
# Simple candidate generator - used to benchmark results
def generate_candidates_simple(df, dict_format=False):
    df = df.groupby(['session', 'aid']).agg(pl.count())
    df = df.sort('session', 'count', descending=[False, True]).groupby('session').head(50)
    if dict_format:
        df = df.groupby('session').agg(pl.col("aid"))
        result = {}
        for session, aid_list in zip(df['session'].to_list(), df['aid'].to_list()):
            result[session] = aid_list 
        return result
    
    return df.select('session', 'aid').sort(['session', 'aid'])

In [6]:
type_weight = {0: 1, 1: 6, 2: 3}

def generate_covisit(df, event_type):
    # look at carts and orders only when generating buy2buy covisit matrix
    if event_type == 2:
        df = df.filter(pl.col('type').is_in([1, 2]))
    # sort by session_id and recency
    df = df.sort(['session', 'ts'], descending=[False, True])
    # get most recent events for each session, otherwise merge is too costly
    df = df.groupby('session').tail(10)
    # find pairs of items that were accessed within a day of each other
    df = df.join(df, on='session', suffix="_next")
    df = df.filter((pl.col("aid") != pl.col("aid_next")) & 
                   (pl.col("type") != pl.col("type_next")) &
                   (pl.col("ts") != pl.col("ts_next")))
    df = df.with_columns(((pl.col("ts_next") - pl.col("ts")) / (24 * 60 * 60 * 1000)).alias("days_elapsed"))
    df = df.filter((pl.col("days_elapsed") >= 0) & (pl.col("days_elapsed") <= 1))
    
    # used for predicting clicks given past clicks/carts/orders
    if event_type == 0:
        # weight is combination of count and recency (represented by timestamp with max-min scaling)
        df = df.with_columns(weight = (1 + 3*(pl.col("ts") - min_ts)/(max_ts - min_ts)))
        df = df.groupby(['aid', 'aid_next']).agg(pl.sum("weight"))
    # used for predicting future cart/orders given past carts/orders
    elif event_type == 1:
        # pairs are weighted according to event type
        df = df.with_columns(pl.col("type").map_dict(type_weight).alias("weight"))
        df = df.groupby(['aid', 'aid_next']).agg(pl.sum("weight"))
    elif event_type == 2:
        # count of number of occurences of each pair
        df = df.groupby(['aid', 'aid_next']).agg(pl.count().alias('weight'))
    
    df = df.sort(['aid', 'aid_next', 'weight'], descending=[False, False, True]).groupby('aid').head(40).select(['aid', 'aid_next'])
    df = df.groupby('aid').agg(pl.col('aid_next'))
    covisit = {}
    for aid, aid_list in zip(df["aid"], df["aid_next"]):
        covisit[aid] = aid_list.to_list()
    
    return covisit

In [7]:
from collections import Counter

type_weight = {0: 1, 1: 6, 2: 3}

def generate_candidates(df, covisit):
    aids = df["aid"].to_list()
    types = df["type"].to_list()
    unique_aids = list(dict.fromkeys(aids[::-1]))

    time_weights = np.logspace(0.1,1,len(aids),base=2, endpoint=True)-1
    aids_counter = {}
    for aid, w, t in zip(aids, time_weights, types):
        aids_counter[aid] = aids_counter.get(aid, 0) + w * type_weight[t]

    aids_counter_sorted = sorted(aids_counter.items(), key=lambda x: x[1])
    candidates = [k for k, v in aids_counter_sorted]

    if len(candidates) <= 20:
        secondary_candidates_counter = Counter()
        for candidate in candidates:
            secondary_candidates_counter.update(covisit.get(aid, []))  
        secondary_candidates = [k for k, v in secondary_candidates_counter.most_common(40)]
        return candidates[:40] + secondary_candidates[:(40 - len(candidates))]

    return candidates[:40]

In [8]:
# Generate Session Features 
def generate_session_features(train_df_user):
    df_session_grouped = train_df_user.sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_clicks = train_df_user.filter(train_df_user['type'] == 0).sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_carts = train_df_user.filter(train_df_user['type'] == 1).sort('session', 'ts', descending=[False, True]).groupby('session')
    df_session_grouped_orders = train_df_user.filter(train_df_user['type'] == 2).sort('session', 'ts', descending=[False, True]).groupby('session')

    # last event type and aid
    last_event_type = df_session_grouped.agg(last_type=pl.col('type').first())

    # length of session
    session_length = df_session_grouped.agg(session_length=pl.col('ts').first() - pl.col('ts').last())

    # duplication rate

    # number of clicks, carts, orders, events that session
    clicks_ratio = df_session_grouped_clicks.agg(clicks_ratio=pl.col('type').count())
    carts_ratio = df_session_grouped_carts.agg(carts_ratio=pl.col('type').count())
    orders_ratio = df_session_grouped_orders.agg(orders_ratio=pl.col('type').count())
    events_ratio = df_session_grouped.agg(events_ratio=pl.col('type').count())

    return [last_event_type, session_length, events_ratio, clicks_ratio, carts_ratio, orders_ratio]

In [9]:
# AID FEATURES
train_df_item = pl.concat([train, validA])

# total interaction for aid
interaction_counts = train_df_item['aid'].value_counts().sort(by='counts').rename({"counts":"interaction_count"})

# click for aid
click_counts = train_df_item.filter(train_df_item['type'] == 0).groupby('aid').agg(click_count=pl.count('type'))

# cart for aid
cart_counts = train_df_item.filter(train_df_item['type'] == 1).groupby('aid').agg(cart_count=pl.count('type'))

# buy for aid
buy_counts = train_df_item.filter(train_df_item['type'] == 2).groupby('aid').agg(buy_count=pl.count('type'))

# last ts for aid
last_ts = train_df_item.groupby('aid').agg(last_ts=pl.col('ts').max())

# click, cart, buy ratios
global_events_ratio = click_counts.join(cart_counts, on='aid').join(buy_counts, on='aid')
global_events_ratio = global_events_ratio.with_columns(click2cart = pl.col('click_count') / pl.col('cart_count'))
global_events_ratio = global_events_ratio.with_columns(cart2buy = pl.col('cart_count') / pl.col('buy_count'))
global_events_ratio = global_events_ratio.with_columns(click2buy = pl.col('click_count') / pl.col('buy_count'))
global_events_ratio = global_events_ratio.drop('click_count').drop('cart_count').drop('buy_count')

# interaction rate in last 7 days
last_ts_int = train_df_item['ts'].max()
last_7_days_count = train_df_item.filter(train_df_item['ts'] >= last_ts_int - (7 * 24 * 60 * 60))['aid'].value_counts().sort(by='counts').rename({"counts":"last7days_count"})
merged = interaction_counts.join(last_7_days_count, on='aid')
merged = merged.with_columns(last7days_interaction_rate = pl.col('last7days_count') / pl.col('interaction_count'))
last7days_interaction_count = merged.drop('interaction_count').drop('last7days_count')

# inclusion rate in all sessions
# sessions_with_aid / total_sessions
total_sessions = train_df_item['aid'].n_unique()
unique_sessions = train_df_item.groupby('aid').agg(unique_sessions=pl.n_unique('session')).sort(by='unique_sessions')
inclusion_rate = unique_sessions.with_columns(inclusion_rate = pl.col('unique_sessions') * 1000 / total_sessions).drop('unique_sessions')

# average interactions per hour over all sessions
num_hours = train_df_item.groupby('aid').agg(num_hours = (pl.max('ts') - pl.min('ts')) / (60 * 60)).sort(by='num_hours')
num_hours = num_hours.with_columns(num_hours = pl.when(num_hours['num_hours']==0).then(1).otherwise(num_hours['num_hours']))
merged = num_hours.join(interaction_counts, on='aid')
average_interactions_ph = merged.with_columns(average_interactions_ph = pl.col('interaction_count') / pl.col('num_hours'))
average_interactions_ph = average_interactions_ph.drop('interaction_count').drop('num_hours')

# average num clicks before buy
# this bit too hard
xd = train_df_item.sort(by=['session','ts'])
aid_features = [interaction_counts, click_counts, buy_counts, last_ts, global_events_ratio, last7days_interaction_count, inclusion_rate, average_interactions_ph]

In [10]:
import pandas as pd

DISK_PIECES = 4

# Improved speed for 2X using polars. 
def pqt_to_dict(path):
    return pl.read_parquet(path).groupby('aid_x').agg(pl.col('aid_y')).to_pandas().set_index('aid_x').aid_y.apply(list).to_dict()

# LOAD THREE CO-VISITATION MATRICES
covisit_clicks = pqt_to_dict(f'/kaggle/input/otto-covisitation-matrix-parquet-files/top_20_clicks_v7_0.pqt')

for k in range(1,DISK_PIECES): 
    covisit_clicks.update(pd.read_parquet(f'/kaggle/input/otto-covisitation-matrix-parquet-files/top_20_clicks_v7_{k}.pqt') )


In [11]:
# simple candidate generator
# candidates_df = generate_candidates_simple(pl.concat([train, validA]))
# train_df = validA.unique(subset=['session'])
# train_df = candidates_df.join(train_df, on="session", how="inner").select(['session', 'aid'])

EVENT_TYPE = 0
# Add candidates + features to df
train_df = validA.to_pandas()
# covisit loaded above 
# covisit = generate_covisit(pl.concat([train, validA]), EVENT_TYPE)
train_df = train_df.sort_values(['session', 'ts'], ascending=[True, False]).groupby('session').apply(lambda x: generate_candidates(x, covisit_clicks)).reset_index(name="aid")
train_df = pl.from_pandas(train_df)
train_df.columns = ['session', 'aid']
train_df = train_df.explode('aid')
train_df = train_df.select([
    pl.col('session').cast(pl.datatypes.Int32).alias('session'), 
    pl.col('aid').cast(pl.datatypes.Int32).alias('aid')
])

session_features = generate_session_features(validA)
for session in session_features:
    train_df = train_df.join(session, on='session', how='left')

for aid in aid_features:
    train_df = train_df.join(aid, on='aid', how='left')

In [12]:
# add gts
type2id = {"clicks": 0, "carts": 1, "orders": 2}
validB_long = validB.explode("ground_truth").select([
    pl.col("session").cast(pl.datatypes.Int32),
    pl.col("ground_truth").cast(pl.datatypes.Int32).alias("aid"),
    pl.col("type").map_dict(type2id).cast(pl.datatypes.Int8),
]).with_columns(pl.lit(1).alias('gt'))

validB_click = validB_long.filter(pl.col("type") == 0)
validB_cart = validB_long.filter(pl.col("type") == 1)
validB_order = validB_long.filter(pl.col("type") == 2)

train_df_click = train_df.join(validB_click, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")
train_df_cart = train_df.join(validB_cart, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")
train_df_order = train_df.join(validB_order, on=["session", "aid"], how="left").with_columns(pl.all().fill_null(0)).drop("type")

print(train_df_click, train_df_cart, train_df_order)

shape: (6_503_686, 19)
┌──────────┬─────────┬───────────┬────────────┬───┬────────────┬──────────────┬──────────────┬─────┐
│ session  ┆ aid     ┆ last_type ┆ session_le ┆ … ┆ last7days_ ┆ inclusion_ra ┆ average_inte ┆ gt  │
│ ---      ┆ ---     ┆ ---       ┆ ngth       ┆   ┆ interactio ┆ te           ┆ ractions_ph  ┆ --- │
│ i32      ┆ i32     ┆ u8        ┆ ---        ┆   ┆ n_rate     ┆ ---          ┆ ---          ┆ i32 │
│          ┆         ┆           ┆ i32        ┆   ┆ ---        ┆ f64          ┆ f64          ┆     │
│          ┆         ┆           ┆            ┆   ┆ f64        ┆              ┆              ┆     │
╞══════════╪═════════╪═══════════╪════════════╪═══╪════════════╪══════════════╪══════════════╪═════╡
│ 11098529 ┆ 1105029 ┆ 0         ┆ 0          ┆ … ┆ 0.020202   ┆ 0.077767     ┆ 0.297562     ┆ 1   │
│ 11098531 ┆ 624163  ┆ 2         ┆ 546        ┆ … ┆ 0.013889   ┆ 0.026104     ┆ 0.147819     ┆ 0   │
│ 11098531 ┆ 1553691 ┆ 2         ┆ 546        ┆ … ┆ 0.225806   ┆ 0.0

In [13]:
%%time
from collections import defaultdict

validA = validA.sort(['session', 'ts'], descending=[False, True])

count_in_session = defaultdict(lambda: 0)
for row in validA.iter_rows():
    count_in_session[(row[0], row[1])] += 1

def session_item_features(df):
    df = df.with_columns(
        pl.struct(pl.col(['session', 'aid'])).apply(lambda x: count_in_session[x['aid'], x['session']]).alias("count_within_session"),
    )
    return df

train_df_click = session_item_features(train_df_click).select([pl.all().exclude('gt'), pl.col('gt')])
train_df_cart = session_item_features(train_df_cart).select([pl.all().exclude('gt'), pl.col('gt')])
train_df_order = session_item_features(train_df_order).select([pl.all().exclude('gt'), pl.col('gt')])

CPU times: user 1min 2s, sys: 6.03 s, total: 1min 8s
Wall time: 1min 8s


In [14]:
import polars as pl
data = {'a': [1, 2, 3],
       'b': [2, 3, 4]}

def f(a, b):
    return a + b
df = pl.DataFrame(data)

df.shape[0]

3

In [15]:
# output
import pathlib

path_click: pathlib.Path = '/kaggle/working/train_df_click.parquet'
path_cart: pathlib.Path = '/kaggle/working/train_df_cart.parquet'
path_order: pathlib.Path = '/kaggle/working/train_df_order.parquet'

train_df_click.write_parquet(path_click)
train_df_cart.write_parquet(path_cart)
train_df_order.write_parquet(path_order)

# Load Test Data

In [16]:
# Get test data
import numpy as np
import pandas as pd

from pathlib import Path

data_path = Path('/kaggle/input/recsys-dataset/')

test_sessions = pd.DataFrame()
chunks = pd.read_json(data_path / 'otto-recsys-test.jsonl', lines=True, chunksize=100_000)

for e, chunk in enumerate(chunks):
    event_dict = {
        'session': [],
        'aid': [],
        'ts': [],
        'type': [],
    }
    if e < 2:
        for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist()):
            for event in events:
                event_dict['session'].append(session)
                event_dict['aid'].append(event['aid'])
                event_dict['ts'].append(event['ts'])
                event_dict['type'].append(event['type'])
        chunk_session = pd.DataFrame(event_dict)
        test_sessions = pd.concat([test_sessions, chunk_session])
    else:
        break
        

test_sessions = pl.from_pandas(test_sessions.reset_index(drop=True))
test_sessions = test_sessions.groupby('session').agg(pl.all()).sort(by='session')

In [17]:
# Split test data into testA (session up to certain point) and testB (prediction)
dictA = {'session': [], 'aid': [], 'ts': [], 'type': []}
dictB = {'session': [], 'aid': [], 'ts': [], 'type': []}

for row in test_sessions.iter_rows():
    split_idx = np.random.randint(1,len(row[1]))
    dictA['session'].append(row[0])
    dictA['aid'].append(row[1][:split_idx])
    dictA['ts'].append(row[2][:split_idx])
    dictA['type'].append(row[3][:split_idx])

    dictB['session'].append(row[0])
    dictB['aid'].append(row[1][split_idx:])
    dictB['ts'].append(row[2][split_idx:])
    dictB['type'].append(row[3][split_idx:])
    
testA = pl.DataFrame(data=dictA).explode(['aid', 'ts', 'type'])
testA = testA.select([pl.all().exclude('type'), pl.col('type').map_dict(type2id).alias('type')])
testB = pl.DataFrame(data=dictB)

In [18]:
# Generate test candidates
# Actual Candidates Generator
test_df = testA.to_pandas()
# covisit loaded above 
# covisit = generate_covisit(pl.concat([train, validA]), EVENT_TYPE)
test_df = test_df.sort_values(['session', 'ts'], ascending=[True, False]).groupby('session').apply(lambda x: generate_candidates(x, covisit_clicks)).reset_index(name="aid")
test_df = pl.from_pandas(test_df)
test_df.columns = ['session', 'aid']
test_df = test_df.explode('aid')
test_df = test_df.select([
    pl.col('session').cast(pl.datatypes.Int32).alias('session'), 
    pl.col('aid').cast(pl.datatypes.Int32).alias('aid')
])

# Using simple candidates generator
# test_df = generate_candidates_simple(testA).select([pl.all().cast(pl.datatypes.Int32)])    
# session_features = generate_session_features(testA.select([
#     pl.col('session').cast(pl.datatypes.Int32),
#     pl.col('aid').cast(pl.datatypes.Int32),
#     pl.col('ts').apply(lambda x: x / 1000).cast(pl.datatypes.Int32),
#     pl.col("type").map_dict(type2id).cast(pl.datatypes.Int8)
# ]))

for session in session_features:
    test_df = test_df.join(session, on='session', how='left')

for aid in aid_features:
    test_df = test_df.join(aid, on='aid', how='left')
    
test_df = test_df.with_columns(pl.all().fill_null(0))
print(test_df)

shape: (1_383_298, 18)
┌──────────┬─────────┬───────────┬────────────┬───┬───────────┬────────────┬────────────┬────────────┐
│ session  ┆ aid     ┆ last_type ┆ session_le ┆ … ┆ click2buy ┆ last7days_ ┆ inclusion_ ┆ average_in │
│ ---      ┆ ---     ┆ ---       ┆ ngth       ┆   ┆ ---       ┆ interactio ┆ rate       ┆ teractions │
│ i32      ┆ i32     ┆ u8        ┆ ---        ┆   ┆ f64       ┆ n_rate     ┆ ---        ┆ _ph        │
│          ┆         ┆           ┆ i32        ┆   ┆           ┆ ---        ┆ f64        ┆ ---        │
│          ┆         ┆           ┆            ┆   ┆           ┆ f64        ┆            ┆ f64        │
╞══════════╪═════════╪═══════════╪════════════╪═══╪═══════════╪════════════╪════════════╪════════════╡
│ 12899779 ┆ 59625   ┆ 0         ┆ 0          ┆ … ┆ 0.0       ┆ 0.0        ┆ 0.004894   ┆ 0.020442   │
│ 12899779 ┆ 941596  ┆ 0         ┆ 0          ┆ … ┆ 61.0      ┆ 0.0        ┆ 0.022297   ┆ 0.140253   │
│ 12899779 ┆ 731692  ┆ 0         ┆ 0          ┆ … 

In [19]:
%%time
from collections import defaultdict

testA = testA.sort(['session', 'ts'], descending=[False, True])

count_in_session = defaultdict(lambda: 0)
for row in testA.iter_rows():
    count_in_session[(row[0], row[1])] += 1

def session_item_features(df):
    df = df.with_columns(
        pl.struct(pl.col(['session', 'aid'])).apply(lambda x: count_in_session[x['aid'], x['session']]).alias("count_within_session"),
    )
    return df

test_df = session_item_features(test_df)

CPU times: user 6.91 s, sys: 332 ms, total: 7.24 s
Wall time: 7.1 s


In [20]:
print(test_df)

shape: (1_383_298, 19)
┌──────────┬─────────┬───────────┬────────────┬───┬────────────┬────────────┬────────────┬────────────┐
│ session  ┆ aid     ┆ last_type ┆ session_le ┆ … ┆ last7days_ ┆ inclusion_ ┆ average_in ┆ count_with │
│ ---      ┆ ---     ┆ ---       ┆ ngth       ┆   ┆ interactio ┆ rate       ┆ teractions ┆ in_session │
│ i32      ┆ i32     ┆ u8        ┆ ---        ┆   ┆ n_rate     ┆ ---        ┆ _ph        ┆ ---        │
│          ┆         ┆           ┆ i32        ┆   ┆ ---        ┆ f64        ┆ ---        ┆ i64        │
│          ┆         ┆           ┆            ┆   ┆ f64        ┆            ┆ f64        ┆            │
╞══════════╪═════════╪═══════════╪════════════╪═══╪════════════╪════════════╪════════════╪════════════╡
│ 12899779 ┆ 59625   ┆ 0         ┆ 0          ┆ … ┆ 0.0        ┆ 0.004894   ┆ 0.020442   ┆ 0          │
│ 12899779 ┆ 941596  ┆ 0         ┆ 0          ┆ … ┆ 0.0        ┆ 0.022297   ┆ 0.140253   ┆ 0          │
│ 12899779 ┆ 731692  ┆ 0         ┆ 0     

In [21]:
actual_events = testB.explode(['aid', 'ts', 'type']).select([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('aid').cast(pl.datatypes.Int32),
    pl.col('ts').apply(lambda x: x / 1000).cast(pl.datatypes.Int32),
    pl.col('type').map_dict(type2id).cast(pl.datatypes.Int8)
])

In [22]:
# output
import pathlib

path_test: pathlib.Path = '/kaggle/working/test_df.parquet'
path_actual_events: pathlib.Path = '/kaggle/working/actual_events.parquet'

test_df.write_parquet(path_test)
actual_events.write_parquet(path_actual_events)