In [17]:
import pandas as pd 
import json
import os
import shutil
import dask.dataframe as dd
from dask.distributed import Client
import joblib

In [None]:
client = Client(memory_limit='2GB', memory_target_fraction=0.6, local_directory='/path/to/directory')

2025-03-20 11:18:06,133 - distributed.scheduler - ERROR - Task ('explode-getitem-be68a887399e5e1eba86a2085c646cfc', 1) marked as failed because 4 workers died while trying to run it
Task exception was never retrieved
future: <Task finished name='Task-72706' coro=<Client._gather.<locals>.wait() done, defined at c:\Users\eloua\AppData\Local\Programs\Python\Python313\Lib\site-packages\distributed\client.py:2394> exception=AllExit()>
Traceback (most recent call last):
  File "c:\Users\eloua\AppData\Local\Programs\Python\Python313\Lib\site-packages\distributed\client.py", line 2403, in wait
    raise AllExit()
distributed.client.AllExit
2025-03-20 11:22:21,188 - distributed.scheduler - ERROR - Task ('explode-getitem-be68a887399e5e1eba86a2085c646cfc', 1) marked as failed because 4 workers died while trying to run it


# Chargement des data sets

In [None]:
parquet_file = "../data/final_output.parquet"
data = dd.read_parquet(parquet_file, blocksize='15MB')

def get_first_n_percent(partition, rows_to_keep):
    return partition.head(rows_to_keep) 
data = data.map_partitions(get_first_n_percent, rows_to_keep=20000)

print(f"Nombre de partitions: {data.npartitions}")

Nombre de partitions: 207


In [None]:
test_file = "../data/test.jsonl"
with open(test_file, "r", encoding="utf-8") as f:
    df = [json.loads(line) for line in f]
flattened_data = []
for record in df:
    session_id = record["session"]
    for event in record["events"]:
        event["session"] = session_id 
        flattened_data.append(event)

test_df = pd.DataFrame(flattened_data)

In [4]:
test_df.shape

(6928123, 4)

In [7]:
test_df.head()

Unnamed: 0,aid,ts,type,session
0,59625,1661724000278,clicks,12899779
1,1142000,1661724000378,clicks,12899780
2,582732,1661724058352,clicks,12899780
3,973453,1661724109199,clicks,12899780
4,736515,1661724136868,clicks,12899780


In [3]:
parquet_file = "../data/test_labels.parquet"
labels = dd.read_parquet(parquet_file)

In [11]:
labels.tail()

Unnamed: 0,session,type,ground_truth
2212687,12899774,clicks,[1399483]
2212688,12899775,clicks,[1760714]
2212689,12899776,clicks,[1737908]
2212690,12899777,clicks,[384045]
2212691,12899778,clicks,[32070]


# Traitement des données

In [4]:
def add_action_reverse(df):
    df["action_reverse"] = df.groupby("session").cumcount()
    df["action_reverse"] = df.groupby("session")["action_reverse"].apply(lambda x: x.max() - x ).reset_index(drop=True) #, meta=('action_reverse', 'int32'))
    return df

def add_session_length(df):
    df['session_length'] = df.groupby('session')['session'].transform('count') #, meta=('session_length', 'int32'))
    return df

def add_log_recency_score(df):
    linear_interpolation = 0.1 + ((1 - 0.1) / (df['session_length'] - 1)) * (df['session_length'] - df['action_reverse'] - 1)
    df['log_recency_score'] = (2 ** linear_interpolation) - 1
    df['log_recency_score'] = df['log_recency_score'].fillna(1)
    return df

def add_type_weighted_log_recency_score(df):
    type_weights = {0: 1, 1: 6, 2: 3}
    df['type_weighted_log_recency_score'] = df['log_recency_score'] / df['type'].map(type_weights)
    return df

def apply_pipeline(df, pipeline):
    if isinstance(df, (dd.DataFrame, pd.DataFrame)):
        for f in pipeline:
            df = f(df)
        return df
    else:
        raise TypeError("Input doit être un DataFrame Pandas ou Dask DataFrame")
    
def process_partition(partition):
    type_mapping = {
        'clicks': 0,
        'carts': 1,
        'orders': 2
    }
    partition['type'] = partition['type'].map(type_mapping)
    partition['type'] = partition['type'].astype('int8')
    partition = apply_pipeline(partition, pipeline)
    expected_columns = ['session', 'action_reverse', 'session_length', 'log_recency_score', 
                        'type_weighted_log_recency_score', 'aid', 'ts', 'type']
    missing_columns = [col for col in expected_columns if col not in partition.columns]
    if missing_columns:
        raise ValueError(f"Colonnes manquantes après transformation: {missing_columns}")
    return partition[expected_columns] 

In [5]:
pipeline = [add_action_reverse, add_session_length, add_log_recency_score, add_type_weighted_log_recency_score]

In [6]:
meta = {
    'session': 'int32',
    'action_reverse': 'int32',
    'session_length': 'int32',
    'log_recency_score': 'float32',
    'type_weighted_log_recency_score': 'float32',
    'aid': 'int32',
    'ts': 'int32',
    'type': 'int8'
}

In [7]:
df_processed = data.map_partitions(process_partition, meta=meta)

In [None]:
df_processed.to_parquet('dataframe.parquet', engine='pyarrow')

In [4]:
train = dd.read_parquet('dataframe_1.parquet', blocksize='15MB')

In [9]:
train.tail(20)

Unnamed: 0,session,action_reverse,session_length,log_recency_score,type_weighted_log_recency_score,aid,ts,type
19980,3747760,19,119,0.808865,0.808865,29735,1660758692329,0
19981,3747760,18,119,0.818453,0.136409,29735,1660758699999,1
19982,3747760,17,119,0.828092,0.828092,29735,1660758746118,0
19983,3747760,16,119,0.837783,0.837783,310546,1660758780780,0
19984,3747760,15,119,0.847524,0.141254,310546,1660758839293,1
19985,3747760,14,119,0.857317,0.857317,823143,1660759391052,0
19986,3747760,13,119,0.867162,0.144527,823143,1660759398244,1
19987,3747760,12,119,0.87706,0.87706,823143,1660759421569,0
19988,3747760,11,119,0.88701,0.88701,310546,1660759479681,0
19989,3747760,10,119,0.897012,0.897012,493104,1660859048661,0


# Ground truth

In [5]:
type2id = {"clicks": 0, "carts": 1, "orders": 2}

df_train_labels = labels.explode('ground_truth')

df_train_labels['aid'] = df_train_labels['ground_truth']
df_train_labels['type'] = df_train_labels['type'].map(type2id)
df_train_labels = df_train_labels[['session', 'type', 'aid']]

df_train_labels['session'] = df_train_labels['session'].astype('int32')
df_train_labels['type'] = df_train_labels['type'].astype('uint8')
df_train_labels['aid'] = df_train_labels['aid'].astype('int32')

df_train_labels['gt'] = 1

df_train = train.merge(df_train_labels, on=['session', 'type', 'aid'], how='left')

df_train['gt'] = df_train['gt'].fillna(0).astype('uint8')

#train.to_parquet('train_processed.parquet', write_index=False)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('type', 'float64'))

+------------------------+------------+-------------+
| Merge columns          | left dtype | right dtype |
+------------------------+------------+-------------+
| ('session', 'session') | int64      | int32       |
| ('type', 'type')       | int8       | uint8       |
| ('aid', 'aid')         | int64      | int32       |
+------------------------+------------+-------------+
Cast dtypes explicitly to avoid unexpected results.


In [6]:
df_train.head()

Unnamed: 0,session,action_reverse,session_length,log_recency_score,type_weighted_log_recency_score,aid,ts,type,gt
0,5899776,3,4,0.071773,0.071773,1489275,1660039772288,0,0
1,5899776,2,4,0.319508,0.319508,1826552,1660043110728,0,0
2,5899776,1,4,0.624505,0.624505,1632206,1660048043858,0,0
3,5899776,0,4,1.0,1.0,1531634,1660048104470,0,0
4,5899777,1,2,0.071773,0.071773,1086210,1660039772327,0,0


In [7]:
def get_session_lengths(df):
    return df.groupby('session')['session'].count().compute().to_numpy()

In [8]:
session_lengths_train = get_session_lengths(df_train)

In [13]:
session_lengths_train

array([10, 39, 34, ...,  3, 12, 50], shape=(239072,))

# Model training

In [9]:
from lightgbm.sklearn import LGBMRanker

In [10]:
ranker = LGBMRanker(
    objective="lambdarank",
    metric="ndcg",
    boosting_type="dart",
    n_estimators=20,
    importance_type='gain',
)

In [11]:
feature_cols = ['aid', 'type', 'action_reverse', 'session_length', 'log_recency_score', 'type_weighted_log_recency_score']
target = 'gt'

In [14]:
df_train_pd = df_train[feature_cols].compute() 
target_pd = df_train[target].compute()

In [15]:
ranker = ranker.fit(
    df_train_pd,
    target_pd,
    group=session_lengths_train,
)

[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.336858 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1278
[LightGBM] [Info] Number of data points in the train set: 4140000, number of used features: 6


In [18]:
joblib.dump(ranker, "lightgbm_ranker.pkl")

['lightgbm_ranker.pkl']

In [None]:
ranker = joblib.load("lightgbm_ranker.pkl")