In [None]:
!pip install implicit tqdm --quiet

import os
import random
import threadpoolctl

os.environ['OPENBLAS_NUM_THREADS'] = '1'
threadpoolctl.threadpool_limits(1, "blas")

import time
import heapq
import itertools
from collections import Counter
from typing import List, Optional, Any
from pprint import pprint
import numpy as np
np.random.seed(42)
import pandas as pd
import matplotlib.pyplot as plt
import scipy.sparse as sp
from scipy.sparse import csr_matrix, coo_matrix
from tabulate import tabulate
from tqdm import tqdm
from gcsfs import GCSFileSystem
from pandas_gbq import to_gbq
from google.auth import exceptions
from google.cloud import bigquery
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight
from implicit.evaluation import train_test_split, precision_at_k, ranking_metrics_at_k

# Evaluation metrics & methods

### Precision:

In [None]:
def precision(recommended_items, relevant_items):
    if len(recommended_items) == 0:
        return 0
    relevant_in_rec = sum(1 for item in recommended_items if item in relevant_items)
    precision = relevant_in_rec / len(recommended_items)
    return precision

### Mean Average Precision at K (MAP@K):

In [None]:
def calculate_average_precision(recommended_items, relevant_items):
    ap = 0.0
    num_relevant = 0

    for i, item in enumerate(recommended_items):
        if item in relevant_items:
            num_relevant += 1
            ap += num_relevant / (i + 1)

    if num_relevant > 0:
        ap /= num_relevant
    return ap

### Gini coefficient

In [None]:
def gini(array: dict):
    array = np.array(list(array.values()))
    if array.size == 0:
        return 0.0
    if np.amin(array) < 0:
        array -= np.amin(array)

    array = np.sort(array)
    index = np.arange(1, array.shape[0] + 1)
    n = array.shape[0]
    return ((np.sum((2 * index - n - 1) * array)) / (n * np.sum(array)))

# Offline Evaluation

### BigQuery globals

In [None]:
RUN_ID = "als-3month-datapoints-context"  # @param {type:"string"}
MODEL = "None"  # @param {type:"string"}

BUCKET_NAME = ""  # @param {type:"string"}
DATA_PATH = ""  # @param {type:"string"}
DATA_URI = ""  # @param {type:"string"}

# Local specs
TO_LOCAL = False  # @param {type:"boolean"}
FILENAME = None  # @param {type:"string"}

# Bigquery specs
TO_BIGQUERY = True  # @param {type:"boolean"}
BQ_DATASET_ID = ""  # @param {type:"string"}
BQ_TABLE_ID = ""  # @param {type:"string"}
PROJECT_ID = ""  # @param {type:"string"}

# Data preprocessing

### Loading data

In [None]:
def load_csv_from_gcs(data_uri, n_items=1):
    # List of all Parquet files in the directory

    gcs = GCSFileSystem()

    # Get all files in the bucket path
    files = gcs.glob(f"{data_uri}/**")  # `**` ensures recursive search
    # Filter for Parquet files
    files = [f"gs://{file}" for file in files if file.endswith(".csv")]
    # Combine all Parquet files into a single DataFrame
    df = pd.concat([pd.read_csv(file) for file in files[:n_items]], ignore_index=True)
    print("Rows in DF before preprocessing:", df.shape[0])
    df = df[df["contentType"] == "SERIES"]
    df.rename(columns={'durationSec': 'playingTime'}, inplace=True)
    df.loc[:, 'firstStart'] = pd.to_datetime(df['firstStart'], errors='coerce')
    df = df[df['playingTime'] >= 250]
    df['playingTime'] = df['playingTime'].clip(upper=12000)
    user_item_counts = df.groupby('profileId')['itemId'].count()
    valid_users = user_item_counts[user_item_counts >= 5].index
    df = df[df['profileId'].isin(valid_users)]
    print("Rows in DF after preprocessing:", df.shape[0])
    return df

### Splitting / preprocessing

In [None]:
def prepare_data(dataframe, apply_bm25=True, K1=3.0, B=1.5, split_date='2024-10-22'):
    print("Rows in DF:", dataframe.shape[0])

    # Split into train and test using date
    print("Splitting")
    split_date = pd.to_datetime(split_date)
    train_dataframe = dataframe[dataframe['firstStart'] < split_date].copy()
    test_dataframe = dataframe[dataframe['firstStart'] >= split_date].copy()

    # Force the types as string -> integers
    print("Converting to category")
    train_dataframe['profileId'] = train_dataframe['profileId'].astype(str).astype('category')
    train_dataframe['itemId'] = train_dataframe['itemId'].astype(str).astype('category')

    test_dataframe['profileId'] = test_dataframe['profileId'].astype(str).astype('category')
    test_dataframe['itemId'] = test_dataframe['itemId'].astype(str).astype('category')

    print("Creating sparse matrix")
    items_mapping = dict(enumerate(train_dataframe['itemId'].cat.categories))
    users_mapping = dict(enumerate(train_dataframe['profileId'].cat.categories))

    train_matrix = coo_matrix(
        (train_dataframe['playingTime'].astype(np.float32),
         (train_dataframe['profileId'].cat.codes,
          train_dataframe['itemId'].cat.codes))
    ).tocsr()

    # BM25 transformation
    print("Applying BM25")
    if apply_bm25:
        train_matrix = bm25_weight(train_matrix, K1=K1, B=B).tocsr()

    # Reverse mappings
    reverse_user_mapping = {idx: profile_id for profile_id, idx in users_mapping.items()}
    reverse_item_mapping = {idx: item_id for item_id, idx in items_mapping.items()}

    # If a user is not present in the user_mapping, e.g. the user does not
    # appear in the training set, we will get a NaN
    test_dataframe.loc[:, 'user_index'] = test_dataframe['profileId'].map(reverse_user_mapping)
    test_dataframe.loc[:, 'item_index'] = test_dataframe['itemId'].map(reverse_item_mapping)

    nan_user_indices = test_dataframe['user_index'].isna().sum()
    nan_user_indices_item = test_dataframe['item_index'].isna().sum()
    print(f"sum of missing users in test_dataframe: {nan_user_indices}")
    print(f"sum of missing items in test_dataframe: {nan_user_indices_item}")

    # Drop the cases where the item or user has not been seen in the test cases
    len_before_removing_nan = len(test_dataframe)
    test_dataframe = test_dataframe.dropna(subset=['user_index', 'item_index'])
    print(f"Removing rows where user_index or item_index is NaN from test_dataframe: from {len_before_removing_nan} to {len(test_dataframe)}, removed {len_before_removing_nan - len(test_dataframe)}")

    # Convert indices to integers
    test_dataframe['user_index'] = test_dataframe['user_index'].astype(int)
    test_dataframe['item_index'] = test_dataframe['item_index'].astype(int)

    print("Create duration scores")
    duration_scores = train_dataframe.groupby('itemId', observed=False)['playingTime'].sum().to_dict()

    len_before_removing_user_index = len(test_dataframe)
    test_dataframe = test_dataframe[test_dataframe['user_index'].isin(train_matrix.indptr)]
    print(f"Removing rows where user_index not in train_matrix: from {len_before_removing_user_index} to {len(test_dataframe)}, removed {len_before_removing_user_index - len(test_dataframe)}")

    # Group by 'user_index' and take the first 5 rows for each group, we're only intrested in this.
    len_before_5_cap = len(test_dataframe)
    test_dataframe = test_dataframe.groupby('user_index').head(5)
    print(f"Grouping and limiting test to 5 users, from {len_before_5_cap} to {len(test_dataframe)}, removed {len_before_5_cap - len(test_dataframe)}")

    print(f"Test rows after removing missing users: {len(test_dataframe)}")
    test_cases = np.array(
        list(zip(test_dataframe['user_index'].astype(int), test_dataframe['itemId'].astype(str), test_dataframe['firstStart']))
    )

    print("\nSIZE:")
    print(f"Train: {len(train_dataframe) / len(dataframe) * 100:.2f}%")
    print(f"Test: {len(test_dataframe) / len(dataframe) * 100:.2f}%")
    print(f"Test Cases: {len(test_cases)}")
    print("\nSHAPE:")
    print(f"Sparse matrix (train): {train_matrix.shape}")

    print("\nUSERS & ITEMS")
    print(f"Min playingTime: {dataframe['playingTime'].min()}, Max playingTime: {dataframe['playingTime'].max()}")
    print(f"Users in df: {dataframe['profileId'].nunique()}, Items in df: {dataframe['itemId'].nunique()}")
    print(f"Users in mapping: {len(users_mapping)}, Items in mapping: {len(items_mapping)}")

    return train_matrix, users_mapping, items_mapping, reverse_user_mapping, test_cases, duration_scores

In [None]:
df = load_csv_from_gcs(DATA_URI, n_items=3)

In [None]:
train_matrix, users_mapping, items_mapping, reverse_user_mapping, test_cases, duration_scores = prepare_data(
    df, apply_bm25=False, K1=3.0, B=1.5, split_date='2024-10-14')

### Applying time of day context to test cases

In [None]:
# Convert to time of day context
test_cases = [
    (user_id, str(itemid), ["night", "morning",
     "afternoon", "evening"][(context.hour // 6) % 4])
    for user_id, itemid, context in test_cases
]

In [None]:
test_cases[0]

### Top K most popular items for popularity baseline

In [None]:
# Ranking items by popularity
item_counts = Counter(df['itemId'])
items_ranked_by_popularity = [str(item) for item, _ in item_counts.most_common()]

In [None]:
del df

### Get contextual weights

In [None]:
client = bigquery.Client()

query = """
WITH ranked_items AS (
    SELECT *,
           ROW_NUMBER() OVER(PARTITION BY itemId ORDER BY date DESC) as rn
    FROM `mytable`
)
SELECT *
FROM ranked_items
WHERE rn = 1;
"""

tod_context_scores = client.query(query).to_dataframe()

In [None]:
# Hashmap of all context scores for quick lookup
context_mapping = {
    "timeofday": tod_context_scores.set_index('itemId').to_dict(orient='index'),
}

del tod_context_scores

## Post-filtering methods

In [None]:
def rerank_with_context(context_rec_item_ids: List[int], context_rec_item_scores: List[int], context_type, context, n, w=None):
    missing_context_item = 0
    contextualized_scores = []
    context_mapping_type = context_mapping.get(context_type, {})
    for item_idx, score in zip(context_rec_item_ids, context_rec_item_scores):
        item_id = items_mapping.get(item_idx)
        context_score = context_mapping.get(
            context_type, {}).get(item_id, {}).get(context, 1)
        if context_score == 1:
            missing_context_item += 1
            continue
        post_filtering_score = context_score * score
        # string, int
        contextualized_scores.append((item_id, post_filtering_score))

    top_n_scores = heapq.nlargest(n, contextualized_scores, key=lambda x: x[1])
    return top_n_scores, missing_context_item

## Evaluation methods

In [None]:
def combine_metrics(coverage_map, hitrate_map, popularity_list, map_score, precision_list, gini_index_map):
    avg_coverage = len(coverage_map) / train_matrix.shape[0]
    avg_hitrate = np.mean(hitrate_map)
    avg_popularity = sum(popularity_list) / len(popularity_list) if popularity_list else 0
    avg_map = map_score / len(test_cases)
    avg_precision = np.mean(precision_list)
    gini_value = gini(gini_index_map)
    return avg_coverage, avg_hitrate, avg_popularity, avg_map, avg_precision, gini_value


def compile_results(combination_id, factors, reg, iterations, k, coverage, hitrate, popularity, gini, map_score, precision, missing_ids, missing_context_item, model_name, avg_filtered_for_user, train_time, custom_scoring_time, full_model_run_time, start_time, test_cases):
    return {
        'id': combination_id,
        'factors': int(factors),
        'regularization': float(reg),
        'iterations': int(iterations),
        'k': int(k),
        'coverage': float(coverage),
        'hitrate': float(hitrate),
        'popularity': float(popularity),
        'gini': float(gini),
        'map': float(map_score),
        'precision': float(precision),
        'missing_id_in_test': int(missing_ids),
        'missing_context_item': int(missing_context_item),
        'model': model_name,
        'run_id': f'{RUN_ID}',
        'filtered_from_observed': int(avg_filtered_for_user),
        'train_time': float(train_time),
        'custom_scoring_time': float(custom_scoring_time),
        'full_model_eval_time': float(full_model_run_time),
        'started': float(start_time),
        'ended': float(time.time()),
        'train_matrix_shape': str(train_matrix.shape),
        'test_cases': str(len(test_cases))
    }


def evaluation(model, train_matrix, k, n_context, combination_id, factors, reg, iterations, train_time, start_time):
    als_coverage_map, context_coverage_map = set(), set()
    random_coverage_map, popularity_coverage_map = set(), set()

    random_gini_index_map = {item_id: 0 for item_id in items_ranked_by_popularity}
    popularity_gini_index_map = {item_id: 0 for item_id in items_ranked_by_popularity}

    als_gini_index_map = {item_id: 0 for item_id in items_ranked_by_popularity}
    context_gini_index_map = {item_id: 0 for item_id in items_ranked_by_popularity}

    als_popularity, context_popularity = [], []
    random_popularity, popularity_popularity = [], []

    als_hitrate_map, context_hitrate_map = [], []
    random_hitrate_map, popularity_hitrate_map = [], []

    als_map_score, context_map_score = 0.0, 0.0
    random_map_score, popularity_map_score = 0.0, 0.0

    als_precision, context_precision = [], []
    random_precision, popularity_precision = [], []

    filtered_for_user = [0, 0, 0]
    missing_context_item = 0
    missing_ids = 0
    start_custom_scoring_time = time.time()

    for user_idx, itemId, context in test_cases:
        rec_item_ids, rec_item_scores = model.recommend(userid=user_idx, user_items=train_matrix[user_idx], N=n_context)

        # ALS
        als_rec_item_ids = rec_item_ids[:k]
        als_coverage_map.update(als_rec_item_ids)

        rec_item_ids_dfid = [items_mapping.get(item_idx) for item_idx in als_rec_item_ids]
        if len(rec_item_ids_dfid) == 0:
            raise ValueError("Error, rec_item_ids_dfid is empty, something is wrong with the mappings")

        als_map_score += calculate_average_precision(rec_item_ids_dfid, [itemId])
        als_precision.append(precision(rec_item_ids_dfid, [itemId]))
        als_hitrate_map.append(int(any(item in rec_item_ids_dfid for item in [itemId])))

        for item_idx in rec_item_ids_dfid:
            als_gini_index_map[item_idx] = als_gini_index_map.get(item_idx, 0) + 1
            duration_score = duration_scores.get(item_idx, None)
            if duration_score:
                als_popularity.append(duration_score)

        # Context
        context_rec_tof_sorted, missing_context_item_count = rerank_with_context(rec_item_ids, rec_item_scores, 'timeofday', context, n=k)
        missing_context_item += missing_context_item_count

        context_item_ids = [item[0] for item in context_rec_tof_sorted]
        context_coverage_map.update(context_item_ids)

        context_map_score += calculate_average_precision(context_item_ids, [itemId])
        context_precision.append(precision(context_item_ids, [itemId]))
        context_hitrate_map.append(int(any(item in [itemId] for item in context_item_ids)))

        for item_idx in context_item_ids:
            context_gini_index_map[item_idx] = context_gini_index_map.get(item_idx, 0) + 1
            duration_score = duration_scores.get(item_idx)
            if duration_score:
                context_popularity.append(duration_score)

        # Random
        random_item_ids = np.random.choice(items_ranked_by_popularity, k, replace=False)
        random_coverage_map.update(random_item_ids)

        random_map_score += calculate_average_precision(random_item_ids, [itemId])
        random_precision.append(precision(random_item_ids, [itemId]))
        random_hitrate_map.append(int(any(item in random_item_ids for item in [itemId])))

        for item_idx in random_item_ids:
            random_gini_index_map[item_idx] = random_gini_index_map.get(item_idx, 0) + 1
            duration_score = duration_scores.get(item_idx, None)
            if duration_score:
                random_popularity.append(duration_score)

        # Popularity
        popularity_item_ids = items_ranked_by_popularity[:k]
        popularity_coverage_map.update(popularity_item_ids)

        popularity_map_score += calculate_average_precision(popularity_item_ids, [itemId])
        popularity_precision.append(precision(popularity_item_ids, [itemId]))
        popularity_hitrate_map.append(int(any(item in popularity_item_ids for item in [itemId])))

        for item_idx in popularity_item_ids:
            popularity_gini_index_map[item_idx] = popularity_gini_index_map.get(item_idx, 0) + 1
            duration_score = duration_scores.get(item_idx, None)
            if duration_score:
                popularity_popularity.append(duration_score)

    # Aggregate metrics
    als_avg_coverage, als_avg_hitrate, als_avg_popularity, als_avg_map, als_avg_precision, als_gini_value = combine_metrics(
        als_coverage_map, als_hitrate_map, als_popularity, als_map_score, als_precision, als_gini_index_map)

    context_avg_coverage, context_avg_hitrate, context_avg_popularity, context_avg_map, context_avg_precision, context_gini_value = combine_metrics(
        context_coverage_map, context_hitrate_map, context_popularity, context_map_score, context_precision, context_gini_index_map)

    random_avg_coverage, random_avg_hitrate, random_avg_popularity, random_avg_map, random_avg_precision, random_gini_value = combine_metrics(
        random_coverage_map, random_hitrate_map, random_popularity, random_map_score, random_precision, random_gini_index_map)

    popularity_avg_coverage, popularity_avg_hitrate, popularity_avg_popularity, popularity_avg_map, popularity_avg_precision, popularity_gini_value = combine_metrics(
        popularity_coverage_map, popularity_hitrate_map, popularity_popularity, popularity_map_score, popularity_precision, popularity_gini_index_map)

    end_custom_scoring_time = time.time() - start_custom_scoring_time
    full_model_run_time = time.time() - start_time

    avg_filtered_for_user = np.mean(np.array(filtered_for_user))

    # Create results
    als_results = compile_results(
        combination_id, factors, reg, iterations, k, als_avg_coverage, als_avg_hitrate, als_avg_popularity, als_gini_value,
        als_avg_map, als_avg_precision, missing_ids, 0, f'{MODEL}', avg_filtered_for_user, train_time, end_custom_scoring_time,
        full_model_run_time, start_time, test_cases
    )

    context_results = compile_results(
        combination_id, factors, reg, iterations, k, context_avg_coverage, context_avg_hitrate, context_avg_popularity,
        context_gini_value, context_avg_map, context_avg_precision, missing_ids, missing_context_item, f'{MODEL}_contextualized',
        avg_filtered_for_user, train_time, end_custom_scoring_time, full_model_run_time, start_time, test_cases
    )

    random_results = compile_results(
        combination_id, factors, reg, iterations, k, random_avg_coverage, random_avg_hitrate, random_avg_popularity,
        random_gini_value, random_avg_map, random_avg_precision, missing_ids, 0, f'{MODEL}_random', avg_filtered_for_user,
        train_time, end_custom_scoring_time, full_model_run_time, start_time, test_cases
    )

    popularity_results = compile_results(
        combination_id, factors, reg, iterations, k, popularity_avg_coverage, popularity_avg_hitrate, popularity_avg_popularity,
        popularity_gini_value, popularity_avg_map, popularity_avg_precision, missing_ids, 0, f'{MODEL}_popularity', avg_filtered_for_user,
        train_time, end_custom_scoring_time, full_model_run_time, start_time, test_cases
    )

    return als_results, context_results, random_results, popularity_results

## Grid search evaluation

In [None]:
MODEL = "ALS"

param_grid = {
    'factors': [20, 50, 75, 100],
    'regularization': [0.01, 0.1, 0.5],
    'iterations': [10, 30, 50],
    'k': [1, 3, 10, 20],
}

total_rounds = len(param_grid['factors']) * len(param_grid['regularization']
                                                ) * len(param_grid['iterations']) * len(param_grid['k'])
print(f"Will run {total_rounds} times.")

results = []
combinations = list(itertools.product(param_grid['factors'],
                                      param_grid['regularization'],
                                      param_grid['iterations'],
                                      param_grid['k']))


i = 0
with tqdm(total=total_rounds) as pbar:
    for combination in combinations:
        results = []
        combination_id = i
        factors, reg, iterations, k = combination
        try:
            start_time = time.time()
            model = AlternatingLeastSquares(factors=factors,
                                            regularization=reg,
                                            iterations=iterations,
                                            calculate_training_loss=True,
                                            use_native=True,
                                            use_cg=True,
                                            num_threads=0,
                                            random_state=42)

            model.fit(train_matrix, show_progress=False)
            train_time = time.time() - start_time

            n_context = 100
            als_results, context_results, random_results, popularity_results = evaluation(model, train_matrix, k, n_context, combination_id, factors, reg, iterations, train_time, start_time)

            results.append(als_results)
            results.append(context_results)
            results.append(random_results)
            results.append(popularity_results)

            if TO_BIGQUERY:
                result_df = pd.DataFrame(results)
                to_gbq(result_df, f'{BQ_DATASET_ID}.{BQ_TABLE_ID}', project_id=PROJECT_ID, if_exists='append')

        except (exceptions.GoogleAuthError, exceptions.TransportError) as e:
            print(f"Error uploading to BigQuery: {e}, combination_id: {combination_id}")
        except Exception as e:
            print(f"Error during processing: {e}, combination_id: {combination_id}")

        pbar.update(1)
        i += 1

if TO_LOCAL:
    als_results = pd.DataFrame(results)
    als_results.to_csv('offline_eval_results.csv', index=False)

# Results

In [None]:
client = bigquery.Client()

query = f"""
SELECT * FROM mytable
WHERE run_id = "{RUN_ID}"
"""

df = client.query(query).to_dataframe()

df