# Package 

In [2]:
%load_ext autoreload
%autoreload 2
import sys
import logging
base_dir = '../'
sys.path.append(base_dir)
import os
import warnings
warnings.simplefilter('ignore')

import pickle
import gc
import re
import polars as pl
from collections import defaultdict, Counter

import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
from tqdm.auto import tqdm
import polars as pl
from utils import *
from src.eval import model_eval
from src.config import raw_data_session_id_dir, candidate_dir, model_for_eval, candidate_file_name

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
candidate_file_name

'{task}_{data_type}_{model_version}_{model_for_eval}_top{topn}.parquet'

In [4]:
# df_prod = pd.read_csv('data/products_train.csv')
# df_prod

# Config 

In [6]:
debug = False

model_version = 'co_visit'

topn = 300
if debug:
    n_rows = 1000
else:
    n_rows = None
# debug_session_num = 100
train_data_dir = '.'
test_data_dir = '.'
task = 'task1'

model_dir = f'../model_training/{model_version}/'

# target locales: locales needed for task1
target_locals = ["DE", 'JP', 'UK']

# if model_for_eval:
model_file = os.path.join(model_dir, f'{model_version}_{model_for_eval}_for_eval.parquet')
submit_file = os.path.join('../data/sub_files/', f'submission_{task}_{model_version}_{model_for_eval}_for_eval.parquet')

In [39]:
train_file_name = candidate_file_name.format(
    task=task
    , data_type='train'
    , model_version=model_version
    , model_for_eval=model_for_eval
    , topn=topn
)
eval_file_name = candidate_file_name.format(
    task=task
    , data_type='eval'
    , model_version=model_version
    , model_for_eval=model_for_eval
    , topn=topn
)
test_file_name = candidate_file_name.format(
    task=task
    , data_type='test'
    , model_version=model_version
    , model_for_eval=model_for_eval
    , topn=topn
)
print(train_file_name)
print(eval_file_name)
print(test_file_name)

task1_train_co_visit_True_top300.parquet
task1_eval_co_visit_True_top300.parquet
task1_test_co_visit_True_top300.parquet


In [8]:
! mkdir {model_dir}

mkdir: cannot create directory ‘../model_training/co_visit/’: File exists


In [9]:
model_file

'../model_training/co_visit/co_visit_True_for_eval.parquet'

In [10]:
submit_file

'../data/sub_files/submission_task1_co_visit_True_for_eval.parquet'

# Data 

In [11]:
# ! ls ../{raw_data_session_id_dir}

In [12]:
train_pl = pl.scan_parquet(os.path.join(base_dir, raw_data_session_id_dir, 'sessions_train.parquet'), n_rows=n_rows).filter(pl.col('locale').is_in(target_locals)).with_columns(pl.col('prev_items').apply(str2list))

eval_pl = pl.scan_parquet(os.path.join(base_dir, raw_data_session_id_dir, 'sessions_eval.parquet'), n_rows=n_rows).filter(pl.col('locale').is_in(target_locals)).with_columns(pl.col('prev_items').apply(str2list))

# df_sess.head(3).collect()
test_pl = pl.scan_parquet(os.path.join(base_dir, raw_data_session_id_dir, 'sessions_test_task1.parquet'), n_rows=n_rows).with_columns(pl.col('prev_items').apply(str2list))


# Function 

In [13]:
# 'item', 'next_item_prediction', 'next_item_weight'

In [14]:
def nic_rec(target_pl, nic_model, topn=topn):
    rec_num = topn
    def get_next_items(x):
        prev_items = x['prev_items']
        local_rec = x['next_item_prediction']
        final = [ele for ele in local_rec if ele not in prev_items]
        return final

    final_cols = ['session_id', 'next_item_prediction', 'rec_num']
    target_pl = (
        target_pl
            .with_columns(
                pl.col('prev_items').arr.get(-1).alias('last_item')
            )
            .join(nic_model, how='left', left_on='last_item', right_on='item')
            .with_columns(
                pl.when(pl.col('next_item_prediction').is_null())
                    .then([])
                    .otherwise(pl.col('next_item_prediction'))
                    .alias('next_item_prediction')
            ).with_columns(
                    pl.struct(["prev_items", "next_item_prediction"]).apply(
                        lambda x: get_next_items(x)).alias('next_item_prediction')
            )
            .with_columns(
                    pl.col('next_item_prediction').arr.head(rec_num)
            )
            .with_columns(
                pl.col('next_item_prediction').arr.lengths().alias('rec_num')
            )
            .select(
                final_cols
            )
    )#.head(3).collect()
    return target_pl

# Next Item Statistics 

In [13]:
train_data = train_pl.with_columns(
    pl.col('prev_items').arr.concat(pl.col('next_item')) 
)
if not model_for_eval:
    eval_data = eval_pl.with_columns(
    pl.col('prev_items').arr.concat(pl.col('next_item')) 
)
else:
    eval_data = eval_pl
test_data = test_pl

In [14]:
cols_to_keep = ['prev_items']

all_train_data = pl.concat([train_data.select(cols_to_keep), eval_data.select(cols_to_keep), test_data.select(cols_to_keep)], how='vertical')

In [15]:
# all_train_data.head().collect()

In [16]:
def get_cnt(row):
    prev_items = row.to_list()
    # print(type(prev_items))
    # print(prev_items)
    length = len(prev_items)
    res = []
    for idx1, ele1 in enumerate(prev_items):
        for idx2, ele2 in enumerate(prev_items):
            if idx1 != idx2:
                ele2 = prev_items[idx2]
                # combined_ele = '_'.join([ele1, ele2])
                weight = str(1)
                res.append(
                    # combined_ele
                    [
                        # combined_ele
                    ele1
                    , ele2
                     , weight
                    ]
                          )
    return res
        

In [17]:
next_items_pl = (
    all_train_data.with_columns(
        pl.col('prev_items').apply(lambda x: get_cnt(x))
    )
    .explode('prev_items')
    .select(
        pl.col('prev_items').arr.get(0).alias('current_item')
        , pl.col('prev_items').arr.get(1).alias('next_item')

        , pl.col('prev_items').arr.get(2).alias('weight').cast(pl.Float32)
    ).groupby(['current_item', 'next_item'])
    .agg(
        pl.col('weight').sum()
    ).sort(['current_item', 'weight'], descending=True)
    .groupby(['current_item'])
    .agg(
        pl.col('next_item')
        , pl.col('weight')
    )
    .select(
        pl.col('current_item').alias('item')
        , pl.col('next_item').alias('next_item_prediction')
        , pl.col('weight').alias('next_item_weight')
    )
)

In [18]:
# next_items_pl.head().collect()

## Save model 

In [19]:
model_file

'../model_training/co_visit/co_visit_True_for_eval.parquet'

In [20]:
%%time
next_items_pl = next_items_pl.collect()

CPU times: user 11min 50s, sys: 31.2 s, total: 12min 21s
Wall time: 9min 13s


In [21]:
next_items_pl.sample(3)

item,next_item_prediction,next_item_weight
str,list[str],list[f32]
"""B07VST1K92""","[""B001DZBMWQ"", ""B00J2UUKZE"", … ""B07S3QXCRK""]","[1.0, 1.0, … 1.0]"
"""B09Q2Z5H5T""","[""B09HKNNPSQ"", ""B09Q2Z5H5T"", … ""B09DPGVXCK""]","[4.0, 4.0, … 1.0]"
"""B09NZYXN2K""","[""B09NZYXN2K"", ""B09LCGLYMD"", … ""B07ZGDTGFJ""]","[12.0, 8.0, … 1.0]"


In [22]:
next_items_pl.select(pl.col('next_item_prediction').arr.lengths().alias('rec_num')).describe()

describe,rec_num
str,f64
"""count""",1335037.0
"""null_count""",0.0
"""mean""",37.213671
"""std""",66.923547
"""min""",1.0
"""max""",3396.0
"""median""",18.0


In [23]:
next_items_pl.write_parquet(model_file)

In [24]:
del next_items_pl

## Read Model 

In [15]:
model_file

'../model_training/co_visit/co_visit_True_for_eval.parquet'

In [16]:
next_items_pl = pl.scan_parquet(model_file)

In [17]:
type_dict = next_items_pl.schema

In [18]:
type_dict.keys()

dict_keys(['item', 'next_item_prediction', 'next_item_weight'])

## Model eval 

In [19]:
# train_pl.schema

In [20]:
# eval_pl.schema

In [21]:
# nic_rec(target_pl=eval_pl.head(100), nic_model=next_items_pl, topn=topn).select(pl.col('next_item_prediction').arr.lengths().alias('rec_num')).collect().describe()

In [22]:
eval_candidate_pl = nic_rec(target_pl=eval_pl, nic_model=next_items_pl, topn=topn)#.head().collect()

In [22]:
# eval_candidate_pl.head().co

In [23]:

# if_hit = pl.element().rank()
target_df = eval_pl.join(eval_candidate_pl, how='left', on='session_id')


# eval_final.head().collect()

In [24]:
target_df.select(pl.col('next_item_prediction').arr.lengths().alias('rec_num')).collect().describe()

describe,rec_num
str,f64
"""count""",1000.0
"""null_count""",0.0
"""mean""",107.991
"""std""",104.708523
"""min""",0.0
"""max""",300.0
"""median""",65.0


In [25]:
%%time
# model_eval(target_df=target_df)
eval_final = (
        target_df
        .lazy()
        .with_columns(
            pl.col('next_item_prediction').cast(pl.List(pl.Utf8))
        )
        .with_columns(
            pl.concat_list([pl.col('next_item'), pl.col('next_item_prediction')]).alias('mrr')
        )
        .with_columns(
            pl.col('mrr').arr.eval(
                pl.arg_where(pl.element()==pl.element().first())
            )
        ).with_columns(
            pl.col('mrr').arr.eval(
                pl.when(pl.element()==0).then(0).otherwise(1/pl.element())
            )
        ).with_columns(
            pl.col('mrr').arr.sum()
            , pl.col('next_item_prediction').arr.head(20).arr.contains(pl.col('next_item')).mean().alias('recall@20')
            , pl.col('next_item_prediction').arr.head(100).arr.contains(pl.col('next_item')).mean().alias('recall@100')
            , pl.col('next_item_prediction').arr.head(topn).arr.contains(pl.col('next_item')).mean().alias('recall@all')


        )
)
final_res = eval_final.select(
        pl.count().alias('total_sessions')
        , pl.col('mrr').mean()
        , pl.col('recall@20').mean()
        , pl.col('recall@100').mean()
        , pl.col('recall@all').mean()

    ).collect()
final_res

CPU times: user 6.59 s, sys: 904 ms, total: 7.49 s
Wall time: 6.13 s


total_sessions,mrr,recall@20,recall@100,recall@all
u32,f64,f64,f64,f64
1000,0.243219,0.448,0.535,0.552


## Candidate Saving 

In [26]:
# nic_rec(target_pl=train_pl.head(100), nic_model=next_items_pl).select('rec_num').collect().describe()

In [23]:
train_candidate_pl = nic_rec(target_pl=train_pl, nic_model=next_items_pl)# .collect()

In [24]:
test_candidate_pl = nic_rec(target_pl=test_pl, nic_model=next_items_pl)#.head().collect()

In [25]:
%%time
train_candidate_pl = train_candidate_pl.collect()
eval_candidate_pl = eval_candidate_pl.collect()
test_candidate_pl = test_candidate_pl.collect()

CPU times: user 7min 39s, sys: 1min 6s, total: 8min 46s
Wall time: 8min 42s


In [26]:
model_version

'co_visit'

In [27]:
for data_pl in [train_candidate_pl, eval_candidate_pl, test_candidate_pl]:
    print(data_pl.select('rec_num').describe())

shape: (7, 2)
┌────────────┬────────────┐
│ describe   ┆ rec_num    │
│ ---        ┆ ---        │
│ str        ┆ f64        │
╞════════════╪════════════╡
│ count      ┆ 2.946273e6 │
│ null_count ┆ 0.0        │
│ mean       ┆ 113.772529 │
│ std        ┆ 103.984359 │
│ min        ┆ 1.0        │
│ max        ┆ 300.0      │
│ median     ┆ 74.0       │
└────────────┴────────────┘
shape: (7, 2)
┌────────────┬────────────┐
│ describe   ┆ rec_num    │
│ ---        ┆ ---        │
│ str        ┆ f64        │
╞════════════╪════════════╡
│ count      ┆ 326443.0   │
│ null_count ┆ 0.0        │
│ mean       ┆ 113.654944 │
│ std        ┆ 104.162114 │
│ min        ┆ 0.0        │
│ max        ┆ 300.0      │
│ median     ┆ 74.0       │
└────────────┴────────────┘
shape: (7, 2)
┌────────────┬────────────┐
│ describe   ┆ rec_num    │
│ ---        ┆ ---        │
│ str        ┆ f64        │
╞════════════╪════════════╡
│ count      ┆ 316971.0   │
│ null_count ┆ 0.0        │
│ mean       ┆ 155.308303 │
│ std 

In [40]:
train_candidate_pl.write_parquet(os.path.join(base_dir, candidate_dir,
                                                        train_file_name)
                                          )

In [41]:
eval_candidate_pl.write_parquet(os.path.join(base_dir, candidate_dir,
                                                       eval_file_name))

In [42]:
test_candidate_pl.write_parquet(os.path.join(base_dir, candidate_dir, 
                                                       test_file_name))

In [43]:
eval_file_name

'task1_eval_co_visit_True_top300.parquet'

## Save test result

In [None]:
predictions = test_pl.join(test_candidate_pl, how='left', on='session_id').collect()[['locale', 'next_item_prediction']].to_pandas()

In [None]:
submit_file

In [None]:
check_predictions(predictions, test_sessions=test_pl.collect().to_pandas(), 
                  # check_products=True, product_df=products
                 )
# Its important that the parquet file you submit is saved with pyarrow backend
predictions.to_parquet(submit_file, engine='pyarrow')

In [None]:
!aicrowd submission create -c task-1-next-product-recommendation -f {submit_file}

# Validate result 

In [None]:
# train_candidate_pl = pl.scan_parquet(os.path.join(base_dir, candidate_dir,
#                                                         train_file_name))
# eval_candidate_pl = pl.scan_parquet(os.path.join(base_dir, candidate_dir,
#                                                         eval_file_name))
# test_candidate_pl = pl.scan_parquet(os.path.join(base_dir, candidate_dir,
#                                                         test_file_name))

In [None]:
# for data_pl in [train_candidate_pl, eval_candidate_pl, test_candidate_pl]:
#     print(data_pl.select('rec_num').collect().describe())