In [21]:
# --- 0. Setup and Imports ---
import gc
import logging
import os
import pickle
import warnings

import lightgbm as lgb
import numpy as np
import optuna
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import GroupKFold

# --- Configuration ---
# Suppress every warning category for the rest of the session.
# NOTE(review): this also hides pandas deprecation / chained-assignment warnings.
warnings.simplefilter('ignore')

# Root logging: INFO and above go to robust_pipeline.log and to stderr.
# basicConfig is a no-op if the root logger already has handlers.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=(
        logging.FileHandler('robust_pipeline.log'),
        logging.StreamHandler(),
    ),
)
logger = logging.getLogger(__name__)

In [22]:
CACHE_DIR = "/teamspace/studios/this_studio/"

def safe_save(obj, name):
    os.makedirs(CACHE_DIR, exist_ok=True)
    with open(os.path.join(CACHE_DIR, f"{name}.pkl"), "wb") as f:
        pickle.dump(obj, f)

def safe_load(name):
    path = os.path.join(CACHE_DIR, f"{name}.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    return None


In [23]:
# --- 1. Data Loading and Initial Preparation ---
def load_and_prepare_data(base_path='/teamspace/uploads/'):
    """Loads all data, standardizes IDs, and fixes basic data types."""
    logger.info("Starting data loading and preparation.")
    try:
        train = pd.read_parquet(base_path + 'train_data.parquet')
        logger.info(f"Loaded train_data.parquet with shape: {train.shape}")
        test = pd.read_parquet(base_path + 'test_data.parquet')
        logger.info(f"Loaded test_data.parquet with shape: {test.shape}")
        events = pd.read_parquet(base_path + 'add_event.parquet')
        logger.info(f"Loaded add_event.parquet with shape: {events.shape}")
        trans = pd.read_parquet(base_path + 'add_trans.parquet')
        logger.info(f"Loaded add_trans.parquet with shape: {trans.shape}")
        offers = pd.read_parquet(base_path + 'offer_metadata.parquet')
        logger.info(f"Loaded offer_metadata.parquet with shape: {offers.shape}")
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        raise

    # Standardize IDs to string for safe merging
    for df_name, df in zip(['train', 'test', 'events', 'trans', 'offers'], [train, test, events, trans, offers]):
        for col in ['id2', 'id3']:
            if col in df.columns:
                df[col] = df[col].astype(str)
    logger.info("Standardized 'id2' and 'id3' columns to string type.")

    # Prepare target variable
    train['y'] = pd.to_numeric(train['y'], errors='coerce').fillna(0).astype(int)
    
    # Convert timestamps
    train['id4'] = pd.to_datetime(train['id4'], errors='coerce')
    test['id4'] = pd.to_datetime(test['id4'], errors='coerce')
    trans['f370'] = pd.to_datetime(trans['f370'], errors='coerce')
    offers['id12'] = pd.to_datetime(offers['id12'], errors='coerce') # start date
    offers['id13'] = pd.to_datetime(offers['id13'], errors='coerce') # end date
    logger.info("Converted timestamp columns to datetime objects.")
    
    logger.info(f"Data preparation complete. Final shapes - Train: {train.shape}, Test: {test.shape}")
    return train, test, events, trans, offers

In [24]:
# --- 2. Feature Engineering ---
def create_features(df, offers, trans, events):
    """Creates a rich set of features for the ranking model."""
    logger.info(f"Starting feature engineering on dataframe with shape {df.shape}.")
    
    # Merge offer metadata first
    df = df.merge(offers, on='id3', how='left')
    logger.info(f"Shape after merging offers: {df.shape}")

    # --- A. Basic Temporal Features ---
    df['dayofweek'] = df['id4'].dt.dayofweek
    df['hour'] = df['id4'].dt.hour
    logger.info("Created basic temporal features: dayofweek, hour.")
    
    # --- B. Industry Code (id8) Alignment Features ---
    logger.info("Creating id8 (Industry) alignment features.")
    customer_industry_spend = trans.groupby(['id2', 'id8'])['f367'].sum().reset_index()
    customer_total_spend = trans.groupby('id2')['f367'].sum().reset_index().rename(columns={'f367': 'total_spend'})
    customer_industry_spend = customer_industry_spend.merge(customer_total_spend, on='id2', how='left')
    customer_industry_spend['industry_spend_proportion'] = customer_industry_spend['f367'] / customer_industry_spend['total_spend']
    
    df = df.merge(customer_industry_spend[['id2', 'id8', 'industry_spend_proportion']], on=['id2', 'id8'], how='left')
    df['industry_match'] = df['industry_spend_proportion'].notna().astype(int)
    
    # --- C. Behavioral, Fatigue & Session Features ---
    logger.info("Creating behavioral and fatigue features.")
    df = df.sort_values(by=['id2', 'id4']).reset_index(drop=True)
    df['offer_fatigue_user'] = df.groupby(['id2', 'id3']).cumcount()
    df['campaign_fatigue_user'] = df.groupby(['id2', 'id9']).cumcount()
    df['session_impression_gap_secs'] = df.groupby('id2')['id4'].diff().dt.total_seconds()
    
    # --- D. Cold-Start & Global Features ---
    logger.info("Creating cold-start and global features.")
    df['offer_age_days'] = (df['id4'] - df['id12']).dt.days
    df['offer_duration_days'] = (df['id13'] - df['id12']).dt.days
    
    # --- E. Transaction and Event Aggregations ---
    logger.info("Creating transaction and event aggregations.")
    agg_trans = trans.groupby('id2')['f367'].agg(['mean', 'std', 'sum', 'count']).reset_index()
    agg_trans.columns = ['id2', 'trans_mean', 'trans_std', 'trans_sum', 'trans_count']
    df = df.merge(agg_trans, on='id2', how='left')

    events['click_flag'] = events['id7'].notnull().astype(int)
    event_aggs = events.groupby('id2').agg(click_rate=('click_flag', 'mean'), click_count=('click_flag', 'sum'))
    df = df.merge(event_aggs, on='id2', how='left')

    # --- F. Interaction Features ---
    df['offer_popularity'] = df.groupby('id3')['id2'].transform('count')
    df['customer_offer_frequency'] = df.groupby('id2')['id3'].transform('count')

    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    
    logger.info(f"Feature engineering complete. Final shape: {df.shape}")
    return df

In [25]:
# --- 3. Text & Target Encoding ---
def apply_text_and_target_encoding(train_df, test_df, target_col='y', group_col='id2'):
    """Applies TF-IDF and GroupKFold Target Encoding."""
    logger.info("Applying TF-IDF and Target Encoding.")
    
    # TF-IDF on 'f378'
    tfidf = TfidfVectorizer(max_features=30, min_df=5, dtype=np.float32)
    tfidf_train = tfidf.fit_transform(train_df['f378'].fillna('missing'))
    tfidf_test = tfidf.transform(test_df['f378'].fillna('missing'))
    
    tfidf_train_df = pd.DataFrame(tfidf_train.toarray(), columns=[f'tfidf_{i}' for i in range(tfidf_train.shape[1])])
    tfidf_test_df = pd.DataFrame(tfidf_test.toarray(), columns=[f'tfidf_{i}' for i in range(tfidf_test.shape[1])])
    
    train_df = pd.concat([train_df.reset_index(drop=True), tfidf_train_df], axis=1)
    test_df = pd.concat([test_df.reset_index(drop=True), tfidf_test_df], axis=1)
    logger.info(f"Applied TF-IDF. Train shape: {train_df.shape}, Test shape: {test_df.shape}")
    
    # GroupKFold Target Encoding for 'id3'
    logger.info("Starting GroupKFold Target Encoding for 'id3'.")
    kf = GroupKFold(n_splits=5)
    global_mean = train_df[target_col].mean()
    
    train_df['id3_te'] = 0.0
    for fold, (tr_idx, val_idx) in enumerate(kf.split(train_df, train_df[target_col], train_df[group_col])):
        logger.info(f"Processing fold {fold+1}/5 for target encoding.")
        means = train_df.iloc[tr_idx].groupby('id3')[target_col].mean()
        train_df.loc[val_idx, 'id3_te'] = train_df.loc[val_idx, 'id3'].map(means).fillna(global_mean)
        
    full_train_means = train_df.groupby('id3')[target_col].mean()
    test_df['id3_te'] = test_df['id3'].map(full_train_means).fillna(global_mean)
    logger.info("Target encoding complete.")
    
    return train_df, test_df

In [26]:
# --- 4. Model Training with Optuna ---
def train_lgbm_with_optuna(train_df, test_df, features, n_trials=9):
    """Trains a LightGBM Ranker with Optuna for hyperparameter tuning."""
    logger.info(f"Starting model training with Optuna for {n_trials} trials.")
    
    X = train_df[features].copy()
    y = train_df['y'].copy()
    X_test = test_df[features].copy()
    groups = train_df['id2'].copy()
    
    logger.info(f"Training data shape (X): {X.shape}")
    
    for col in X.columns:
        if X[col].isnull().any():
            median_val = X[col].median()
            X[col].fillna(median_val, inplace=True)
            X_test[col].fillna(median_val, inplace=True)
    logger.info("Filled NaN values using column medians.")

    def objective(trial):
        params = {
            'objective': 'lambdarank', 'metric': 'map', 'boosting_type': 'gbdt',
            'n_estimators': trial.suggest_int('n_estimators', 500, 2000),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05),
            'num_leaves': trial.suggest_int('num_leaves', 31, 100),
            'max_depth': trial.suggest_int('max_depth', 7, 11),
            'min_child_samples': trial.suggest_int('min_child_samples', 50, 150),
            'feature_fraction': trial.suggest_float('feature_fraction', 0.7, 1.0),
            'bagging_fraction': trial.suggest_float('bagging_fraction', 0.7, 1.0),
            'lambda_l1': trial.suggest_float('lambda_l1', 1e-6, 5.0, log=True),
            'lambda_l2': trial.suggest_float('lambda_l2', 1e-6, 5.0, log=True),
            'random_state': 42, 'verbose': -1, 'bagging_freq': 1
        }
        
        gkf = GroupKFold(n_splits=5)
        oof_map_scores = []

        for fold, (train_idx, val_idx) in enumerate(gkf.split(X, y, groups)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            group_tr = groups.iloc[train_idx].value_counts(sort=False).values
            group_val = groups.iloc[val_idx].value_counts(sort=False).values

            model = lgb.LGBMRanker(**params)
            model.fit(X_train, y_train, group=group_tr,
                      eval_set=[(X_val, y_val)], eval_group=[group_val],
                      eval_metric='map', eval_at=[7],
                      callbacks=[lgb.early_stopping(50, verbose=False)])
            
            map_at_7 = model.best_score_['valid_0']['map@7']
            oof_map_scores.append(map_at_7)
        
        mean_score = np.mean(oof_map_scores)
        logger.info(f"Optuna Trial {trial.number} finished with MAP@7: {mean_score}")
        return mean_score

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    
    logger.info(f"Best trial MAP@7: {study.best_value}")
    logger.info(f"Best params: {study.best_params}")
    
    logger.info("Training final model on all data with best parameters.")
    best_params = study.best_params
    best_params.update({'objective': 'lambdarank', 'metric': 'map'})
    
    final_model = lgb.LGBMRanker(**best_params)
    group_full = groups.value_counts(sort=False).values
    final_model.fit(X, y, group=group_full)
    logger.info("Final model training complete.")
    
    test_preds = final_model.predict(X_test)
    
    return test_preds, final_model

In [27]:
# --- 4. Model Training with Optuna ---
def train_lgbm_with_optuna(train_df, test_df, features, n_trials=10):
    """Trains a LightGBM Ranker with Optuna for hyperparameter tuning."""
    logger.info(f"Starting model training with Optuna for {n_trials} trials.")
    
    X = train_df[features].copy()
    y = train_df['y'].copy()
    X_test = test_df[features].copy()
    groups = train_df['id2'].copy()
    
    # NEW LOGGING: Log data shapes before training
    logger.info(f"Training data shape (X): {X.shape}")
    logger.info(f"Test data shape (X_test): {X_test.shape}")
    
    for col in X.columns:
        if X[col].isnull().any():
            median_val = X[col].median()
            X[col].fillna(median_val, inplace=True)
            X_test[col].fillna(median_val, inplace=True)
    logger.info("Filled NaN values using column medians.") # NEW LOGGING

    def objective(trial):
        params = {
            'objective': 'lambdarank', 'metric': 'map', 'boosting_type': 'gbdt',
            'n_estimators': trial.suggest_int('n_estimators', 500, 2000),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05),
            'num_leaves': trial.suggest_int('num_leaves', 31, 100),
            'max_depth': trial.suggest_int('max_depth', 7, 11),
            'min_child_samples': trial.suggest_int('min_child_samples', 50, 150),
            'feature_fraction': trial.suggest_float('feature_fraction', 0.7, 1.0),
            'bagging_fraction': trial.suggest_float('bagging_fraction', 0.7, 1.0),
            'lambda_l1': trial.suggest_float('lambda_l1', 1e-6, 5.0, log=True),
            'lambda_l2': trial.suggest_float('lambda_l2', 1e-6, 5.0, log=True),
            'random_state': 42, 'verbose': -1, 'bagging_freq': 1
        }
        
        gkf = GroupKFold(n_splits=4)
        oof_map_scores = []

        for fold, (train_idx, val_idx) in enumerate(gkf.split(X, y, groups)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            group_tr = groups.iloc[train_idx].value_counts(sort=False).values
            group_val = groups.iloc[val_idx].value_counts(sort=False).values

            model = lgb.LGBMRanker(**params)
            model.fit(X_train, y_train, group=group_tr,
                      eval_set=[(X_val, y_val)], eval_group=[group_val],
                      eval_metric='map', eval_at=[7],
                      callbacks=[lgb.early_stopping(50, verbose=False)])
            
            map_at_7 = model.best_score_['valid_0']['map@7']
            oof_map_scores.append(map_at_7)
        
        mean_score = np.mean(oof_map_scores)
        logger.info(f"Optuna Trial {trial.number} finished with MAP@7: {mean_score}") # NEW LOGGING
        return mean_score

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    
    logger.info(f"Best trial MAP@7: {study.best_value}")
    logger.info(f"Best params: {study.best_params}")
    
    logger.info("Training final model on all data with best parameters.")
    best_params = study.best_params
    best_params.update({'objective': 'lambdarank', 'metric': 'map'})
    
    final_model = lgb.LGBMRanker(**best_params)
    group_full = groups.value_counts(sort=False).values
    final_model.fit(X, y, group=group_full)
    logger.info("Final model training complete.") # NEW LOGGING
    
    test_preds = final_model.predict(X_test)
    
    return test_preds, final_model, study.best_params


In [28]:
# --- 5. Submission Generation ---
def create_submission(test_df, predictions, filename='submission_robust.csv'):
    """Creates the submission file in the required format."""
    logger.info(f"Creating submission file: {filename}")
    submission = test_df[['id1', 'id2', 'id3', 'id5']].copy()
    submission['pred'] = predictions
    
    # Group-wise normalization for ranking
    submission['pred'] = submission.groupby('id2')['pred'].transform(
        lambda x: (x - x.min()) / (x.max() - x.min() + 1e-9)
    )
    submission['id5'] = pd.to_datetime(submission['id5'], errors='coerce').dt.strftime('%m/%d/%y')
    
    submission.to_csv(filename, index=False)
    logger.info(f"Submission file saved to {filename}. Shape: {submission.shape}")
    return submission

In [29]:
# --- 6. Main Execution Block ---
def main():
    """Main pipeline execution function."""
    logger.info("====== Starting Full Robust Pipeline ======")
    
    train, test, events, trans, offers = load_and_prepare_data()
    
    train_fe = create_features(train, offers, trans, events)
    test_fe = create_features(test, offers, trans, events)
    
    train_processed, test_processed = apply_text_and_target_encoding(train_fe, test_fe)
    
    gc.collect()
    
    exclude_cols = ['id1', 'id2', 'id3', 'id4', 'id5', 'id9', 'id10', 'id11', 'id12', 'id13', 'f378', 'start', 'end', 'y']
    features = [col for col in train_processed.columns if col not in exclude_cols]
    features = [f for f in features if train_processed[f].dtype in ['int64', 'float64', 'int32', 'float32', 'int8', 'bool']]
    logger.info(f"Training with {len(features)} features.")
    
    test_predictions, model, _ = train_lgbm_with_optuna(train_processed, test_processed, features)
    
    _ = create_submission(test_processed, test_predictions)
    
    importance_df = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    logger.info(f"Top 20 Feature Importances:\n{importance_df.head(20)}")
    
    logger.info("====== Pipeline Finished Successfully ======")

In [30]:
# Entry point. A notebook kernel also runs code with __name__ == '__main__',
# so executing this cell launches the whole pipeline.
if __name__ == '__main__':
    main()

2025-07-20 17:22:32,152 - INFO - Starting data loading and preparation.
2025-07-20 17:22:38,655 - INFO - Loaded train_data.parquet with shape: (770164, 372)
2025-07-20 17:22:43,122 - INFO - Loaded test_data.parquet with shape: (369301, 371)
2025-07-20 17:22:56,211 - INFO - Loaded add_event.parquet with shape: (21457473, 5)
2025-07-20 17:22:58,185 - INFO - Loaded add_trans.parquet with shape: (6339465, 9)
2025-07-20 17:22:58,374 - INFO - Loaded offer_metadata.parquet with shape: (4164, 12)
2025-07-20 17:23:05,100 - INFO - Standardized 'id2' and 'id3' columns to string type.
2025-07-20 17:23:05,823 - INFO - Converted timestamp columns to datetime objects.
2025-07-20 17:23:05,824 - INFO - Data preparation complete. Final shapes - Train: (770164, 372), Test: (369301, 371)
2025-07-20 17:23:05,825 - INFO - Starting feature engineering on dataframe with shape (770164, 372).
2025-07-20 17:23:11,479 - INFO - Shape after merging offers: (770164, 383)
2025-07-20 17:23:11,510 - INFO - Created basi