# Alternating Least Squares (ALS) Collaborative Filtering Recommender

In [81]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

## Spark Session Initialization
Initializing a Spark session with increased memory allocation to handle large datasets.

In [82]:
spark = SparkSession.builder \
    .appName("KuaiRecALS") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

# Data Loading and Sampling
Loading the user-item interaction data and sampling a fraction to fit into memory for demonstration purposes.

In [83]:
interactions_raw = pd.read_csv("data_final_project/KuaiRec 2.0/data/small_matrix.csv")

# Reduce the size of the DataFrame to fit into memory
interactions_raw = interactions_raw.sample(frac=0.1, random_state=42)
interactions_raw.head(5)

Unnamed: 0,user_id,video_id,play_duration,video_duration,time,date,timestamp,watch_ratio
224263,385,6088,3010,14240,2020-08-08 09:49:55.461,20200808.0,1596851000.0,0.211376
2682796,4275,570,1543,25310,2020-07-29 20:38:30.483,20200729.0,1596026000.0,0.060964
32875,55,7281,1276,6667,2020-08-28 07:44:46.355,20200828.0,1598572000.0,0.19139
262283,477,7093,6709,28929,2020-07-14 19:24:31.309,20200714.0,1594726000.0,0.231913
3228863,5000,7125,2976,6016,2020-07-18 11:21:36.833,20200718.0,1595042000.0,0.494681


## Data Preparation

Creating a binary 'is_like' column to represent positive feedback and normalizing ratings for ALS.

In [None]:
interactions_df = interactions_raw.copy()
interactions_df["is_like"] = interactions_df['watch_ratio'].apply(lambda x: 1 if x >= 2 else 0)
interactions_df['ratings'] = interactions_df['watch_ratio'].apply(lambda x: 1 if x >= 2 else 0)
interactions_df.head(5)

Unnamed: 0,user_id,video_id,play_duration,video_duration,time,date,timestamp,watch_ratio,is_like,ratings
224263,385,6088,3010,14240,2020-08-08 09:49:55.461,20200808.0,1596851000.0,0.211376,0,0
2682796,4275,570,1543,25310,2020-07-29 20:38:30.483,20200729.0,1596026000.0,0.060964,0,0
32875,55,7281,1276,6667,2020-08-28 07:44:46.355,20200828.0,1598572000.0,0.19139,0,0
262283,477,7093,6709,28929,2020-07-14 19:24:31.309,20200714.0,1594726000.0,0.231913,0,0
3228863,5000,7125,2976,6016,2020-07-18 11:21:36.833,20200718.0,1595042000.0,0.494681,0,0


### Conversion to Spark DataFrame

In [85]:
interactions_spark = spark.createDataFrame(interactions_df)

### Selecting Relevant Columns

In [86]:
ratings_spark = interactions_spark.select('user_id', 'video_id', 'ratings')
ratings_spark.show(5)

+-------+--------+-------+
|user_id|video_id|ratings|
+-------+--------+-------+
|    385|    6088|      0|
|   4275|     570|      0|
|     55|    7281|      0|
|    477|    7093|      0|
|   5000|    7125|      0|
+-------+--------+-------+
only showing top 5 rows



25/05/17 23:29:23 WARN TaskSetManager: Stage 975 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.


### Indexing User and Item IDs
Encoding user and video IDs as numerical indices, which is required for Spark's ALS implementation.

In [87]:
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in list(set(ratings_spark.columns) - set(["ratings"]))
]

pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(ratings_spark).transform(ratings_spark)
transformed.show(5)

25/05/17 23:29:23 WARN TaskSetManager: Stage 976 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.
25/05/17 23:29:24 WARN TaskSetManager: Stage 979 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.
25/05/17 23:29:24 WARN TaskSetManager: Stage 982 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------+-------+--------------+-------------+
|user_id|video_id|ratings|video_id_index|user_id_index|
+-------+--------+-------+--------------+-------------+
|    385|    6088|      0|        1511.0|        549.0|
|   4275|     570|      0|        2641.0|        354.0|
|     55|    7281|      0|        2204.0|       1298.0|
|    477|    7093|      0|        2087.0|        307.0|
|   5000|    7125|      0|          53.0|        655.0|
+-------+--------+-------+--------------+-------------+
only showing top 5 rows



### Train-Test Split
Splitting the data into training and test sets to evaluate model performance.

In [88]:
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)

# ALS Model Training
Configuring and training the ALS model to learn latent factors for users and items.

In [89]:
als = ALS(
    maxIter=5,
    regParam=0.25,
    rank=25,
    userCol="user_id_index",
    itemCol="video_id_index",
    ratingCol="ratings",
    coldStartStrategy="drop",
    nonnegative=True,
)

model = als.fit(training)

25/05/17 23:29:24 WARN TaskSetManager: Stage 983 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.
25/05/17 23:29:24 WARN TaskSetManager: Stage 984 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.


## Evaluation
Evaluating the ALS model using RMSE on the test set to assess prediction accuracy.

In [90]:
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="ratings", predictionCol="prediction"
)

predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)

print("RMSE=" + str(rmse))
predictions.show(5)

25/05/17 23:29:25 WARN TaskSetManager: Stage 1017 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

RMSE=0.46516946190313313


25/05/17 23:29:26 WARN TaskSetManager: Stage 1056 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.

+-------+--------+-------+--------------+-------------+------------+
|user_id|video_id|ratings|video_id_index|user_id_index|  prediction|
+-------+--------+-------+--------------+-------------+------------+
|   1015|    8718|      0|        1645.0|        392.0|  0.24896477|
|   2139|    8718|      1|        1645.0|       1064.0|  0.23429392|
|   1884|   10364|      0|        1238.0|       1269.0| 0.068184175|
|   1354|    8718|      1|        1645.0|        860.0|  0.23267432|
|    896|   10416|      0|        2122.0|        222.0|0.0078038597|
+-------+--------+-------+--------------+-------------+------------+
only showing top 5 rows



                                                                                

## Post-processing Recommendations
Converting Spark recommendations to Pandas, mapping indices back to original IDs, and organizing recommendations for evaluation.

In [92]:
recs = model.recommendForAllUsers(10).toPandas()
df_recs = (
    recs.recommendations.apply(pd.Series)
    .merge(recs, right_index=True, left_index=True)
    .drop(["recommendations"], axis=1)
    .melt(id_vars=["user_id_index"], value_name="recommendation")
    .drop("variable", axis=1)
    .dropna()
)

df_recs = df_recs.sort_values("user_id_index")
df_recs = pd.concat(
    [df_recs["recommendation"].apply(pd.Series), df_recs["user_id_index"]], axis=1
)

df_recs.columns = ["product_id_index", "ratings", "reviewer_id"]
tmp = transformed.select(
    transformed["user_id"],
    transformed["user_id_index"],
    transformed["video_id"],
    transformed["video_id_index"],
)
tmp = tmp.toPandas()

dict1 = dict(zip(tmp["user_id_index"], tmp["user_id"]))
dict2 = dict(zip(tmp["video_id_index"], tmp["video_id"]))

df_recs_copy = df_recs.copy()
df_recs_copy.loc[:, "user_id"] = df_recs["reviewer_id"].map(dict1)
df_recs_copy.loc[:, "video_id"] = df_recs["product_id_index"].map(dict2)
df_recs_copy = df_recs_copy.sort_values("user_id")
df_recs_copy.reset_index(drop=True, inplace=True)

new = df_recs_copy[["user_id", "video_id", "ratings"]]
new["recommendations"] = list(zip(new.video_id, new.ratings))

res = new[["user_id", "recommendations"]]
res_new = res["recommendations"].groupby([res.user_id]).apply(list).reset_index()

print(res_new)

25/05/17 23:29:30 WARN TaskSetManager: Stage 1152 contains a task of very large size (2287 KiB). The maximum recommended task size is 1000 KiB.


      user_id                                    recommendations
0          14  [(1305, 0.3922075629234314), (4123, 0.38031759...
1          19  [(6787, 0.35512658953666687), (6222, 0.3460551...
2          21  [(600, 0.3670656085014343), (154, 0.3574683964...
3          23  [(154, 0.291812002658844), (2130, 0.2933684587...
4          24  [(314, 0.24343322217464447), (6787, 0.25202533...
...       ...                                                ...
1406     7142  [(9907, 0.31830838322639465), (2130, 0.3290573...
1407     7147  [(2130, 0.4634591341018677), (600, 0.473377108...
1408     7153  [(4040, 0.23970308899879456), (6222, 0.2331626...
1409     7159  [(154, 0.3142992854118347), (6222, 0.308887779...
1410     7162  [(1305, 0.39347967505455017), (2130, 0.3893636...

[1411 rows x 2 columns]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  new["recommendations"] = list(zip(new.video_id, new.ratings))


## Evaluation

In [93]:
def evaluate_topk_metrics(y_true, top_k_preds, k=10):
    top_k = top_k_preds[:k]
    relevant = set(y_true)
    hits = [1 if item in relevant else 0 for item in top_k]

    precision = sum(hits) / k
    recall = sum(hits) / len(relevant) if relevant else 0.0
    dcg = sum(hit / np.log2(i + 2) for i, hit in enumerate(hits))
    ideal_hits = [1] * min(len(relevant), k)
    idcg = sum(1 / np.log2(i + 2) for i in range(len(ideal_hits)))
    ndcg = dcg / idcg if idcg != 0 else 0.0

    # MAP@k: mean average precision
    ap_sum = 0.0
    hit_count = 0
    for i, hit in enumerate(hits):
        if hit:
            hit_count += 1
            ap_sum += hit_count / (i + 1)
    map_k = ap_sum / min(len(relevant), k) if relevant else 0.0

    return precision, recall, ndcg, map_k


In [94]:
user_recs_dict = dict(zip(res_new["user_id"], res_new["recommendations"]))

k_values = [1, 5, 10, 20]
results = []

for k in k_values:
    all_precisions, all_recalls, all_ndcgs, all_maps = [], [], [], []
    for user_id, recs in user_recs_dict.items():
        video_ids = [video_id for video_id, _ in recs][:k]
        y_true = interactions_df[(interactions_df["user_id"] == user_id) & (interactions_df["is_like"] == 1)]["video_id"].tolist()
        if not y_true or not video_ids:
            continue
        precision, recall, ndcg, map_k = evaluate_topk_metrics(y_true, video_ids, k)
        all_precisions.append(precision)
        all_recalls.append(recall)
        all_ndcgs.append(ndcg)
        all_maps.append(map_k)
    results.append({
        "k": k,
        "precision": np.mean(all_precisions),
        "recall": np.mean(all_recalls),
        "ndcg": np.mean(all_ndcgs),
        "map": np.mean(all_maps)
    })

for res in results:
    print(f"Results for k={res['k']}:")
    print(f"  Mean Precision@{res['k']}: {res['precision']:.4f}")
    print(f"  Mean Recall@{res['k']}: {res['recall']:.4f}")
    print(f"  Mean NDCG@{res['k']}: {res['ndcg']:.4f}")
    print(f"  Mean MAP@{res['k']}: {res['map']:.4f}\n")

Results for k=1:
  Mean Precision@1: 0.0886
  Mean Recall@1: 0.0009
  Mean NDCG@1: 0.0886
  Mean MAP@1: 0.0886

Results for k=5:
  Mean Precision@5: 0.0920
  Mean Recall@5: 0.0049
  Mean NDCG@5: 0.0919
  Mean MAP@5: 0.0467

Results for k=10:
  Mean Precision@10: 0.0938
  Mean Recall@10: 0.0098
  Mean NDCG@10: 0.0931
  Mean MAP@10: 0.0335

Results for k=20:
  Mean Precision@20: 0.0469
  Mean Recall@20: 0.0098
  Mean NDCG@20: 0.0601
  Mean MAP@20: 0.0167

