In this notebook we will train an LGBM Ranker.

In his very informative post, [Recommendation Systems for Large Datasets](https://www.kaggle.com/competitions/otto-recommender-system/discussion/364721), [@ravishah1](https://www.kaggle.com/ravishah1) explains why re-ranking models are the industry standard for datasets like the one in this competition, that is, datasets with high-cardinality categories!

Earlier in this competition I shared a notebook, [co-visitation matrix - simplified, imprvd logic 🔥](https://www.kaggle.com/code/radek1/co-visitation-matrix-simplified-imprvd-logic), which introduces the co-visitation matrix that can be used for candidate generation and scoring. (To read more about co-visitation matrices and how they work, please see [💡 What is the co-visiation matrix, really?](https://www.kaggle.com/competitions/otto-recommender-system/discussion/365358).)
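To give a feel for the idea, here is a deliberately simplified, plain-Python sketch of co-visitation counting: two aids "co-visit" when they appear in the same session, and the most frequent co-visitors of an aid become its candidates. This is not the logic of the linked notebook; it ignores timestamps, event types and any weighting, and `build_covisitation` / `candidates_for` are hypothetical helpers run on toy data.

```python
from collections import Counter, defaultdict
from itertools import permutations


def build_covisitation(sessions):
    """Count how often two aids occur in the same session.

    `sessions` maps a session id to the list of aids it interacted with.
    For every pair of distinct aids sharing a session, both directions
    (a -> b and b -> a) are incremented once per session.
    """
    covisit = defaultdict(Counter)
    for aids in sessions.values():
        for a, b in permutations(set(aids), 2):
            covisit[a][b] += 1
    return covisit


def candidates_for(aid, covisit, k=20):
    """Return up to k aids most often co-visited with `aid`."""
    return [other for other, _ in covisit[aid].most_common(k)]


# toy sessions, not real competition data
sessions = {1: [10, 20, 30], 2: [10, 20], 3: [20, 40]}
covisit = build_covisitation(sessions)
print(candidates_for(10, covisit))  # [20, 30]
```

Candidates produced this way (one list per session) are exactly the kind of input the ranker in this notebook re-orders.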

Here, we will only look at ranking. I don't expect this notebook to achieve a particularly good score, but it provides all the low-level plumbing needed for training ranking models. You can build on it and improve the result (for instance, by adding new candidates generated with co-visitation matrices!).

For data processing we will use [polars](https://www.pola.rs/). Polars is a very interesting library that I have wanted to try for a long time. It is written in Rust and is designed to run on multiple cores. And I must say it delivers! I liked the API quite a bit, and its speed too (though in that department `cudf` would still be my first choice!). I am, however, not touching my GPU quota on Kaggle just yet, as I have a couple of things lined up to share with you that will definitely require the GPU! 🙂

To simplify the code, I am using a version of the dataset that I shared [here](https://www.kaggle.com/datasets/radek1/otto-train-and-test-data-for-local-validation). No more dealing with `jsonl` files, as it's all `parquet` files now! (Specifically, I am using a version of this dataset that I prepared for local validation [in this notebook](https://www.kaggle.com/code/radek1/a-robust-local-validation-framework).)

## Other resources you might find useful:


* [💡 [2 methods] How-to ensemble predictions 🏅🏅🏅](https://www.kaggle.com/code/radek1/2-methods-how-to-ensemble-predictions)
* [📖 What are some good resources to learn about how gradient-boosted tree ranking models work?](https://www.kaggle.com/competitions/otto-recommender-system/discussion/366477)
* [💡What is a good initial goal in the competition? How to improve beyond it? 📈](https://www.kaggle.com/competitions/otto-recommender-system/discussion/368685)
* [💡How to improve the results of your Approximate Nearest Neighbor search! (annoy)](https://www.kaggle.com/competitions/otto-recommender-system/discussion/368385)
* [from zero to 60 in 2 seconds or less 🏎️🚓🚓🚓](https://www.kaggle.com/competitions/otto-recommender-system/discussion/367058)


# Packages 

In [1]:
# ! pip install pandas

In [2]:
import polars as pl
import glob
import pandas as pd
import gc
from sklearn.pipeline import Pipeline
import joblib
import os
import lightgbm

# Config 

In [7]:
debug = True


final_submission = False
candidate_model_version = 'candidate_v2_train1_data'
estimator = 40
rerank_model_version = f'rerank_v1_{estimator}'
# if True, train on the train2 labels and keep the val labels for local validation;
# otherwise train on the val labels (no held-out validation data)
for_local_val = True

train_data_dir = '../submission/candidate_for_rerank_training/'
val_data_dir = '../submission/candidate_for_validation/'

type2id = {"clicks": 0, "carts": 1, "orders": 2}
# id2type = dict(zip(type2id.values(), type2id.keys()))
model_path = f'../model_training/{rerank_model_version}'
if not os.path.isdir(model_path):
    os.makedirs(model_path)
model_file = os.path.join(model_path, 'ranker.pkl')


if final_submission:
    test_candidate_file = os.path.join('../submission/final_submission_candiate/', f'{candidate_model_version}_test_submission.csv')
    final_submission_file = os.path.join('../submission/final_submission', f'{rerank_model_version}_test_submission.csv')
else:
    test_candidate_file = os.path.join('../submission/candidate_for_validation/', f'{candidate_model_version}_test_submission.csv')
    final_submission_file = os.path.join('../submission/submission_for_validation', f'{rerank_model_version}_test_submission.csv')
    



feature_cols = ['aid', 'type', 'action_num_reverse_chrono', 'session_length', 'log_recency_score',
#                 'type_weighted_log_recency_score'
               ]
target = 'gt'


debug_candidate_file = '../submission/debug/debug_submission.csv'

In [8]:
rerank_model_version

'rerank_v1_40'

In [9]:
model_file

'../model_training/rerank_v1_40/ranker.pkl'

In [10]:
train_data_path = os.path.join(train_data_dir, f'{candidate_model_version}_test_submission.csv')
val_data_path = os.path.join(val_data_dir, f'{candidate_model_version}_test_submission.csv')

if for_local_val:
    train_label_path = '../data/parquet/train2_label/*.parquet'
else:
    train_label_path = '../data/parquet/val_label/*.parquet'
    
if debug:
    test = pl.read_csv(debug_candidate_file)
else:
    test = pl.read_csv(test_candidate_file)

In [11]:
train_data_path

'../submission/candidate_for_rerank_training/candidate_v2_train1_data_test_submission.csv'

In [12]:
val_data_path

'../submission/candidate_for_validation/candidate_v2_train1_data_test_submission.csv'

In [13]:
train_label_path

'../data/parquet/train2_label/*.parquet'

In [14]:
# ! head -n 10000 {train_data_path} > ../submission/debug/debug_submission.csv

In [15]:
if debug: 
    train = pl.read_csv(debug_candidate_file)
else:
#     train = pl.read_csv('../data/val_candidates.csv')
    train = pl.read_csv(train_data_path)
train_labels = pl.read_parquet(train_label_path)
test_label_file = '../data/parquet/val_label/*.parquet'

test_labels = pl.read_parquet(test_label_file)

In [16]:
train.shape

(9999, 2)

In [17]:
train_labels.shape

(2738344, 3)

In [18]:
test_labels.shape

(2189204, 3)

In [20]:
train.head()

| session_type (str) | labels (str) |
|---|---|
| 8643220_clicks... | 573273 399315 ... |
| 8643221_clicks... | 921137 1543291... |
| 8643222_clicks... | 1037630 930597... |
| 8643223_clicks... | 1811963 206418... |
| 8643224_clicks... | 778561 1106262... |


# Data Processing
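Each row of the candidate file packs a session id and an event type into `session_type`, and a space-separated list of candidate aids into `labels`. The reshaping that `data_preprocess` does with polars can be sketched in plain Python as follows: one output row per (session, type, candidate aid). `explode_candidates` and `TYPE2ID` are hypothetical names (`TYPE2ID` copies the notebook's `type2id`), and the input rows are made up.

```python
TYPE2ID = {"clicks": 0, "carts": 1, "orders": 2}  # same mapping as the notebook's type2id


def explode_candidates(rows):
    """Turn (session_type, labels) rows into (session, type_id, aid) rows."""
    out = []
    for session_type, labels in rows:
        # '8643220_clicks' -> ('8643220', 'clicks')
        session, event_type = session_type.split('_')
        # one output row per space-separated candidate aid
        for aid in labels.split(' '):
            out.append((int(session), TYPE2ID[event_type], int(aid)))
    return out


# toy input, not taken from the real candidate file
print(explode_candidates([("1_clicks", "10 20 30"), ("1_carts", "20")]))
# [(1, 0, 10), (1, 0, 20), (1, 0, 30), (1, 1, 20)]
```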

# Functions

In [15]:
def get_session(session_type):
    # '8643220_clicks' -> '8643220'
    return session_type.split('_')[0]

def get_type(session_type):
    # '8643220_clicks' -> 'clicks'
    return session_type.split('_')[1]

def data_preprocess(train):
    # Split 'session_type' into session id and event type, split the space-separated
    # candidate list, then explode to one row per (session, type, aid)
    return train.with_columns(
                [
                    pl.col('labels').str.split(' '),
                    pl.col('session_type').apply(get_session).alias('session'),
                    pl.col('session_type').apply(get_type).alias('type')
                ]
            ).explode('labels').with_columns(
                [
                    pl.col('labels').cast(pl.datatypes.Int32).alias('aid'),
                    pl.col('session').cast(pl.datatypes.Int32),
                    pl.col('type').apply(lambda x: type2id[x])
                ]
            ).drop(['session_type', 'labels']).with_columns(
                [
                    # compact dtypes; type is one of 0/1/2 after the type2id mapping
                    pl.col('session').cast(pl.datatypes.Int32),
                    pl.col('type').cast(pl.datatypes.UInt8),
                    pl.col('aid').cast(pl.datatypes.Int32)
                ]
            )
    

def add_action_num_reverse_chrono(df):
    return df.select([
        pl.col('*'),
        pl.col('session').cumcount().reverse().over('session').alias('action_num_reverse_chrono')
    ])

def add_session_length(df):
    return df.select([
        pl.col('*'),
        pl.col('session').count().over('session').alias('session_length')
    ])

def add_log_recency_score(df):
    linear_interpolation = 0.1 + ((1-0.1) / (df['session_length']-1)) * (df['session_length']-df['action_num_reverse_chrono']-1)
    return df.with_columns(pl.Series(2**linear_interpolation - 1).alias('log_recency_score')).fill_nan(1)

# def add_type_weighted_log_recency_score(df):
#     type_weights = {0:1, 1:6, 2:3}
#     type_weighted_log_recency_score = pl.Series(df['log_recency_score'] / df['type'].apply(lambda x: type_weights[x]))
#     return df.with_column(type_weighted_log_recency_score.alias('type_weighted_log_recency_score'))

def add_label(df, labels):
    # explode the ground-truth lists to one row per (session, type, aid)
    labels = labels.explode('ground_truth').with_columns([
        pl.col('ground_truth').alias('aid'),
        pl.col('type').apply(lambda x: type2id[x])
    ])[['session', 'type', 'aid']]

    # match the dtypes produced by data_preprocess so the join keys line up
    labels = labels.with_columns([
        pl.col('session').cast(pl.datatypes.Int32),
        pl.col('type').cast(pl.datatypes.UInt8),
        pl.col('aid').cast(pl.datatypes.Int32)
    ])
    labels = labels.with_column(pl.lit(1).alias('gt'))
    # candidates that do not appear in the ground truth get gt = 0
    return df.join(labels, how='left', on=['session', 'type', 'aid']).with_column(pl.col('gt').fill_null(0))

def add_train_label(df, train_labels=train_labels):
    return add_label(df, train_labels)

def add_test_label(df, test_labels=test_labels):
    return add_label(df, test_labels)

def apply(df, pipeline):
    for f in pipeline:
        df = f(df)
    return df

train_pipeline = [ data_preprocess, add_action_num_reverse_chrono, add_session_length, add_log_recency_score, 
            add_train_label
           ]
test_pipeline = [ data_preprocess, add_action_num_reverse_chrono, add_session_length, add_log_recency_score, 
            add_test_label
           ]
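To make the three engineered features concrete, here is a plain-Python sketch of what `add_action_num_reverse_chrono`, `add_session_length` and `add_log_recency_score` compute for a single session. It assumes the session's rows are given in file order (the order `over('session')` sees them); `add_features` is a hypothetical helper, not part of the notebook. The score is `2**(0.1 + 0.9 * i / (L - 1)) - 1` for the row at position `i` of a session of length `L`, so the last row gets 1 and the first row gets `2**0.1 - 1`, about 0.072.

```python
def add_features(session_aids):
    """Per-row features for one session's candidate rows, given in file order.

    Returns (aid, action_num_reverse_chrono, session_length, log_recency_score)
    tuples, following the formulas in the notebook's feature functions.
    """
    length = len(session_aids)
    rows = []
    for i, aid in enumerate(session_aids):
        reverse_chrono = length - i - 1  # 0 for the last row of the session
        if length == 1:
            # the notebook's formula yields NaN here ((0.9 / 0) * 0), and fill_nan(1) turns it into 1
            score = 1.0
        else:
            interp = 0.1 + (0.9 / (length - 1)) * (length - reverse_chrono - 1)
            score = 2 ** interp - 1
        rows.append((aid, reverse_chrono, length, score))
    return rows


for row in add_features([10, 20, 30]):
    print(row)
```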

# Code

In [16]:
# # train['labels'] = 
# # train = 
# train = (
#     train.with_columns(
#         [
#             pl.col('session').alias('session_type')
#         ])
#     .drop(['session', '__index_level_0__'])
#     .with_columns(
#         [
# #             pl.col('labels').str.split(' '),
# #             pl.col('session_type').str.split('_').map(lambda s: s[0])
#             pl.col('session_type').apply(lambda s: get_session(s)).alias('session'),
#             pl.col('session_type').apply(lambda s: get_type(s)).alias('type')
# #             pl.col("session_type").arr().get(0).alias("a"),
#         ]
#     )
#     .explode('labels')
#     .with_columns(
#         [
#             pl.col('labels').cast(pl.datatypes.Int32).alias('aid'),
#              pl.col('session').cast(pl.datatypes.Int32),
#             pl.col('type').apply(lambda x: type2id[x])
#         ]
#     )
#     .drop(['session_type', 'labels'])
#     .with_columns(
#         [
#             pl.col('session').cast(pl.datatypes.Int32),
#             pl.col('type').cast(pl.datatypes.UInt8),
#             pl.col('aid').cast(pl.datatypes.Int32)
#         ]
#     )
    
# )
# # .with_columns(
# #     [
# #         pl.col('session_type').alias('new')
# #     ]
# # )
# # .with_columns(
# #     [
# #         pl.col('session_type').get(0)
# #     ]
# # )


In [40]:
train.select([pl.col('labels')])

| labels (str) |
|---|
| 573273 399315 ... |
| 921137 1543291... |
| 1037630 930597... |
| 1811963 206418... |
| 778561 1106262... |
| 655488 1566421... |
| 1845885 171073... |
| 436885 460141 ... |
| 791627 613637 ... |
| 1696741 103011... |


In [26]:
print(f"{train.shape}; {train_labels.shape}")

(9999, 2); (2738344, 3)


In [None]:
train = apply(train, train_pipeline)
test = apply(test, test_pipeline)

All done!

In [None]:
train.head()

In [None]:
train.shape

In [None]:
train['type'].value_counts()

The labels were already processed and merged onto our train set by `add_train_label` as part of `train_pipeline`. Below we just take a look at them.
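In plain Python, the left join that `add_train_label` and `add_test_label` perform boils down to a set lookup: a candidate gets `gt = 1` if the same (session, type, aid) triple appears in the exploded ground truth, and 0 otherwise. This is a hypothetical sketch on toy tuples; it assumes the ground-truth triples are unique (a left join against duplicated triples would duplicate candidate rows, whereas the set lookup does not).

```python
def add_labels(candidates, ground_truth):
    """Attach a binary gt target to candidate (session, type, aid) rows.

    Equivalent to a left join on (session, type, aid) followed by
    fill_null(0), assuming the ground-truth triples are unique.
    """
    positives = set(ground_truth)
    return [(s, t, a, 1 if (s, t, a) in positives else 0) for s, t, a in candidates]


# toy data: type ids follow type2id (0 = clicks, 1 = carts, 2 = orders)
candidates = [(1, 0, 10), (1, 0, 20), (1, 1, 20)]
ground_truth = [(1, 0, 20), (2, 2, 99)]
print(add_labels(candidates, ground_truth))
# [(1, 0, 10, 0), (1, 0, 20, 1), (1, 1, 20, 0)]
```

Note that the type is part of the key: aid 20 is a positive for clicks in session 1 but not for carts.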

In [None]:
# test_label_file = '../data/parquet/val_label/*.parquet'

In [None]:
# test_labels = pl.read_parquet(test_label_file)

In [None]:
# test_labels = test_labels.explode('ground_truth').with_columns([
#     pl.col('ground_truth').alias('aid'),
#     pl.col('type').apply(lambda x: type2id[x])
# ])[['session', 'type', 'aid']]

In [None]:
test_labels.head()

In [None]:
test_labels.shape

In [None]:
# The labels were already merged onto `train` by `add_train_label` in `train_pipeline`.
# Here we only reshape `train_labels` the same way so it can be inspected below;
# joining it onto `train` a second time would add a duplicate `gt_right` column.
train_labels = train_labels.explode('ground_truth').with_columns([
    pl.col('ground_truth').alias('aid'),
    pl.col('type').apply(lambda x: type2id[x])
])[['session', 'type', 'aid']]

train_labels = train_labels.with_columns([
    pl.col('session').cast(pl.datatypes.Int32),
    pl.col('type').cast(pl.datatypes.UInt8),
    pl.col('aid').cast(pl.datatypes.Int32)
])
train_labels = train_labels.with_column(pl.lit(1).alias('gt'))

In [None]:
train_labels.shape

In [None]:
train_labels.head()

In [None]:
train_labels['gt'].value_counts()

In [None]:
train.head()

In [None]:
type2id

In [None]:
# train['type']

In [None]:
# train[train['type']==2]#['gt'].value_counts()

In [None]:
train['gt'].value_counts()

Ok, so we now have our preprocessed dataset and a column with the ground truth, which means the only thing our Ranker is still missing is... information on how to group individual rows into sessions!
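LightGBM's `group` argument is a list of query sizes: the first `group[0]` rows form the first query, the next `group[1]` rows the second, and so on, so `sum(group)` must equal the number of rows. A plain-Python sketch of that bookkeeping (`group_sizes` and `is_contiguous` are hypothetical helpers): sizes are counted over consecutive runs of the same session id, which is why all rows of a session have to sit next to each other.

```python
from itertools import groupby


def group_sizes(query_ids):
    """Sizes of consecutive runs of equal ids, in row order (LightGBM's `group`)."""
    return [sum(1 for _ in run) for _, run in groupby(query_ids)]


def is_contiguous(query_ids):
    """True if every id occupies a single consecutive block of rows."""
    seen = set()
    for key, _ in groupby(query_ids):
        if key in seen:
            return False
        seen.add(key)
    return True


session_per_row = [5, 5, 5, 9, 9, 3]
sizes = group_sizes(session_per_row)
print(sizes)  # [3, 2, 1]
assert sum(sizes) == len(session_per_row)
```

With `[5, 9, 5]` the sizes come out as `[1, 1, 1]`, i.e. session 5 would be treated as two separate queries, which is exactly the mistake to avoid.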

In [None]:
train.shape

In [None]:
train.head()

In [None]:
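def get_session_lengths(df):
    # LGBMRanker expects the group sizes in the same order as the rows,
    # so keep the groupby output in order of first appearance
    return df.groupby('session', maintain_order=True).agg([
        pl.col('session').count().alias('session_length')
    ])['session_length'].to_numpy()

In [None]:
# make all candidate rows of a session contiguous before computing the group sizes
train = train.sort('session')
test = test.sort('session')
session_lengths_train = get_session_lengths(train)
session_lengths_test = get_session_lengths(test)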

In [None]:
session_lengths_train.shape

In [None]:
session_lengths_train

# Model training

In [None]:
import lightgbm

In [None]:
from lightgbm.sklearn import LGBMRanker

In [None]:
ranker = LGBMRanker(
    objective="lambdarank",
    metric="ndcg",
    boosting_type="dart",
    n_estimators=estimator, 
    importance_type='gain',
    eval_at=[5]
)
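With `objective="lambdarank"` and `metric="ndcg"`, `eval_at=[5]` should make LightGBM report NDCG@5 on each eval set. As a reference point, here is a plain-Python sketch of NDCG@k using the common exponential gain `2**rel - 1` and a `log2(position + 2)` discount (0-based positions); with our binary `gt` labels a positive simply contributes a gain of 1. The helper names are hypothetical, and scoring a query without any positive as 0.0 is a convention choice of this sketch.

```python
import math


def dcg_at_k(relevances, k):
    """Discounted cumulative gain of the first k relevance labels (in ranked order)."""
    return sum((2 ** rel - 1) / math.log2(pos + 2) for pos, rel in enumerate(relevances[:k]))


def ndcg_at_k(relevances, k=5):
    """DCG of the given ranking divided by the DCG of the ideal ranking."""
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    return dcg_at_k(relevances, k) / ideal if ideal > 0 else 0.0


# gt labels of one session's candidates, ordered by model score
print(ndcg_at_k([0, 1, 0, 0, 0, 0]))  # 1 / log2(3), about 0.63
```

A positive ranked first gives 1.0; the same positive ranked sixth falls outside the top 5 and gives 0.0.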

In [None]:
estimator

In [None]:
# train[feature_cols]

In [None]:
train[feature_cols].shape

In [None]:
test[feature_cols].shape

In [None]:
train[feature_cols].head()

In [None]:
test[feature_cols].head()

In [None]:
# train_labels['gt']

In [None]:
final_submission_file

In [None]:
min(session_lengths_train)

In [None]:
ranker = ranker.fit(
    X=train[feature_cols].to_pandas(),
    y=train[target].to_pandas(),
    group=session_lengths_train,
    eval_set=[(train[feature_cols].to_pandas(), train[target].to_pandas()),
             (test[feature_cols].to_pandas(), test[target].to_pandas())
             ],
    eval_group=[session_lengths_train, session_lengths_test]
)

In [None]:
pipe = Pipeline([
    ('model', ranker)
])

In [None]:
debug

In [None]:
if not debug:
    joblib.dump(
        value=pipe,
        filename=model_file)

In [None]:
del train, train_labels
gc.collect()

# Load model

In [None]:
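# the model is only written to disk when debug is False, so in debug mode
# reuse the in-memory pipeline instead of loading a stale or missing file
new_pipeline = pipe if debug else joblib.load(filename=model_file)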

In [None]:
new_pipeline

# Predict on test data

The test set was already loaded and processed above, so all that is left is to predict on it.

In [None]:
final_submission_file

In [None]:
test_candidate_file

In [None]:
# ! ls ../submission/candiate_for_validation/

In [None]:
# ../submission/candidate_for_validation/

In [None]:
# assert len(test['session_type'].unique()) == 5015409

In [None]:
test.head()

In [None]:
scores = new_pipeline.predict(test[feature_cols].to_pandas())

# Create submission

In [None]:
test = test.with_columns(pl.Series(name='score', values=scores))
test_predictions = test.sort(['session', 'score'], reverse=True).groupby('session').agg([
    pl.col('aid').limit(20).list()
])
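The polars expression above sorts by score and keeps the first 20 aids per session. The same logic in plain Python, as a hypothetical `top_k_per_session` helper working on (session, aid, score) tuples. Like the polars code, it pools the candidates of all event types of a session, so an aid that was a candidate for several types can show up more than once in the list.

```python
from collections import defaultdict


def top_k_per_session(rows, k=20):
    """rows: (session, aid, score) tuples -> {session: up to k aids, best score first}."""
    by_session = defaultdict(list)
    for session, aid, score in rows:
        by_session[session].append((score, aid))
    return {
        session: [aid for _, aid in sorted(pairs, key=lambda p: p[0], reverse=True)[:k]]
        for session, pairs in by_session.items()
    }


rows = [(1, 10, 0.2), (1, 20, 0.9), (1, 30, 0.5), (2, 40, 0.1)]
print(top_k_per_session(rows, k=2))  # {1: [20, 30], 2: [40]}
```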

In [None]:
test_predictions.head()

In [None]:
session_types = []
labels = []

for session, preds in zip(test_predictions['session'].to_numpy(), test_predictions['aid'].to_numpy()):
    l = ' '.join(str(p) for p in preds)
    for session_type in ['clicks', 'carts', 'orders']:
        labels.append(l)
        session_types.append(f'{session}_{session_type}')

In [None]:
! ls -al {final_submission_file}

In [None]:
submission = pl.DataFrame({'session_type': session_types, 'labels': labels})
if not debug:
    submission.write_csv(final_submission_file)

In [None]:
final_submission_file