# Otto RecSys - Candidate ReRank Model - Use source code

## Data Preparing

In [1]:
!git clone https://github.com/triet4p/otto-rec-sys.git

Cloning into 'otto-rec-sys'...
remote: Enumerating objects: 95, done.[K
remote: Counting objects: 100% (95/95), done.[K
remote: Compressing objects: 100% (65/65), done.[K
remote: Total 95 (delta 43), reused 76 (delta 24), pack-reused 0 (from 0)[K
Receiving objects: 100% (95/95), 1.64 MiB | 18.67 MiB/s, done.
Resolving deltas: 100% (43/43), done.


In [2]:
%cd otto-rec-sys
!git checkout 33986ff48aa765cb612d02b94565c7818c0fbfe6

/kaggle/working/otto-rec-sys
Note: switching to '33986ff48aa765cb612d02b94565c7818c0fbfe6'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

HEAD is now at 33986ff Update training.py


In [3]:
import sys
sys.path.append('/kaggle/working/otto-rec-sys')

In [4]:
%%time
from src.core.utils import load_raw_data_parquet
# Load train parquet

train_df = load_raw_data_parquet('/kaggle/input/otto-chunk-data-inparquet-format/train_parquet/*')

CPU times: user 30.4 s, sys: 29.3 s, total: 59.8 s
Wall time: 40.5 s


In [5]:
%%time
from src.candidate_generation.loader import load_covisit_matrix
MATRIX_BASE_PATH = '/kaggle/input/otto-precal-covisit-matrices/submission/train/'

df_clicks_train = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_clicks_*.pqt', 'clicks', 20)
df_buys_train = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_carts_orders_*.pqt', 'buys', 15)
df_buy2buy_train = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_buy2buy_*.pqt', 'buy2buy', 15)

CPU times: user 18.7 s, sys: 7.3 s, total: 26 s
Wall time: 22.4 s


In [6]:
from src.pipeline.preprocess import sample_train
TRAIN_SESSION_SAMPLE_RATE = 0.18 # Lấy 40% số session

sampled_train_dfs = []
seeds = [42,43,44]

for i in range(len(seeds)):
    sampled_train_dfs.append(sample_train(train_df,
                                          TRAIN_SESSION_SAMPLE_RATE,
                                          seed=seeds[i]))

In [7]:
import gc
from src.pipeline.preprocess import get_history_and_label_df
history_dfs = []
history_source_dfs = []
truth_label_dfs = []

for i in range(len(seeds)):
    history_df, truth_label_df, history_source_df = get_history_and_label_df(sampled_train_dfs[i])
    history_dfs.append(history_df)
    truth_label_dfs.append(truth_label_df)
    history_source_dfs.append(history_source_df)

    del history_df, truth_label_df
    gc.collect()

History and Labels for training have been created.
History and Labels for training have been created.
History and Labels for training have been created.


In [8]:
from src.pipeline.preprocess import get_popular_items_df

popular_items_dfs = []
for i in range(len(seeds)):
    popular_items_dfs.append(get_popular_items_df(sampled_train_dfs[i]))

In [9]:
from src.pipeline.preprocess import pre_compute_item_popularity
item_popularity_dfs = []
for i in range(len(seeds)):
    item_popularity_dfs.append(pre_compute_item_popularity(sampled_train_dfs[i]))

In [10]:
import os
import polars as pl
from tqdm import tqdm
from src.pipeline.preprocess import process_chunk

N_CHUNKS = 20
for i in range(len(seeds)):
    history_df = history_dfs[i]
    popular_items_df = popular_items_dfs[i]
    all_sessions = history_df['session'].unique().to_list()
    chunk_size = len(all_sessions) // N_CHUNKS

    TEMP_CHUNK_PATH = f'/kaggle/working/temp_candidate_chunks/sample_{i}/' # Thư mục để lưu các file tạm
    
    os.makedirs(TEMP_CHUNK_PATH, exist_ok=True)
    
    print(f"\n--- Processing {len(all_sessions)} sessions in {N_CHUNKS+1} chunks ---")
    for i in tqdm(range(N_CHUNKS + 1)):
        start = i * chunk_size
        end = (i + 1) * chunk_size
        if start >= len(all_sessions):
            break
        
        session_chunk_ids = all_sessions[start:end]
        history_chunk = history_df.filter(pl.col('session').is_in(session_chunk_ids))
        
        # Gọi hàm xử lý cho chunk
        chunk_result = process_chunk(history_chunk, popular_items_df,
                                     df_clicks_train, df_buys_train, df_buy2buy_train)
        # Thêm đặc trưng cuối cùng cho nguồn popular
        chunk_result = chunk_result.with_columns(
            pl.col('candidate_aid').is_in(popular_items_df['candidate_aid']).cast(pl.UInt8).alias('source_popular')
        )
        # --- THAY ĐỔI QUAN TRỌNG: LƯU RA FILE THAY VÌ APPEND VÀO LIST ---
        chunk_result.write_parquet(TEMP_CHUNK_PATH + f'candidates_chunk_{i}.pqt')
        
        # Dọn dẹp bộ nhớ
        gc.collect()


--- Processing 547624 sessions in 21 chunks ---


100%|██████████| 21/21 [03:46<00:00, 10.79s/it]



--- Processing 548829 sessions in 21 chunks ---


100%|██████████| 21/21 [03:48<00:00, 10.88s/it]



--- Processing 548386 sessions in 21 chunks ---


100%|██████████| 21/21 [04:36<00:00, 13.18s/it]


In [11]:
del df_clicks_train, df_buys_train, df_buy2buy_train, popular_items_dfs, sampled_train_dfs
_ = gc.collect()

In [12]:
import numpy as np
np.random.seed(42)

## Training

In [13]:
# 4. Tạo ra 3 bộ dữ liệu huấn luyện riêng biệt
# Định nghĩa các siêu tham số cho việc lấy mẫu
TARGET_RATIOS = {'clicks': 10, 'carts': 20, 'orders': 30}
POS_RATE = {'clicks': 0.5, 'carts': 1.0, 'orders': 1.0}
POPULAR_FRACTION = 0.5 # 50% mẫu âm sẽ là "khó", 50% là ngẫu nhiên

In [14]:
%%time
from src.pipeline.training import create_training_set_for_type
for i in range(len(seeds)):
    TEMP_CHUNK_PATH = f'/kaggle/working/temp_candidate_chunks/sample_{i}/'
    lazy_final_df = pl.scan_parquet(TEMP_CHUNK_PATH + 'candidates_chunk_*.pqt')
    lazy_final_df = lazy_final_df.fill_null(0)

    for pred_type in ['clicks', 'carts', 'orders']:
        training_sets = create_training_set_for_type(
            lazy_final_df, truth_label_dfs[i], item_popularity_dfs[i], pred_type,
            positive_rate = POS_RATE[pred_type],
            target_neg_pos_ratio=TARGET_RATIOS[pred_type], popular_fraction=POPULAR_FRACTION
        )
        training_sets.write_parquet(TEMP_CHUNK_PATH + f'training_set_{pred_type}.pqt')
        del training_sets
        gc.collect()


--- Creating training set for 'clicks'  ---
0
Collecting positive samples for '0'...
Collecting all negative samples for '0'...
Positives: 318297. Total negatives to sample: 3182970 (1591485 popular + 1591485 random)
Sampling popular negatives...
Sampling random negatives...

--- Creating training set for 'carts'  ---
1
Collecting positive samples for '1'...
Collecting all negative samples for '1'...
Positives: 94736. Total negatives to sample: 1894720 (947360 popular + 947360 random)
Sampling popular negatives...
Sampling random negatives...

--- Creating training set for 'orders'  ---
2
Collecting positive samples for '2'...
Collecting all negative samples for '2'...
Positives: 37009. Total negatives to sample: 1110270 (555135 popular + 555135 random)
Sampling popular negatives...
Sampling random negatives...

--- Creating training set for 'clicks'  ---
0
Collecting positive samples for '0'...
Collecting all negative samples for '0'...
Positives: 318053. Total negatives to sample: 3

In [15]:
import polars as pl
import numpy as np
import lightgbm as lgb
import gc
import matplotlib.pyplot as plt
import seaborn as sns

# --- 1. Feature Tĩnh (Item Features) ---
def create_time_window_features(feature_source_df: pl.DataFrame, time_window_days: int = None) -> pl.DataFrame:
    if time_window_days is not None:
        last_ts = feature_source_df['ts'].max()
        start_ts = last_ts - (time_window_days * 24 * 60 * 60)
        source_df = feature_source_df.filter(pl.col('ts') >= start_ts)
        suffix = f'_{time_window_days}d'
    else:
        source_df = feature_source_df
        suffix = '_all'
        
    item_feats = source_df.group_by('aid').agg([
        pl.count().alias(f'item_total_counts{suffix}'),
        pl.col('type').filter(pl.col('type') == 0).count().alias(f'item_click_counts{suffix}'),
        pl.col('type').filter(pl.col('type') == 1).count().alias(f'item_cart_counts{suffix}'),
        pl.col('type').filter(pl.col('type') == 2).count().alias(f'item_order_counts{suffix}'),
    ]).rename({'aid': 'candidate_aid'})
    
    # Tính tỷ lệ chuyển đổi (Smoothing +10 để tránh nhiễu ở item ít tương tác)
    item_feats = item_feats.with_columns([
        (pl.col(f'item_order_counts{suffix}') / (pl.col(f'item_click_counts{suffix}') + 10)).alias(f'item_buy_ratio{suffix}'),
        (pl.col(f'item_cart_counts{suffix}') / (pl.col(f'item_click_counts{suffix}') + 10)).alias(f'item_cart_ratio{suffix}'),
    ])
    return item_feats

# --- 2. Feature Động (Session Context) ---
def create_session_context_features(session_context_df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """
    Tính toán tất cả các feature dựa trên context của session.
    Trả về: (session_level_features, interaction_level_features, last_item_info)
    """
    # A. Session Level Features
    session_feats = session_context_df.group_by('session').agg([
        pl.count().alias('session_length'),
        pl.col('aid').n_unique().alias('session_unique_aids'),
        pl.col('ts').max().alias('session_end_ts'),
        (pl.col('ts').max() - pl.col('ts').min()).alias('session_duration'),
    ])
    
    # B. Interaction Level Features (Lặp lại & Thời gian cuối)
    interaction_feats = session_context_df.group_by(['session', 'aid']).agg([
        pl.count().alias('num_repetitions'),
        pl.col('ts').max().alias('last_item_ts')
    ]).rename({'aid': 'candidate_aid'})
    
    # C. Last Item Info (Item cuối cùng user xem)
    last_items = session_context_df.sort('ts').group_by('session', maintain_order=True).last()
    last_items = last_items.select(['session', 'aid']).rename({'aid': 'last_aid'})
    
    return session_feats, interaction_feats, last_items

In [16]:
def add_sorted_rank_features(df: pl.DataFrame, fill_value: int = 999) -> pl.DataFrame:
    """Tạo feature rank tốt nhất từ tất cả các nguồn."""
    rank_cols = [col for col in df.columns if col.startswith('rank_')]
    if not rank_cols: return df

    # Tạo list rank, sort và lấy ra các giá trị min
    rank_exprs = [pl.col(c).fill_null(fill_value) for c in rank_cols]
    
    df = df.with_columns(
        pl.concat_list(rank_exprs).list.sort().alias('temp_sorted_ranks')
    )
    
    # Tách ra min_rank_1 (best), min_rank_2 (2nd best)
    new_cols = [
        pl.col('temp_sorted_ranks').list.get(0).alias('min_rank_1'),
        pl.col('temp_sorted_ranks').list.get(1).alias('min_rank_2'),
        # Số lượng nguồn gợi ý item này
        (pl.col('temp_sorted_ranks').list.eval(pl.element() < fill_value).list.sum()).alias('n_sources_present')
    ]
    return df.with_columns(new_cols).drop('temp_sorted_ranks')

def add_features(df: pl.DataFrame, 
                 time_window_feats_list: list[pl.DataFrame], # [item_all, item_7d]
                 session_feats: pl.DataFrame,
                 interaction_feats: pl.DataFrame,
                 last_items: pl.DataFrame) -> pl.DataFrame:
   
    # 1. Join các bảng gốc (Giữ nguyên)
    for tw_feat in time_window_feats_list:
        df = df.join(tw_feat, on='candidate_aid', how='left')
    df = df.join(session_feats, on='session', how='left')
    df = df.join(interaction_feats, on=['session', 'candidate_aid'], how='left')
    df = df.join(last_items, on='session', how='left')

    # --- NHÓM 1: TREND / VELOCITY (Tốc độ tăng trưởng của Item) ---
    # So sánh 7 ngày vs All time. 
    # Logic: Item có tỷ trọng click trong 7 ngày cao bất thường so với lịch sử -> Đang Hot.
    
    # Giả sử time_window_feats_list[0] là ALL, [1] là 7D
    # Các cột sẽ có suffix '_all' và '_7d'
    
    df = df.with_columns([
        # Tỷ lệ click gần đây / click tổng (Cộng 10 để tránh chia 0 và nhiễu)
        (pl.col('item_click_counts_7d').fill_null(0) / (pl.col('item_click_counts_all').fill_null(0) + 10)).alias('click_trend_7d_vs_all'),
        
        # Tỷ lệ order gần đây / order tổng
        (pl.col('item_order_counts_7d').fill_null(0) / (pl.col('item_order_counts_all').fill_null(0) + 10)).alias('order_trend_7d_vs_all'),
        
        # Conversion Rate thay đổi thế nào? (CR 7 ngày - CR All)
        (pl.col('item_buy_ratio_7d').fill_null(0) - pl.col('item_buy_ratio_all').fill_null(0)).alias('conversion_trend_diff')
    ])

    # --- NHÓM 2: CROSS-SOURCE COMPARISON (So sánh giữa các nguồn Co-visit) ---
    # Logic: Sự chênh lệch thứ hạng giữa các nguồn nói lên điều gì?
    # Ví dụ: Rank Buy2Buy thấp (tốt) nhưng Rank Clicks cao (tệ) -> Item này ít người click nhưng hễ click là mua -> Tiềm năng cao.
    
    # Fill null rank bằng 999 trước khi tính toán
    rank_cols = ['rank_clicks', 'rank_buys', 'rank_buy2buy']
    for c in rank_cols:
        if c not in df.columns:
            df = df.with_columns(pl.lit(999).alias(c))
        else:
            df = df.with_columns(pl.col(c).fill_null(999))

    df = df.with_columns([
        # Chênh lệch rank
        (pl.col('rank_clicks') - pl.col('rank_buy2buy')).alias('rank_diff_click_buy2buy'),
        (pl.col('rank_buys') - pl.col('rank_buy2buy')).alias('rank_diff_buys_buy2buy'),
        
        # Tổng hợp trọng số (Weighted Sum) - Tạo ra một "Siêu điểm số"
        (pl.col('wgt_buy2buy').fill_null(0) * 2 + pl.col('wgt_buys').fill_null(0) * 1).alias('combined_buy_weight')
    ])

    # --- NHÓM 3: CONTEXTUAL RECENCY (Tính gần đây kết hợp ngữ cảnh) ---
    # Logic: Recency quan trọng, nhưng Recency của một item "Hot" quan trọng hơn Recency của item "Rác".
    
    # Tính Recency cơ bản trước
    df = df.with_columns(
        (pl.col('session_end_ts') - pl.col('last_item_ts')).fill_null(7*24*3600).alias('recency_score')
    )
    
    # Log Recency để giảm biên độ số (giây -> log giây)
    df = df.with_columns(
        pl.col('recency_score').log1p().alias('log_recency_score')
    )
    
    # Tương tác: Điểm Co-visit chia cho thời gian (Càng gần càng giá trị)
    # Thêm 1 vào log_recency để tránh chia 0
    df = df.with_columns([
        (pl.col('wgt_buy2buy').fill_null(0) / (pl.col('log_recency_score') + 1)).alias('wgt_buy2buy_decayed'),
        (pl.col('wgt_clicks').fill_null(0) / (pl.col('log_recency_score') + 1)).alias('wgt_clicks_decayed')
    ])

    # --- NHÓM 4: CÁC CỜ (FLAGS) QUAN TRỌNG (Giữ lại từ cũ) ---
    df = df.with_columns([
        (pl.col('candidate_aid') == pl.col('last_aid')).cast(pl.Int8).fill_null(0).alias('is_last_viewed'),
        # Item này có phải là item phổ biến nhất trong session không? (Logic đơn giản: count > 1)
        (pl.col('num_repetitions') > 1).cast(pl.Int8).alias('is_repeated_in_session')
    ])

    # --- Sorted Ranks (Giữ nguyên - rất mạnh) ---
    df = add_sorted_rank_features(df)

    # Dọn dẹp
    df = df.fill_null(0)
    cols_to_drop = ['session_end_ts', 'last_item_ts', 'last_aid', 'session_duration', 'first_item_ts'] 
    df = df.drop([c for c in cols_to_drop if c in df.columns])
    
    return df

In [17]:
def select_best_features(df: pl.DataFrame, target_type: str, top_k: int = 60):
    """
    Huấn luyện nhanh 1 model để chọn ra top_k features tốt nhất cho target_type.
    """
    print(f"  >> Performing Feature Selection for {target_type}...")
    
    ignore_cols = ['session', 'candidate_aid', 'label']
    feature_cols = [c for c in df.columns if c not in ignore_cols]
    
    # Sample dữ liệu để chạy nhanh (ví dụ 2 triệu dòng)
    if len(df) > 2_000_000:
        df_sample = df.sample(n=2_000_000, seed=42)
    else:
        df_sample = df
        
    X = df_sample.select(feature_cols).to_numpy()
    y = df_sample.select('label').to_numpy().ravel()
    groups = df_sample.group_by('session', maintain_order=True).len()['len'].to_numpy()
    
    # Train model nhẹ
    model = lgb.LGBMRanker(
        objective="lambdarank", metric="map",
        n_estimators=50, learning_rate=0.1, max_depth=5,
        importance_type='gain', random_state=42, n_jobs=-1
    )
    model.fit(X, y, group=groups)
    
    # Lấy feature importance
    imp_df = pl.DataFrame({
        'feature': feature_cols,
        'gain': model.feature_importances_
    }).sort('gain', descending=True)
    
    # Chọn top K
    best_feats = imp_df.head(top_k)['feature'].to_list()
    print(f"     Selected {len(best_feats)} features. Top 5: {best_feats[:5]}")
    
    # (Tùy chọn) In ra các feature bị loại bỏ để kiểm tra
    # dropped = [f for f in feature_cols if f not in best_feats]
    # print(f"     Dropped: {dropped[:5]}...")
    
    return best_feats

def train_final_model(df: pl.DataFrame, features: list, model_type: str):
    """Huấn luyện model chính thức với danh sách feature đã chọn."""
    print(f"  >> Training Final Model for {model_type} with {len(features)} features...")
    
    df = df.sort('session')
    X = df.select(features).to_numpy()
    y = df.select('label').to_numpy().ravel()
    groups = df.group_by('session', maintain_order=True).len()['len'].to_numpy()
    
    model = lgb.LGBMRanker(
        objective="lambdarank", metric="map",
        n_estimators=500, learning_rate=0.05, num_leaves=32,
        subsample=0.8, colsample_bytree=0.7,
        random_state=42, n_jobs=-1
    )
    
    # --- SỬA LỖI Ở ĐÂY ---
    # Thêm eval_set và eval_group
    model.fit(
        X, 
        y, 
        group=groups, 
        eval_set=[(X, y)],       # Đưa tập train vào làm tập đánh giá
        eval_group=[groups],     # Cung cấp thông tin group cho tập đánh giá
        callbacks=[lgb.early_stopping(50, verbose=False)] # Tăng patience lên 50 cho an toàn
    )
    return model

In [18]:
%%time

# Dictionary để lưu danh sách feature tốt nhất cho mỗi loại (sẽ được điền ở vòng lặp 0)
best_features_map = {} 
trained_models_lst = []

# Giả sử history_source_dfs đã có sẵn từ các bước trước
for i in range(len(seeds)):
    print(f"\n{'='*30}\nPROCESSING SAMPLE {i}\n{'='*30}")
    
    TEMP_CHUNK_PATH = f'/kaggle/working/temp_candidate_chunks/sample_{i}/'
    
    # 1. Chuẩn bị Features (Tĩnh & Động)
    history_source_df = history_source_dfs[i]
    
    # Feature Tĩnh (Item) - Tính trên history_source_df (để tránh leak)
    time_window_feats = [
        create_time_window_features(history_source_df, None),
        create_time_window_features(history_source_df, 7)
    ]
    
    # Feature Động (Session) - Tính trên history_source_df
    session_feats, interaction_feats, last_items = create_session_context_features(history_source_df)

    # Dictionary lưu model của sample này
    current_sample_models = {}

    for pred_type in ['clicks', 'carts', 'orders']:
        print(f"\n--- Processing {pred_type} ---")
        
        # 2. Load dữ liệu thô (Candidate + Label)
        df_train = pl.scan_parquet(TEMP_CHUNK_PATH + f'training_set_{pred_type}.pqt').collect()
        
        # 3. Thêm tất cả Features (Full set)
        df_train = add_features(
            df_train, 
            time_window_feats, 
            session_feats, 
            interaction_feats, 
            last_items
        )
        
        # 4. Feature Selection (Chỉ chạy ở Sample đầu tiên)
        if i == 0:
            # Chọn top 50-60 feature tốt nhất cho loại dự đoán này
            selected_feats = select_best_features(df_train, pred_type, top_k=35)
            best_features_map[pred_type] = selected_feats
        else:
            print(f"  >> Reusing selected features from Sample 0 for {pred_type}")
            
        # 5. Huấn luyện Model Chính thức
        # Chỉ dùng các feature đã được chọn trong best_features_map
        final_feats = best_features_map[pred_type]
        model = train_final_model(df_train, final_feats, pred_type)
        
        # Lưu model
        model.booster_.save_model(TEMP_CHUNK_PATH + f'lgbm_ranker_{pred_type}_optimized.txt')
        current_sample_models[pred_type] = model
        
        # Dọn dẹp RAM ngay lập tức
        del df_train, model
        gc.collect()

    trained_models_lst.append(current_sample_models)
    
    # Dọn dẹp các biến feature lớn
    del time_window_feats, session_feats, interaction_feats, last_items
    gc.collect()

print("\nAll training finished!")


PROCESSING SAMPLE 0


  pl.count().alias(f'item_total_counts{suffix}'),
  pl.count().alias('session_length'),
  pl.count().alias('num_repetitions'),



--- Processing clicks ---
  >> Performing Feature Selection for clicks...
[LightGBM] [Info] Total groups: 462875, total data: 2000000
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.214096 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5953
[LightGBM] [Info] Number of data points in the train set: 2000000, number of used features: 38
     Selected 35 features. Top 5: ['source_history', 'recency_score', 'wgt_clicks', 'wgt_clicks_decayed', 'rank_clicks']
  >> Training Final Model for clicks with 35 features...
[LightGBM] [Info] Total groups: 516928, total data: 3501267
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.308321 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5691
[LightGBM] [

  pl.count().alias(f'item_total_counts{suffix}'),
  pl.count().alias('session_length'),
  pl.count().alias('num_repetitions'),



--- Processing clicks ---
  >> Reusing selected features from Sample 0 for clicks
  >> Training Final Model for clicks with 35 features...
[LightGBM] [Info] Total groups: 516740, total data: 3498583
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.372291 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5696
[LightGBM] [Info] Number of data points in the train set: 3498583, number of used features: 35

--- Processing carts ---
  >> Reusing selected features from Sample 0 for carts
  >> Training Final Model for carts with 35 features...
[LightGBM] [Info] Total groups: 458315, total data: 1984752
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.167808 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Tota

  pl.count().alias(f'item_total_counts{suffix}'),
  pl.count().alias('session_length'),
  pl.count().alias('num_repetitions'),



--- Processing clicks ---
  >> Reusing selected features from Sample 0 for clicks
  >> Training Final Model for clicks with 35 features...
[LightGBM] [Info] Total groups: 515740, total data: 3500442
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.367729 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 5690
[LightGBM] [Info] Number of data points in the train set: 3500442, number of used features: 35

--- Processing carts ---
  >> Reusing selected features from Sample 0 for carts
  >> Training Final Model for carts with 35 features...
[LightGBM] [Info] Total groups: 457486, total data: 1997289
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.144652 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Tota

In [19]:
for i in range(len(seeds)):
    TEMP_CHUNK_PATH = f'/kaggle/working/temp_candidate_chunks/sample_{i}/'
    for pred_type in ['clicks', 'carts', 'orders']:
        trained_models_lst[i][pred_type].booster_.save_model(TEMP_CHUNK_PATH + f'lgbm_ranker_{pred_type}.txt')

## Validation

In [20]:
%%time
from src.pipeline.preprocess import *
from src.core.utils import *
# Load train parquet
train_df = load_raw_data_parquet('/kaggle/input/otto-chunk-data-inparquet-format/train_parquet/*')
# Load test parquet
valid_df = load_raw_data_parquet('/kaggle/input/otto-chunk-data-inparquet-format/test_parquet/*')

CPU times: user 33.8 s, sys: 51.4 s, total: 1min 25s
Wall time: 38.9 s


In [21]:
%%time
from src.candidate_generation.loader import *
MATRIX_BASE_PATH = '/kaggle/input/otto-precal-covisit-matrices/submission/all/'

df_clicks_valid = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_clicks_*.pqt', 'clicks', 20)
df_buys_valid = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_carts_orders_*.pqt', 'buys', 15)
df_buy2buy_valid = load_covisit_matrix(MATRIX_BASE_PATH + 'top_*_buy2buy_*.pqt', 'buy2buy', 15)

CPU times: user 21.5 s, sys: 9.76 s, total: 31.3 s
Wall time: 25.9 s


In [22]:
history_df = valid_df.select(['session', 'aid']).unique()

In [23]:
%%time
import polars as pl
# Prepare global popular candidate
top_clicks_popular = train_df.filter(pl.col('type') == 0)['aid'].value_counts().sort(['count'], descending=[True]).head(15)['aid']
top_carts_popular = train_df.filter(pl.col('type') == 1)['aid'].value_counts().sort(['count'], descending=[True]).head(20)['aid']
top_orders_popular = train_df.filter(pl.col('type') == 2)['aid'].value_counts().sort(['count'], descending=[True]).head(20)['aid']

popular_items = pl.concat([top_clicks_popular, 
                           top_carts_popular, 
                           top_orders_popular]).unique()

CPU times: user 1min 26s, sys: 28.9 s, total: 1min 55s
Wall time: 1min 31s


In [24]:
popular_items_df = pl.DataFrame({'candidate_aid': popular_items})
popular_items_df

candidate_aid
i64
29735
33343
108125
125278
152547
…
1502122
1562705
1603001
1629608


In [25]:
item_popularity_df = pre_compute_item_popularity(train_df)

In [26]:
time_window_feats = [
    create_time_window_features(train_df, None),
    create_time_window_features(train_df, 7)
]
session_feats, interaction_feats, last_items = create_session_context_features(valid_df)

  pl.count().alias(f'item_total_counts{suffix}'),
  pl.count().alias('session_length'),
  pl.count().alias('num_repetitions'),


In [27]:
from tqdm import tqdm
N_CHUNKS = 30

all_sessions = history_df['session'].unique().to_list()
chunk_size = len(all_sessions) // N_CHUNKS


TEMP_PREDICTION_CHUNK_PATH = '/kaggle/working/temp_prediction_chunks/' # Thư mục để lưu các file tạm
os.makedirs(TEMP_PREDICTION_CHUNK_PATH, exist_ok=True)

stats_before_filtering = []
stats_after_filtering = []

W_HISTORY = 12.0
W_REPETITION = 2.0
W_BUY2BUY = 0.6
W_BUYS = 0.3
W_CLICKS = 0.2

PRE_FILTER_TOP_K = 40
PRE_FILTER_RANDOM_N = 25

print(f"\n--- Processing {len(all_sessions)} sessions in {N_CHUNKS+1} chunks ---")
for i in tqdm(range(N_CHUNKS + 1)):
    start = i * chunk_size
    end = (i + 1) * chunk_size
    if start >= len(all_sessions):
        break
    
    session_chunk_ids = all_sessions[start:end]
    history_chunk = history_df.filter(pl.col('session').is_in(session_chunk_ids))
    
    # Gọi hàm xử lý cho chunk
    chunk_result = process_chunk(history_chunk, popular_items_df,
                                 df_clicks_valid, df_buys_valid, df_buy2buy_valid)
    # Thêm đặc trưng cuối cùng cho nguồn popular
    chunk_result = chunk_result.with_columns(
        pl.col('candidate_aid').is_in(popular_items_df['candidate_aid']).cast(pl.UInt8).alias('source_popular')
    )
    # --- B. THÊM FEATURE CHO CHUNK ---
    feature_chunk = add_features(
        chunk_result, 
        time_window_feats,
        session_feats,
        interaction_feats,
        last_items,
    )

    # ===================================================================
    # BƯỚC ĐO LƯỜNG #1: TRƯỚC KHI LỌC
    # ===================================================================
    # Tính số lượng ứng viên cho mỗi session trong chunk này
    count_before = feature_chunk.group_by('session').count()
    # Lấy giá trị trung bình và thêm vào list
    avg_before = count_before['count'].mean()
    if avg_before is not None:
        stats_before_filtering.append(avg_before)

    # ===================================================================
    # BƯỚC PRE-FILTERING (Tăng tốc lgb.predict)
    # ===================================================================
    epsilon = 1e-9 # Để tránh chia cho 0
    feature_chunk = feature_chunk.with_columns([
        # Chuẩn hóa num_repetitions_in_session
        (
            (pl.col('num_repetitions') - pl.col('num_repetitions').min().over('session')) /
            (pl.col('num_repetitions').max().over('session') - pl.col('num_repetitions').min().over('session') + epsilon)
        ).alias('norm_repetition'),
        
        # Chuẩn hóa wgt_buy2buy
        (
            (pl.col('wgt_buy2buy') - pl.col('wgt_buy2buy').min().over('session')) /
            (pl.col('wgt_buy2buy').max().over('session') - pl.col('wgt_buy2buy').min().over('session') + epsilon)
        ).alias('norm_wgt_buy2buy'),
        
        # Chuẩn hóa wgt_buys
        (
            (pl.col('wgt_buys') - pl.col('wgt_buys').min().over('session')) /
            (pl.col('wgt_buys').max().over('session') - pl.col('wgt_buys').min().over('session') + epsilon)
        ).alias('norm_wgt_buys'),
        
        # Chuẩn hóa wgt_clicks
        (
            (pl.col('wgt_clicks') - pl.col('wgt_clicks').min().over('session')) /
            (pl.col('wgt_clicks').max().over('session') - pl.col('wgt_clicks').min().over('session') + epsilon)
        ).alias('norm_wgt_clicks'),
    ]).fill_nan(0) # Điền 0 cho các trường hợp max == min
    
    # --- TÍNH TOÁN ĐIỂM SỐ HEURISTIC (Sử dụng các feature đã chuẩn hóa) ---
    feature_chunk = feature_chunk.with_columns(
        (
            (pl.col('source_history') * W_HISTORY) +
            (pl.col('norm_repetition') * W_REPETITION) +
            (pl.col('norm_wgt_buy2buy') * W_BUY2BUY) +
            (pl.col('norm_wgt_buys') * W_BUYS) +
            (pl.col('norm_wgt_clicks') * W_CLICKS)
        ).alias('heuristic_score')
    )

    print(feature_chunk.to_pandas().isna().sum().sum())
    
    
    # --- Sắp xếp TOÀN BỘ chunk theo session và điểm heuristic ---
    feature_chunk_sorted = feature_chunk.sort(['session', 'heuristic_score'], descending=[False, True])
    
    # --- Lấy Top K (Exploitation) ---
    # .head() sẽ lấy các dòng đầu tiên (có điểm cao nhất) cho mỗi nhóm session
    top_k_candidates = feature_chunk_sorted.group_by('session', maintain_order=True).head(PRE_FILTER_TOP_K)
    
    # --- Lấy Phần còn lại ---
    # Dùng anti_join để tìm tất cả các ứng viên không thuộc top K
    remaining_candidates = feature_chunk.join(
        top_k_candidates.select(['session', 'candidate_aid']),
        on=['session', 'candidate_aid'], 
        how='anti'
    )
    
        # 1. Xáo trộn (shuffle) các dòng trong mỗi nhóm session
    # 2. Lấy N dòng đầu tiên (.head(N)) từ mỗi nhóm đã được xáo trộn
    random_candidates = remaining_candidates.select(
        pl.all().shuffle().over('session')
    ).group_by('session', maintain_order=True).head(PRE_FILTER_RANDOM_N)

    
    # --- Gộp 2 phần lại ---
    feature_chunk_filtered = pl.concat([
        top_k_candidates,
        random_candidates
    ])
    
    # Loại bỏ các cột tạm thời
    cols_to_drop = [col for col in feature_chunk_filtered.columns if col.startswith('norm_') or col == 'heuristic_score']
    feature_chunk_filtered = feature_chunk_filtered.drop(cols_to_drop)
    
    # ===================================================================
    # BƯỚC ĐO LƯỜNG #2: SAU KHI LỌC
    # ===================================================================
    count_after = feature_chunk_filtered.group_by('session').count()
    avg_after = count_after['count'].mean()
    if avg_after is not None:
        stats_after_filtering.append(avg_after)
        
    print(f"  Chunk {i}: Avg candidates Before={avg_before:.1f}, After={avg_after:.1f}")
    
    # --- C. DỰ ĐOÁN TRÊN CHUNK (BƯỚC MỚI) ---
    print(f"  Chunk {i}: Predicting scores...")
    
    # Sắp xếp chunk theo session để đảm bảo thứ tự
    feature_chunk_filtered = feature_chunk_filtered.sort('session')
    
    # List để lưu các mảng điểm số từ mỗi bộ model
    all_scores = {'clicks': [], 'carts': [], 'orders': []}
    
    # Lặp qua 3 bộ model đã được tải
    for model_type in ['clicks', 'carts', 'orders']:
        # 1. Lấy danh sách feature chuẩn cho loại model này
        # (Đây là danh sách đã được chọn lọc ở bước Feature Selection khi train)
        selected_features = best_features_map[model_type]
        
        # 2. Tạo X_chunk CHỈ VỚI các feature này từ DataFrame
        # Lưu ý: Phải select từ DataFrame, không phải từ numpy array tổng
        X_chunk_for_type = feature_chunk_filtered.select(selected_features).to_numpy()
        
        # 3. Ensemble: Lặp qua các model trong list và dự đoán
        for model_set in trained_models_lst:
            model = model_set[model_type]
            # Dự đoán và thêm vào list
            pred = model.predict(X_chunk_for_type)
            all_scores[model_type].append(pred)
        
    # Lấy trung bình các điểm số
    avg_scores_clicks = np.mean(all_scores['clicks'], axis=0)
    avg_scores_carts = np.mean(all_scores['carts'], axis=0)
    avg_scores_orders = np.mean(all_scores['orders'], axis=0)
    
    # --- LOGIC ENSEMBLE KẾT THÚC ---
    
    # --- D. LƯU KẾT QUẢ ĐÃ THU GỌN ---
    # Chỉ lưu các cột cần thiết cho việc xếp hạng cuối cùng
    # --- D. LƯU KẾT QUẢ ĐÃ ĐƯỢC ENSEMBLE ---
    prediction_chunk = feature_chunk_filtered.select(['session', 'candidate_aid']).with_columns([
        pl.Series("score_clicks", avg_scores_clicks),
        pl.Series("score_carts", avg_scores_carts),
        pl.Series("score_orders", avg_scores_orders)
    ])
    
    # Lưu chunk kết quả dự đoán ra đĩa
    prediction_chunk.write_parquet(TEMP_PREDICTION_CHUNK_PATH + f'predictions_chunk_{i}.pqt')
    
    del chunk_result, feature_chunk, X_chunk_for_type, prediction_chunk, feature_chunk_filtered
        
    gc.collect()


--- Processing 1671803 sessions in 31 chunks ---


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 0: Avg candidates Before=116.4, After=63.3
  Chunk 0: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 1: Avg candidates Before=110.0, After=63.2
  Chunk 1: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 2: Avg candidates Before=108.3, After=63.2
  Chunk 2: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 3: Avg candidates Before=107.8, After=63.3
  Chunk 3: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 4: Avg candidates Before=105.8, After=63.4
  Chunk 4: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 5: Avg candidates Before=108.7, After=62.9
  Chunk 5: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 6: Avg candidates Before=104.4, After=63.0
  Chunk 6: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 7: Avg candidates Before=104.6, After=63.2
  Chunk 7: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 8: Avg candidates Before=104.8, After=63.2
  Chunk 8: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 9: Avg candidates Before=103.3, After=63.2
  Chunk 9: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 10: Avg candidates Before=111.1, After=63.2
  Chunk 10: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 11: Avg candidates Before=104.9, After=63.1
  Chunk 11: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 12: Avg candidates Before=104.6, After=63.2
  Chunk 12: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 13: Avg candidates Before=104.2, After=63.2
  Chunk 13: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 14: Avg candidates Before=107.2, After=63.2
  Chunk 14: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 15: Avg candidates Before=105.0, After=63.2
  Chunk 15: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 16: Avg candidates Before=103.4, After=63.2
  Chunk 16: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 17: Avg candidates Before=104.2, After=63.2
  Chunk 17: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 18: Avg candidates Before=106.6, After=63.2
  Chunk 18: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 19: Avg candidates Before=103.3, After=63.1
  Chunk 19: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 20: Avg candidates Before=102.7, After=63.1
  Chunk 20: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 21: Avg candidates Before=105.3, After=63.2
  Chunk 21: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 22: Avg candidates Before=107.6, After=63.2
  Chunk 22: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 23: Avg candidates Before=103.8, After=63.2
  Chunk 23: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 24: Avg candidates Before=103.3, After=63.2
  Chunk 24: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 25: Avg candidates Before=105.4, After=63.3
  Chunk 25: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 26: Avg candidates Before=104.1, After=63.2
  Chunk 26: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 27: Avg candidates Before=101.7, After=63.1
  Chunk 27: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 28: Avg candidates Before=98.9, After=63.1
  Chunk 28: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 29: Avg candidates Before=96.8, After=63.2
  Chunk 29: Predicting scores...


  count_before = feature_chunk.group_by('session').count()


0


  count_after = feature_chunk_filtered.group_by('session').count()


  Chunk 30: Avg candidates Before=62.7, After=60.8
  Chunk 30: Predicting scores...


100%|██████████| 31/31 [3:47:25<00:00, 440.19s/it]


In [28]:
del df_clicks_valid, df_buys_valid, df_buy2buy_valid, popular_items_df
_ = gc.collect()

In [29]:
print("--- Aggregating all prediction chunks ---")
TEMP_PREDICTION_CHUNK_PATH = '/kaggle/working/temp_prediction_chunks/'

# Sử dụng scan_parquet để đọc tất cả các file chunk một cách "lười biếng"
lazy_predictions_df = pl.scan_parquet(TEMP_PREDICTION_CHUNK_PATH + 'predictions_chunk_*.pqt')

# .collect() để hiện thực hóa DataFrame. Bước này bây giờ rất nhanh và nhẹ.
predictions_df = lazy_predictions_df.collect()

print(f"Aggregated predictions DataFrame shape: {predictions_df.shape}")

--- Aggregating all prediction chunks ---
Aggregated predictions DataFrame shape: (105628733, 5)


In [30]:
print("\n--- Ranking candidates and creating separate prediction dataframes ---")

# Dictionary để lưu kết quả cuối cùng
final_predictions = {}

for model_type in ['clicks', 'carts', 'orders']:
    
    score_col = f'score_{model_type}'
    preds_for_type = predictions_df.select(['session', 'candidate_aid', score_col])
    
    # Sắp xếp và lấy top 20
    top_20_preds = preds_for_type.sort(score_col, descending=True) \
                                 .group_by('session', maintain_order=False) \
                                 .head(20) \
                                 .group_by('session', maintain_order=True) \
                                 .agg(pl.col('candidate_aid').alias('labels'))
    
    final_predictions[model_type] = top_20_preds

# --- BƯỚC 1 & 2 (Không đổi) ---
pred_df_clicks = final_predictions['clicks'].with_columns(pl.col('session').cast(pl.Utf8) + "_clicks")
pred_df_carts = final_predictions['carts'].with_columns(pl.col('session').cast(pl.Utf8) + "_carts")
pred_df_orders = final_predictions['orders'].with_columns(pl.col('session').cast(pl.Utf8) + "_orders")

submission_df = pl.concat([
    pred_df_clicks,
    pred_df_carts,
    pred_df_orders
]).rename({'session': 'session_type'})

# --- BƯỚC 3: CHUYỂN ĐỔI LIST THÀNH CHUỖI (ĐÃ SỬA LỖI) ---
submission_df = submission_df.with_columns(
    # 1. Áp dụng `cast(pl.Utf8)` cho TỪNG PHẦN TỬ bên trong list
    pl.col('labels').list.eval(pl.element().cast(pl.Utf8))
    # 2. Bây giờ mới JOIN các chuỗi đó lại
    .list.join(" ")
)

# --- BƯỚC 4: LƯU RA FILE CSV (Không đổi) ---
print("Saving submission.csv...")
submission_df.write_csv("/kaggle/working/submission.csv")

print("Submission file created successfully!")
display(submission_df.head())


--- Ranking candidates and creating separate prediction dataframes ---
Saving submission.csv...
Submission file created successfully!


session_type,labels
str,str
"""13245763_clicks""","""1564562 1353965 1174319 127018…"
"""13810206_clicks""","""1776643 889222 114709 1406660 …"
"""13773565_clicks""","""168507 1141500 594728 1739065 …"
"""13779704_clicks""","""323291 1070279 1330138 1708158…"
"""13730564_clicks""","""659680 1645990 1695413 1403918…"
