## **Movielens-1M using FMClassifier and FMRegressor** 
Compare the prediction using FMClassifier and FMRegressor from Spark

#### **Data Loading and Processing**

In [1]:
import pandas as pd 
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType 

from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import FMClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark.ml import Pipeline

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 192g --executor-memory 16g pyspark-shell'

In [2]:
from pyspark import SparkContext

builder = SparkSession.builder
builder = builder.config("spark.driver.maxResultSize", "5G")

spark = builder.appName("FMClassifier_MovieLens").getOrCreate()

spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

24/01/31 16:00:37 WARN Utils: Your hostname, server resolves to a loopback address: 127.0.1.1; using 192.168.18.50 instead (on interface enp36s0f0)
24/01/31 16:00:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/31 16:00:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# schema = StructType(
#     (
#         StructField("user_id", IntegerType()),
#         StructField("item_id", IntegerType()),
#         StructField("rating", FloatType()),
#         StructField("timestamp", LongType())
#     )
# )

# rating = spark.read.csv("ml-1m/ratings.dat", sep="::", schema=schema)
# rating.show(5)

# Faster processing using pandas vs spark
rating = pd.read_csv("ml-1m/ratings.dat", names=["user_id", "item_id", "rating", "timestamp"], delimiter="::", engine="python")
rating.columns = ['user_id','item_id','rating','timestamp']
rating

Unnamed: 0,user_id,item_id,rating,timestamp
0,1,1193,5,978300760
1,1,661,3,978302109
2,1,914,3,978301968
3,1,3408,4,978300275
4,1,2355,5,978824291
...,...,...,...,...
1000204,6040,1091,1,956716541
1000205,6040,1094,5,956704887
1000206,6040,562,5,956704746
1000207,6040,1096,4,956715648


In [4]:
rating.drop(['timestamp'], axis=1, inplace=True)

# set all positive interactions to 1
df_classification = rating.copy()
df_classification['interaction'] = 1
df_classification.drop(['rating'], axis=1, inplace=True)

#### **Classification Approach** 
Since all the user-item considered positive interaction in implicit feedback manner given all interaction is `1`. The assumption here is to generate negative samples to fit with classification method for FMClassifier.  

In [5]:
import random

all_users = df_classification.user_id.unique()
all_items = df_classification.item_id.unique()

negative_instances = []

for user in all_users:
    user_interacted_item = df_classification[df_classification.user_id == user]['item_id'].unique()
    non_interacted_items = set(all_items) - set(user_interacted_item)
    for item in non_interacted_items:
        negative_instances.append([user, item, 0])

num_negatives = len(rating[df_classification['interaction'] == 1])
sampled_negatives = random.sample(negative_instances, num_negatives)

df_negatives = pd.DataFrame(sampled_negatives, columns=['user_id', 'item_id', 'interaction'])
df_negatives

columns = ['user_id', 'item_id', 'interaction']
balanced_df = pd.concat([df_classification[columns], df_negatives[columns]]).reset_index(drop=True)

In [6]:
from pyspark.ml.feature import VectorAssembler

balanced_df_spark = spark.createDataFrame(balanced_df)

assembler = VectorAssembler(inputCols=["user_id", "item_id"], outputCol="features")
balanced_df_spark = assembler.transform(balanced_df_spark)
balanced_df_spark.show()

+-------+-------+-----------+------------+
|user_id|item_id|interaction|    features|
+-------+-------+-----------+------------+
|      1|   1193|          1|[1.0,1193.0]|
|      1|    661|          1| [1.0,661.0]|
|      1|    914|          1| [1.0,914.0]|
|      1|   3408|          1|[1.0,3408.0]|
|      1|   2355|          1|[1.0,2355.0]|
|      1|   1197|          1|[1.0,1197.0]|
|      1|   1287|          1|[1.0,1287.0]|
|      1|   2804|          1|[1.0,2804.0]|
|      1|    594|          1| [1.0,594.0]|
|      1|    919|          1| [1.0,919.0]|
|      1|    595|          1| [1.0,595.0]|
|      1|    938|          1| [1.0,938.0]|
|      1|   2398|          1|[1.0,2398.0]|
|      1|   2918|          1|[1.0,2918.0]|
|      1|   1035|          1|[1.0,1035.0]|
|      1|   2791|          1|[1.0,2791.0]|
|      1|   2687|          1|[1.0,2687.0]|
|      1|   2018|          1|[1.0,2018.0]|
|      1|   3105|          1|[1.0,3105.0]|
|      1|   2797|          1|[1.0,2797.0]|
+-------+--

                                                                                

In [7]:
# USE THIS FOR RANDOMIZED NEGATIVE AND POSITIVE SAMPLING APPROACHES

# import random 
# import pyspark.sql.functions as F
# from pyspark.sql import DataFrame 

# all_users = df_rating.select("user_id").distinct().rdd.flatMap(lambda x: x).collect()
# all_items = df_rating.select("item_id").distinct().rdd.flatMap(lambda x: x).collect()

# negative_instances = []

# for user in all_users:
#     user_item_interaction = df_rating.filter(df_rating.user_id == user).select("item_id").rdd.flatMap(lambda x: x).collect()
#     negative_interactions = list(set(all_items) - set(user_item_interaction))

#     for item in negative_interactions:
#         negative_instances.append((user, item, 0))

# num_negatives = df_rating.filter(df_rating.interaction == 1).count()
# sampled_negatives = random.sample(negative_instances, num_negatives)

# # create a dataframe with all negative instances
# df_negatives = spark.createDataFrame(sampled_negatives, schema=schema)

# balanced_df = df_rating.select("user_id", "item_id", "interaction").union(df_negatives)

# # merging with all positive data 
# combined = all_user_item_pairs.join(rating, on=["user_id", "item_id"], how="left")
# data_with_neg = combined.withColumn("interaction", F.coalesce(combined.interaction, F.lit(0)))

# negative_samples = data_with_neg.filter(data_with_neg.interaction == 0)
# positive_samples = data_with_neg.filter(data_with_neg.interaction != 0)

# fraction = positive_samples.count() / float(negative_samples.count())
# balanced_negatives = negative_samples.sample(False, fraction, seed=42)

# balanced_data = positive_samples.union(balanced_negatives)

# indexer_user = StringIndexer(inputCol="user_id", outputCol="user_id_idx")
# indexer_item = StringIndexer(inputCol="item_id", outputCol="item_id_idx")

In [8]:
(train_data, test_data) = balanced_df_spark.randomSplit([0.75, 0.25], seed=42)

# Ensure 'user_id' is present
print("Training Data Columns: ", train_data.columns)
print("Test Data Columns: ", test_data.columns)

labelIndexer = StringIndexer(inputCol="interaction", outputCol="indexedLabel").fit(balanced_df_spark)

fm = FMClassifier(featuresCol="features", labelCol="indexedLabel", stepSize=0.001)
pipeline = Pipeline(stages=[labelIndexer, fm])
model = pipeline.fit(train_data)

Training Data Columns:  ['user_id', 'item_id', 'interaction', 'features']
Test Data Columns:  ['user_id', 'item_id', 'interaction', 'features']


24/01/31 16:01:30 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [9]:
predictions = model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol="interaction", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"Area under ROC: {roc_auc}")

evaluator = BinaryClassificationEvaluator(labelCol="interaction")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

evaluator = MulticlassClassificationEvaluator(labelCol="interaction", predictionCol="prediction")

precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

print("Weighted Precision: {:.3f}".format(precision))
print("Weighted Recall: {:.3f}".format(recall))

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

                                                                                

Area under ROC: 0.4993986982884071


                                                                                

Accuracy: 0.5167887622606877


                                                                                

Weighted Precision: 0.498
Weighted Recall: 0.500
+----------+------------+-----------+
|prediction|indexedLabel|   features|
+----------+------------+-----------+
|       1.0|         1.0|[1.0,150.0]|
|       1.0|         1.0|[1.0,588.0]|
|       1.0|         1.0|[1.0,595.0]|
|       1.0|         1.0|[1.0,608.0]|
|       1.0|         1.0|[1.0,783.0]|
+----------+------------+-----------+
only showing top 5 rows



In [10]:
from sklearn.metrics import confusion_matrix

y_pred = predictions.select("prediction").collect()
y_orig = predictions.select("interaction").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix: \n", cm)

Confusion Matrix: 
 [[232215  17979]
 [232225  17656]]


In [11]:
k = 10

def pivot_table_recommendation(recommendation):
    # Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
    pv_rec = recommendation.pivot(index='user_id', columns='rank', values='item_id').reset_index()

    # Set user_id as the index
    pv_rec.set_index('user_id', inplace=True)

    # Remove the 'rank' column
    pv_rec.columns.name = None
    pv_rec.columns = [f'{int(rank)}' for rank in pv_rec.columns]
    pv_rec.index.name = None

    return pv_rec

In [12]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# # Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(predictions["prediction"].desc())
df_test = predictions.withColumn("rank", row_number().over(windowSpec))

# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()




+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|      6|  34| 296| 364| 569| 920|1028|1043|1188|1566|1947|
|      7|1196|3107|3753|3089|3378|1381|2384|3473| 632|1202|
|      9|  25|  47| 377| 412| 428| 508| 590| 597| 805|1089|
|     19|  34|  76| 223| 260| 368| 377| 648| 785|1022|1090|
|     22|  47|  81|  95| 104| 163| 180| 256| 312| 333| 368|
|     25| 110| 157| 223| 260| 546| 737|1129|1356|1371|1372|
|     26|   1|  39|  45| 104| 125| 160| 168| 195| 198| 234|
|     27| 318| 541| 858| 905| 910| 926| 930| 955|1198|1225|
|     29|  50| 288| 318| 589| 858| 912| 969|1225|1356|1374|
|     31| 933| 946| 951|1077|1230|1234|1235|1259|1265|1304|
|     32|  50| 247| 457| 589| 608|1343|1683|2571|2916|2959|
|     34| 339| 353| 441| 455| 497| 837|1079|1100|1148|1196|
|     39| 223| 785|1060|1127|1193|2605|2706|2710|3005|3578|
|     43|2369|2581|2605|2961|3298|3302|1

                                                                                

In [13]:
pivot = pivot_recommendations.toPandas().set_index('user_id')
pivot = pivot.rename_axis(None, axis=0)
pivot

                                                                                

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
6,34,296,364,569,920,1028,1043,1188,1566,1947
7,1196,3107,3753,3089,3378,1381,2384,3473,632,1202
9,25,47,377,412,428,508,590,597,805,1089
19,34,76,223,260,368,377,648,785,1022,1090
22,47,81,95,104,163,180,256,312,333,368
...,...,...,...,...,...,...,...,...,...,...
6039,50,260,588,783,909,911,922,933,942,955
5703,20,104,155,185,553,922,1179,1200,1209,1227
5744,10,31,48,112,165,181,199,212,196,232
5752,1,11,60,111,169,199,223,228,54,139


In [14]:
k = 10

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

def hit_rate(val_pivot, val_test_user_items, val_comm_user):
    hit_rate = np.mean([int(len(set(val_pivot.loc[u]) & val_test_user_items[u]) > 0) for u in val_comm_user])
    return hit_rate

def reciprocal_rank(val_pivot, val_test_user_items, val_comm_user):
    match_indexes = [np.where(pivot.loc[u].isin(set(pivot.loc[u]) & test_users_items[u]))[0] for u in comm_user]
    reciprocal_rank = np.mean([1 / (np.min(index) + 1) if len(index) > 0 else 0 for index in match_indexes])

    return reciprocal_rank

def dcg(val_pivot, val_test_user_items, val_comm_user):
    match_indexes = [np.where(val_pivot.loc[u].isin(set(val_pivot.loc[u]) & val_test_user_items[u]))[0] for u in val_comm_user]
    discounted_cumulative_gain = np.mean([np.sum(1 / np.log2(index + 2)) if len(index) > 0 else 0 for index in match_indexes])
    
    return discounted_cumulative_gain

def precision(val_pivot, val_test_user_items, val_comm_user):
    precision = np.mean([len(set(val_pivot.loc[u]) & val_test_user_items[u]) / len(val_pivot.loc[u]) for u in val_comm_user])

    return precision

def recall(val_pivot, val_test_user_items, val_comm_user):
    recall = np.mean([len(set(val_pivot.loc[u]) & val_test_user_items[u]) / len(test_users_items[u]) for u in val_comm_user])

    return recall

print("EVALUATION CLASSIFICATION TESTSET ONLY\n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

                                                                                

EVALUATION CLASSIFICATION TESTSET ONLY

hit_rate: 1.000
reciprocal_rank: 1.000
dcg: 4.544
precision: 1.000
recall: 0.146


#### Use RankFM Validation Data for Testing

In [15]:
np.random.seed(42)

interactions = rating.copy()
interactions['random'] = np.random.random(size=len(interactions))
test_pct = 0.25

train_mask = interactions['random'] <  (1 - test_pct)
valid_mask = interactions['random'] >= (1 - test_pct)

interactions_train = interactions[train_mask][['user_id', 'item_id']]
interactions_valid = interactions[valid_mask][['user_id', 'item_id']]

train_users = np.sort(interactions_train.user_id.unique())
valid_users = np.sort(interactions_valid.user_id.unique())
cold_start_users = set(valid_users) - set(train_users)

train_items = np.sort(interactions_train.item_id.unique())
valid_items = np.sort(interactions_valid.item_id.unique())
cold_start_items = set(valid_items) - set(train_items)

print("train shape: {}".format(interactions_train.shape))
print("valid shape: {}".format(interactions_valid.shape))

print("train users: {}".format(len(train_users)))
print("valid users: {}".format(len(valid_users)))
print("cold-start users: {}".format(cold_start_users))

print("train items: {}".format(len(train_items)))
print("valid items: {}".format(len(valid_items)))
print("cold-start items: {}".format(cold_start_items))


train shape: (750042, 2)
valid shape: (250167, 2)
train users: 6040
valid users: 6040
cold-start users: set()
train items: 3670
valid items: 3507
cold-start items: {3842, 2308, 2438, 3220, 3607, 2584, 1820, 2845, 2591, 545, 1316, 2214, 1832, 1579, 3376, 1714, 1843, 2226, 2742, 311, 826, 2235, 3517, 1470, 576, 2895, 601, 3291, 989, 1630, 2909, 868, 2277, 2039, 3065, 2556}


In [16]:
interaction_df_spark = spark.createDataFrame(interactions_valid)

assembler = VectorAssembler(inputCols=["user_id", "item_id"], outputCol="features")
interaction_df_spark = assembler.transform(interaction_df_spark)

In [17]:
# Predict the interaction probabilities
df_test = model.transform(interaction_df_spark)

# Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(df_test["prediction"].desc())
df_test = df_test.withColumn("rank", row_number().over(windowSpec))

# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()


24/01/31 16:02:33 WARN StringIndexerModel: Input column interaction does not exist during transformation. Skip StringIndexerModel for this column.


+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|      1| 661|2804| 938|2398|  48| 588|1907| 783|2692|3114|
|      2|1357|1537|2916|1213|2881|3030| 434|3108| 292|1293|
|      3|3421|1641|3534|3868|1079| 653|2167|1580|3114|3552|
|      4| 480|1196|1198|3418|2366|1387|3527|2947|   0|   0|
|      5|1175|1392| 860| 215|1759| 501|3578|3793|1610|2058|
|      6|1101|  48|3508|   1|2858| 590| 597|3524|3604|3536|
|      7| 648| 861| 589|   6| 442| 733|2353|1196|2571| 457|
|      8|  39|2268|3500|3148|1476|2490|1836|   1|2429|1704|
|      9|3148|2278|3298|  50|1265| 805|1552| 593| 597| 524|
|     10|2622|1320|2124|2054|1252| 720|3868|3501|3363|2496|
|     11|1753|1188|2639| 663| 597|1777|1784| 246|2806|3101|
|     12| 813| 934|1641|1233| 999|   0|   0|   0|   0|   0|
|     13|2987|   2|2135|1196| 736| 165|1356| 329|  10|2686|
|     14|3354|2997|2731|2826|2686|2762|2

**Notes:** 
- RankFM able to generate movie_id based on user_id input, while XGBoost is predict the interaction given user_id and movie_id

#### Predict on RankFM Validation dataset

In [18]:
pivot = pivot_recommendations.toPandas().set_index('user_id')
pivot = pivot.rename_axis(None, axis=0)
pivot

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
1,661,2804,938,2398,48,588,1907,783,2692,3114
2,1357,1537,2916,1213,2881,3030,434,3108,292,1293
3,3421,1641,3534,3868,1079,653,2167,1580,3114,3552
4,480,1196,1198,3418,2366,1387,3527,2947,0,0
5,1175,1392,860,215,1759,501,3578,3793,1610,2058
...,...,...,...,...,...,...,...,...,...,...
5910,43,597,2406,2474,1124,608,2890,1183,1185,2130
5918,608,3863,1196,1198,0,0,0,0,0,0
5932,21,1100,1208,3267,493,1379,507,2028,0,0
6030,29,110,2094,920,3070,968,3479,260,3489,2105


**Notes:** 
- The results is looks totally differences than RankFM and few is NaN

In [19]:
k = 10

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

print("EVALUATION CLASSIFICATION ON RANKFM VALIDATION DATASET \n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

EVALUATION CLASSIFICATION ON RANKFM VALIDATION DATASET 

hit_rate: 1.000
reciprocal_rank: 1.000
dcg: 4.315
precision: 0.928
recall: 0.510


**Notes:** 

- XGB2 outperform because its leverage rich data from user and item features
- Next, we test on unseen data to check whether its over-fitting

#### Prediction on All Combination Data (excluded training data)

In [20]:
# Step 1: Generate all possible user-item pairs
unique_users = balanced_df_spark.select("user_id").distinct()
unique_items = balanced_df_spark.select("item_id").distinct()
user_item_pairs = unique_users.crossJoin(unique_items)

# Exclude already rated items (present in the training data)
training_user_item_pairs = train_data.select("user_id", "item_id")
user_item_pairs = user_item_pairs.subtract(training_user_item_pairs)

In [21]:
assembler = VectorAssembler(inputCols=["user_id", "item_id"], outputCol="features")
all_user_item_pairs_spark = assembler.transform(user_item_pairs)

In [22]:
# Predict the interaction probabilities
df_test = model.transform(all_user_item_pairs_spark)

24/01/31 16:02:41 WARN StringIndexerModel: Input column interaction does not exist during transformation. Skip StringIndexerModel for this column.


In [23]:
# Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(df_test["prediction"].desc())
df_test = df_test.withColumn("rank", row_number().over(windowSpec))

In [24]:
# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# # Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()



+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|     19|  61|2909| 186|2826|3416|1668|3439|1898|2901| 848|
|     26|2705| 193| 360|3659|3326|3746| 451|2545|3654| 732|
|     29|2917|1344|1960|1017|3876|1922| 172|3124|3717|3293|
|     54| 173| 639|1005|3435|3683|3576| 900|3931|2295|1695|
|     65| 551| 853| 548| 146| 348| 996| 511| 793| 837| 468|
|    112| 445| 418|  39| 153| 373| 268| 209|  20| 169| 131|
|    113| 440| 173| 235| 351| 342| 377| 401| 156| 300| 101|
|    155| 181| 241| 165| 124| 350| 151| 335| 254| 269| 235|
|    167|  89| 324|  56| 304| 190|  25| 213|  68| 177| 163|
|    191|   6| 217|  59|  53|  35| 105| 272| 215| 290| 176|
|    222| 236|  71| 157| 250|  25|  92| 118|  39| 264| 202|
|    243|   4|  10| 139|  11|  34| 147| 232| 111|   7| 100|
|    270| 132|  77| 188| 190| 111|  69|  27| 185| 134|  72|
|    277|   4| 171| 140| 155| 275| 132| 

                                                                                

#### Evaluation Unseen Data Prediction Results

In [25]:
from pyspark.sql.functions import collect_set

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

print("EVALUATION CLASSIFICATION UNSEEN DATA EXCLUDED TRAINING\n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

                                                                                

EVALUATION CLASSIFICATION UNSEEN DATA EXCLUDED TRAINING

hit_rate: 0.916
reciprocal_rank: 0.454
dcg: 1.082
precision: 0.232
recall: 0.001


**Notes:** 

- Its significant lower when predicting over unseen data and back testing into test data

#### **Regression Approach**

In [26]:
df = rating[['user_id', 'item_id', 'rating']]
user_item = df.drop(columns=['rating'])

df = spark.createDataFrame(df)
va = VectorAssembler(inputCols=['user_id', 'item_id'], outputCol='features')
va_df = va.transform(df)
va_df.show(5)

+-------+-------+------+------------+
|user_id|item_id|rating|    features|
+-------+-------+------+------------+
|      1|   1193|     5|[1.0,1193.0]|
|      1|    661|     3| [1.0,661.0]|
|      1|    914|     3| [1.0,914.0]|
|      1|   3408|     4|[1.0,3408.0]|
|      1|   2355|     5|[1.0,2355.0]|
+-------+-------+------+------------+
only showing top 5 rows



In [27]:
from pyspark.ml.regression import FMRegressor

(train_data, test_data) = va_df.randomSplit([0.75, 0.25], seed=42)

fm = FMRegressor(featuresCol="features", labelCol="rating", stepSize=0.001)
pipeline = Pipeline(stages=[fm])

model = pipeline.fit(train_data)

In [28]:
predictions = model.transform(test_data)
predictions.show(5)

+-------+-------+------+-----------+-------------------+
|user_id|item_id|rating|   features|         prediction|
+-------+-------+------+-----------+-------------------+
|      1|    150|     5|[1.0,150.0]|0.11770156730430315|
|      1|    588|     4|[1.0,588.0]|0.45603557509911274|
|      1|    595|     5|[1.0,595.0]|0.46144273960724846|
|      1|    608|     4|[1.0,608.0]| 0.4714846165509289|
|      1|    783|     4|[1.0,783.0]| 0.6066637292543353|
+-------+-------+------+-----------+-------------------+
only showing top 5 rows



In [29]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 3.3943799544645676


In [34]:

# Predict the interaction probabilities
df_test = predictions.select("*")

# Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(df_test["prediction"].desc())
df_test = df_test.withColumn("rank", row_number().over(windowSpec))

# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()


+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|      1|3186|3105|2804|2797|2791|2687|2018|1962|1907|1721|
|      2|3418|3256|3107|3068|2728|2353|1968|1957|1945|1690|
|      3|2997|2470|1641|1304|1270|1079| 653| 260|   0|   0|
|      4|1210| 480|   0|   0|   0|   0|   0|   0|   0|   0|
|      5|3793|3728|3578|3513|3266|3083|3081|2952|2858|2762|
|      6|3685|3682|3624|3600|3565|3536|3408|3072|2966|2321|
|      7|3753|3107|1196|   0|   0|   0|   0|   0|   0|   0|
|      8|3265|3148|3107|3105|2858|2688|2600|2396|2320|2291|
|      9|3948|3755|3623|3510|3178|2890|2028|1912|1682|1584|
|     10|3675|3668|3593|3358|3309|3296|3252|3247|3175|3153|
|     11|3448|3418|3396|3148|2883|2795|2746|2355|2321|2302|
|     12|2616|1247|1198| 813| 593|   0|   0|   0|   0|   0|
|     13|3699|3070|2822|2686|2094|2046|2028|2005|2002|1967|
|     14|2997|2920|2731|   0|   0|   0| 

In [35]:
pivot = pivot_recommendations.toPandas().set_index('user_id')
pivot = pivot.rename_axis(None, axis=0)
pivot

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
1,3186,3105,2804,2797,2791,2687,2018,1962,1907,1721
2,3418,3256,3107,3068,2728,2353,1968,1957,1945,1690
3,2997,2470,1641,1304,1270,1079,653,260,0,0
4,1210,480,0,0,0,0,0,0,0,0
5,3793,3728,3578,3513,3266,3083,3081,2952,2858,2762
...,...,...,...,...,...,...,...,...,...,...
5902,3606,2863,2628,2571,2028,1684,1283,1247,1244,1212
5910,3555,3526,3260,3163,3006,2942,2929,2801,2716,2671
5932,3267,2028,780,507,493,457,0,0,0,0
6030,3489,3479,3439,3053,2968,2916,2797,2762,2664,2641


In [36]:
k = 10

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

print("EVALUATION CLASSIFICATION ON TEST VALIDATION DATASET \n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

EVALUATION CLASSIFICATION ON TEST VALIDATION DATASET 

hit_rate: 1.000
reciprocal_rank: 1.000
dcg: 4.309
precision: 0.926
recall: 0.511


#### Test on RankFM Validation Data

In [37]:
# Predict the interaction probabilities
df_test = model.transform(interaction_df_spark)

# Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(df_test["prediction"].desc())
df_test = df_test.withColumn("rank", row_number().over(windowSpec))

# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()


+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|      1|3114|2804|2692|2398|1907|1246| 938| 783| 661| 608|
|      2|3257|3108|3030|2916|2881|2717|2321|2278|2028|1953|
|      3|3868|3671|3552|3534|3421|3114|2871|2167|1968|1641|
|      4|3527|3418|2947|2366|1387|1198|1196| 480|   0|   0|
|      5|3793|3786|3624|3578|3513|3476|3418|3409|3266|3249|
|      6|3604|3536|3524|3508|3408|2966|2858|2396|2100|1947|
|      7|2571|2353|1221|1196| 861| 733| 648| 589| 480| 457|
|      8|3528|3500|3418|3186|3148|2692|2490|2429|2396|2324|
|      9|3916|3298|3178|3160|3148|2890|2692|2302|2278|1961|
|     10|3868|3812|3702|3701|3688|3671|3591|3501|3451|3447|
|     11|3755|3418|3255|3107|3105|3101|2959|2918|2907|2806|
|     12|1641|1233| 999| 934| 813|   0|   0|   0|   0|   0|
|     13|3256|2987|2916|2871|2686|2528|2470|2414|2135|2115|
|     14|3354|2997|2826|2762|2731|2694|2

In [38]:
pivot = pivot_recommendations.toPandas().set_index('user_id')
pivot = pivot.rename_axis(None, axis=0)
pivot

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
1,3114,2804,2692,2398,1907,1246,938,783,661,608
2,3257,3108,3030,2916,2881,2717,2321,2278,2028,1953
3,3868,3671,3552,3534,3421,3114,2871,2167,1968,1641
4,3527,3418,2947,2366,1387,1198,1196,480,0,0
5,3793,3786,3624,3578,3513,3476,3418,3409,3266,3249
...,...,...,...,...,...,...,...,...,...,...
5910,3176,3125,3102,2942,2919,2890,2716,2611,2474,2406
5918,3863,1198,1196,608,0,0,0,0,0,0
5932,3267,2028,1379,1208,1100,507,493,21,0,0
6030,3489,3479,3070,2762,2615,2571,2311,2253,2161,2105


In [39]:
k = 10

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

print("EVALUATION CLASSIFICATION ON RANKFM VALIDATION DATASET \n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

EVALUATION CLASSIFICATION ON RANKFM VALIDATION DATASET 

hit_rate: 1.000
reciprocal_rank: 1.000
dcg: 4.315
precision: 0.928
recall: 0.510


In [40]:
# Predict the interaction probabilities
df_test = model.transform(all_user_item_pairs_spark)

# Rank the predictions
windowSpec = Window.partitionBy("user_id").orderBy(df_test["prediction"].desc())
df_test = df_test.withColumn("rank", row_number().over(windowSpec))

# Filter to get top 10 predictions for each user
top_10_recommendations = df_test.filter(df_test['rank'] <= 10)

# # Pivot the data to get a wide format DataFrame with one row per user and top 10 movie recommendations
pivot_recommendations = top_10_recommendations.groupBy("user_id").pivot("rank").agg({"item_id": "first"})
pivot_recommendations = pivot_recommendations.na.fill(0)
pivot_recommendations.show()



+-------+----+----+----+----+----+----+----+----+----+----+
|user_id|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+-------+----+----+----+----+----+----+----+----+----+----+
|     19|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|     26|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|     29|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|     54|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|     65|3952|3951|3950|3949|3947|3946|3945|3944|3943|3942|
|    191|3952|3951|3950|3949|3948|3947|3945|3944|3943|3942|
|    222|3952|3951|3950|3949|3946|3945|3944|3943|3942|3941|
|    243|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|    270|3952|3951|3950|3949|3947|3946|3945|3944|3943|3942|
|    278|3952|3951|3950|3949|3948|3947|3946|3945|3943|3942|
|    293|3952|3951|3950|3949|3947|3946|3945|3944|3943|3942|
|    296|3952|3951|3950|3949|3947|3946|3945|3944|3943|3942|
|    367|3952|3951|3950|3949|3948|3947|3946|3945|3944|3943|
|    418|3952|3951|3950|3949|3948|3947|3

                                                                                

In [41]:
from pyspark.sql.functions import collect_set

test_users_items = df_test.toPandas().groupby('user_id')['item_id'].apply(set).to_dict()
comm_user = pivot.index.values

print("EVALUATION CLASSIFICATION UNSEEN DATA EXCLUDED TRAINING\n")

print("hit_rate: {:.3f}".format(hit_rate(pivot, test_users_items, comm_user)))
print("reciprocal_rank: {:.3f}".format(reciprocal_rank(pivot, test_users_items, comm_user)))
print("dcg: {:.3f}".format(dcg(pivot, test_users_items, comm_user)))
print("precision: {:.3f}".format(precision(pivot, test_users_items, comm_user)))
print("recall: {:.3f}".format(recall(pivot, test_users_items, comm_user)))

                                                                                

EVALUATION CLASSIFICATION UNSEEN DATA EXCLUDED TRAINING

hit_rate: 0.915
reciprocal_rank: 0.445
dcg: 1.064
precision: 0.230
recall: 0.001


#### **Result Comparison**

| Metrics | FMClassifier (test data) | FMClassifier (all unseen data) | FM Regressor (test data) | FM Regressor (all unseen data) | RankFM |
| --- | --- | --- | --- | --- | --- |
| hit_rate | 1.000 |  0.916 | 1.000 | 0.915 | 0.788 |
| reciprocal_rank | 1.000 | 0.454 | 1.000 | 0.445 | 0.334 |
| dcg | 4.544 | 1.082 | 4.309 | 1.064 | 0.718 |
| precision | 1.000 | 0.232 | 0.926 | 0.230 | 0.156 |
| recall | 0.146 | 0.001 | 0.511 | 0.001 | 0.072 |
