In [None]:

import pandas as pd
import ast
import numpy as np

In [None]:

def _average_precision(rels):
    """Average precision over one user's top-k relevance labels, given in ranked order."""
    rels = np.asarray(rels)
    hit_mask = rels.astype(bool)
    if not hit_mask.any():
        return 0.0
    # Precision at every rank position; only the positions holding a hit are averaged.
    precision_at_rank = np.cumsum(rels) / np.arange(1, len(rels) + 1)
    return precision_at_rank[hit_mask].mean()


def _dcg(rels):
    """Discounted cumulative gain of relevance labels given in ranked order."""
    rels = np.asarray(rels, dtype=float)
    # Position i (1-based) is discounted by log2(i + 1).
    return np.sum(rels / np.log2(np.arange(2, len(rels) + 2)))


def evaluate_topk_fast(df, k=10):
    """Compute user-averaged top-k ranking metrics.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per (user, item) candidate. Must contain the columns
        'UserID', 'Rank_pred' (predicted rank, smaller = better) and
        'Relevance' (relevance label; truthy values count as hits).
        The frame passed in is not modified.
    k : int, default 10
        Cut-off: only the k best-ranked rows of each user are scored.

    Returns
    -------
    dict
        Keys 'Precision@k', 'Recall@k', 'HitRate@k', 'MAP@k', 'nDCG@k',
        each the mean of the per-user value:

        * Precision@k - mean Relevance over the user's top-k rows (users with
          fewer than k rows are averaged over the rows they have).
        * Recall@k - hits in the top-k divided by the user's total relevant
          rows; users without any relevant row contribute 0.
        * HitRate@k - 1 if the top-k holds at least one hit, else 0.
        * MAP@k - mean of precision-at-rank over the hit positions inside the
          top-k (normalised by the number of hits in the top-k); 0 if no hit.
        * nDCG@k - DCG of the top-k divided by the ideal DCG, where the ideal
          ordering is built from ALL of the user's rows, not only the
          predicted top-k; 0 if the user has no relevant row.
    """
    # Order every user's rows by predicted rank so the first k rows are the top-k.
    ranked = df.sort_values(['UserID', 'Rank_pred'], ascending=[True, True])
    topk_df = ranked[ranked.groupby('UserID').cumcount() < k]

    rel_all = ranked.groupby('UserID')['Relevance']
    rel_topk = topk_df.groupby('UserID')['Relevance']

    # Precision@k
    precision = rel_topk.mean().mean()

    # Recall@k (0/0 for users without relevant items becomes NaN, then 0)
    relevant_per_user = rel_all.sum()
    hits_per_user = rel_topk.sum()
    recall = (hits_per_user / relevant_per_user).fillna(0).mean()

    # HitRate@k
    hit_rate = (hits_per_user > 0).astype(int).mean()

    # MAP@k
    mapk = rel_topk.apply(_average_precision).mean()

    # nDCG@k: the ideal ranking puts the user's best labels first, taken from
    # every candidate row, so relevant items missed by the top-k lower the score.
    dcg_per_user = rel_topk.apply(_dcg)
    idcg_per_user = rel_all.apply(lambda rels: _dcg(np.sort(np.asarray(rels))[::-1][:k]))
    dcg_per_user = dcg_per_user.reindex(idcg_per_user.index, fill_value=0.0)
    ndcg_per_user = (dcg_per_user / idcg_per_user).where(idcg_per_user > 0, 0.0)
    ndcg = ndcg_per_user.mean()

    return {
        'Precision@k': precision,
        'Recall@k': recall,
        'HitRate@k': hit_rate,
        'MAP@k': mapk,
        'nDCG@k': ndcg
    }

# LightGBM

In [None]:
# Load results
# NOTE(review): base_path is not defined in the cells visible here - confirm it is set before this cell runs.
# The prediction files store the model output in a column named 'Rating'. It is selected by its
# header name and renamed afterwards: passing names= together with usecols= makes pandas look up
# the usecols entries in names, where 'Rating' does not exist.
results_uwarm_iwarm = pd.read_csv(f'{base_path}\\lightgbm\\lightgbm_warm_user_warm_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_uwarm_icold = pd.read_csv(f'{base_path}\\lightgbm\\lightgbm_warm_user_cold_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_ucold_iwarm = pd.read_csv(f'{base_path}\\lightgbm\\lightgbm_cold_user_warm_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_ucold_icold = pd.read_csv(f'{base_path}\\lightgbm\\lightgbm_cold_user_cold_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})

# Load the test set
test_uwarm_iwarm = pd.read_csv(f'{base_path}\\testset_warm_user_warm_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_uwarm_icold = pd.read_csv(f'{base_path}\\testset_warm_user_cold_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_ucold_iwarm = pd.read_csv(f'{base_path}\\testset_cold_user_warm_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_ucold_icold = pd.read_csv(f'{base_path}\\testset_cold_user_cold_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])

# Merge the results with the test set
# Each merged frame holds: RatingID, Rating_pred (prediction), UserID, WineID, Rating (true value).
results_uwarm_iwarm = results_uwarm_iwarm.merge(test_uwarm_iwarm, on='RatingID', how='left')
results_uwarm_icold = results_uwarm_icold.merge(test_uwarm_icold, on='RatingID', how='left')
results_ucold_iwarm = results_ucold_iwarm.merge(test_ucold_iwarm, on='RatingID', how='left')
results_ucold_icold = results_ucold_icold.merge(test_ucold_icold, on='RatingID', how='left')


In [None]:
# Regression error metrics for each user/item scenario.
# The merged frames hold the true rating in 'Rating' and the prediction in 'Rating_pred'.
# The metrics are computed with pandas/numpy, which this notebook already imports.
# pandas' mean() skips NaN by default, so rows without a matching test rating are ignored.
err_uwarm_iwarm = results_uwarm_iwarm['Rating'] - results_uwarm_iwarm['Rating_pred']
err_uwarm_icold = results_uwarm_icold['Rating'] - results_uwarm_icold['Rating_pred']
err_ucold_iwarm = results_ucold_iwarm['Rating'] - results_ucold_iwarm['Rating_pred']
err_ucold_icold = results_ucold_icold['Rating'] - results_ucold_icold['Rating_pred']
# MSE
mse_uwarm_iwarm = (err_uwarm_iwarm ** 2).mean()
mse_uwarm_icold = (err_uwarm_icold ** 2).mean()
mse_ucold_iwarm = (err_ucold_iwarm ** 2).mean()
mse_ucold_icold = (err_ucold_icold ** 2).mean()
# RMSE
rmse_uwarm_iwarm = np.sqrt(mse_uwarm_iwarm)
rmse_uwarm_icold = np.sqrt(mse_uwarm_icold)
rmse_ucold_iwarm = np.sqrt(mse_ucold_iwarm)
rmse_ucold_icold = np.sqrt(mse_ucold_icold)
# MAE
mae_uwarm_iwarm = err_uwarm_iwarm.abs().mean()
mae_uwarm_icold = err_uwarm_icold.abs().mean()
mae_ucold_iwarm = err_ucold_iwarm.abs().mean()
mae_ucold_icold = err_ucold_icold.abs().mean()

In [None]:
# Create Rank, Rank_pred and Relevance columns for every scenario.
# The merged frames hold the true rating in 'Rating' and the prediction in 'Rating_pred'
# (the merge adds no _x/_y suffixes because the two rating columns have different names).
RELEVANCE_THRESHOLD = 3.5  # a true rating at or above this value counts as relevant

for frame in (results_uwarm_iwarm, results_uwarm_icold, results_ucold_iwarm, results_ucold_icold):
    # Rank 1 = highest rating within the user; method="first" breaks ties by row order.
    frame["Rank"] = frame.groupby("UserID")["Rating"].rank(method="first", ascending=False)
    frame["Rank_pred"] = frame.groupby("UserID")["Rating_pred"].rank(method="first", ascending=False)
    # Calculate Relevance (missing ratings compare False and therefore become 0)
    frame["Relevance"] = (frame["Rating"] >= RELEVANCE_THRESHOLD).astype(int)


In [None]:
# Score all four user/item scenarios at the same cut-off k
k = 10
results = {
    'warm user, warm item': evaluate_topk_fast(results_uwarm_iwarm, k=k),
    'warm user, cold item': evaluate_topk_fast(results_uwarm_icold, k=k),
    'cold user, warm item': evaluate_topk_fast(results_ucold_iwarm, k=k),
    'cold user, cold item': evaluate_topk_fast(results_ucold_icold, k=k),
}

# Report every metric per scenario, with a rule between scenarios
for case, metrics in results.items():
    print(f"Evaluation on {case} at top {k}:")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    print('-' * 50)

# XGBoost

In [None]:
# Load results
# NOTE(review): base_path is not defined in the cells visible here - confirm it is set before this cell runs.
# The prediction files store the model output in a column named 'Rating'. It is selected by its
# header name and renamed afterwards: passing names= together with usecols= makes pandas look up
# the usecols entries in names, where 'Rating' does not exist.
results_uwarm_iwarm = pd.read_csv(f'{base_path}\\xgboost\\xgboost_warm_user_warm_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_uwarm_icold = pd.read_csv(f'{base_path}\\xgboost\\xgboost_warm_user_cold_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_ucold_iwarm = pd.read_csv(f'{base_path}\\xgboost\\xgboost_cold_user_warm_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})
results_ucold_icold = pd.read_csv(f'{base_path}\\xgboost\\xgboost_cold_user_cold_item.csv', usecols=['RatingID', 'Rating']).rename(columns={'Rating': 'Rating_pred'})

# Load the test set
test_uwarm_iwarm = pd.read_csv(f'{base_path}\\testset_warm_user_warm_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_uwarm_icold = pd.read_csv(f'{base_path}\\testset_warm_user_cold_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_ucold_iwarm = pd.read_csv(f'{base_path}\\testset_cold_user_warm_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])
test_ucold_icold = pd.read_csv(f'{base_path}\\testset_cold_user_cold_item.csv', usecols=['RatingID', 'UserID', 'WineID', 'Rating'])

# Merge the results with the test set
# Each merged frame holds: RatingID, Rating_pred (prediction), UserID, WineID, Rating (true value).
results_uwarm_iwarm = results_uwarm_iwarm.merge(test_uwarm_iwarm, on='RatingID', how='left')
results_uwarm_icold = results_uwarm_icold.merge(test_uwarm_icold, on='RatingID', how='left')
results_ucold_iwarm = results_ucold_iwarm.merge(test_ucold_iwarm, on='RatingID', how='left')
results_ucold_icold = results_ucold_icold.merge(test_ucold_icold, on='RatingID', how='left')


In [None]:
# Regression error metrics for each user/item scenario.
# The merged frames hold the true rating in 'Rating' and the prediction in 'Rating_pred'.
# The metrics are computed with pandas/numpy, which this notebook already imports.
# pandas' mean() skips NaN by default, so rows without a matching test rating are ignored.
err_uwarm_iwarm = results_uwarm_iwarm['Rating'] - results_uwarm_iwarm['Rating_pred']
err_uwarm_icold = results_uwarm_icold['Rating'] - results_uwarm_icold['Rating_pred']
err_ucold_iwarm = results_ucold_iwarm['Rating'] - results_ucold_iwarm['Rating_pred']
err_ucold_icold = results_ucold_icold['Rating'] - results_ucold_icold['Rating_pred']
# MSE
mse_uwarm_iwarm = (err_uwarm_iwarm ** 2).mean()
mse_uwarm_icold = (err_uwarm_icold ** 2).mean()
mse_ucold_iwarm = (err_ucold_iwarm ** 2).mean()
mse_ucold_icold = (err_ucold_icold ** 2).mean()
# RMSE
rmse_uwarm_iwarm = np.sqrt(mse_uwarm_iwarm)
rmse_uwarm_icold = np.sqrt(mse_uwarm_icold)
rmse_ucold_iwarm = np.sqrt(mse_ucold_iwarm)
rmse_ucold_icold = np.sqrt(mse_ucold_icold)
# MAE
mae_uwarm_iwarm = err_uwarm_iwarm.abs().mean()
mae_uwarm_icold = err_uwarm_icold.abs().mean()
mae_ucold_iwarm = err_ucold_iwarm.abs().mean()
mae_ucold_icold = err_ucold_icold.abs().mean()

In [None]:
# Create Rank, Rank_pred and Relevance columns for every scenario.
# The merged frames hold the true rating in 'Rating' and the prediction in 'Rating_pred'
# (the merge adds no _x/_y suffixes because the two rating columns have different names).
RELEVANCE_THRESHOLD = 3.5  # a true rating at or above this value counts as relevant

for frame in (results_uwarm_iwarm, results_uwarm_icold, results_ucold_iwarm, results_ucold_icold):
    # Rank 1 = highest rating within the user; method="first" breaks ties by row order.
    frame["Rank"] = frame.groupby("UserID")["Rating"].rank(method="first", ascending=False)
    frame["Rank_pred"] = frame.groupby("UserID")["Rating_pred"].rank(method="first", ascending=False)
    # Calculate Relevance (missing ratings compare False and therefore become 0)
    frame["Relevance"] = (frame["Rating"] >= RELEVANCE_THRESHOLD).astype(int)


In [None]:
# Score all four user/item scenarios at the same cut-off k
k = 10
results = {
    'warm user, warm item': evaluate_topk_fast(results_uwarm_iwarm, k=k),
    'warm user, cold item': evaluate_topk_fast(results_uwarm_icold, k=k),
    'cold user, warm item': evaluate_topk_fast(results_ucold_iwarm, k=k),
    'cold user, cold item': evaluate_topk_fast(results_ucold_icold, k=k),
}

# Report every metric per scenario, with a rule between scenarios
for case, metrics in results.items():
    print(f"Evaluation on {case} at top {k}:")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    print('-' * 50)