In [None]:
!pip install pyspark
!pip install findspark
!pip install "numpy<2.0"
!pip install scikit-surprise

In [ ]:
import pandas as pd
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from surprise import KNNBasic
from surprise.model_selection import GridSearchCV
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from surprise import Dataset, Reader
from surprise import NMF

In [None]:
# Initialise findspark so the environment points at the local Spark installation.
findspark.init()

In [None]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("rnmp_lab2")
    .getOrCreate()
)

In [None]:
# Load the pipe-delimited, header-less movie file; column types are inferred.
# NOTE(review): if this is the MovieLens 100K `u.item`, it is Latin-1 encoded —
# confirm, and set the reader's `encoding` option if titles look garbled.
movies_df = (
    spark.read
    .option("delimiter", "|")
    .option("header", False)
    .csv("u.item", inferSchema=True)
)

In [None]:
# Load the tab-delimited, header-less ratings file; column types are inferred.
ratings_df = (
    spark.read
    .option("delimiter", "\t")
    .option("header", False)
    .csv("u.data", inferSchema=True)
)

In [None]:
# Preview the first raw movie row (columns still carry the auto-generated _c0, _c1, ... names).
movies_df.show(1)

In [None]:
# Preview the first raw rating row (columns still carry the auto-generated _c0, _c1, ... names).
ratings_df.show(1)

In [None]:
# Keep only the columns used later in the notebook and give them descriptive names.
# NOTE: both names are rebound in place, so the load cells must be re-run
# before this cell can be executed a second time.
movies_df = (
    movies_df
    .select("_c0", "_c1")
    .toDF("movie_id", "movie_title")
)
ratings_df = (
    ratings_df
    .select("_c0", "_c1", "_c2")
    .toDF("user_id", "movie_id", "movie_rating")
)

In [None]:
# Preview the movie frame after renaming (movie_id, movie_title).
movies_df.show(1)

In [None]:
# Preview the ratings frame after renaming (user_id, movie_id, movie_rating).
ratings_df.show(1)

In [None]:
# 80/20 train/test split of the ratings; the fixed seed keeps the partition repeatable.
train_df, test_df = ratings_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Base ALS estimator; its rank / maxIter are the hyper-parameters tuned by
# cross-validation. The seed is pinned (same value as the train/test split)
# so the factor initialisation is reproducible across kernel restarts.
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="movie_rating",
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN (unseen user/item)
    nonnegative=True,          # constrain the learned factors to be non-negative
    seed=42,
)

In [None]:
# Two regression evaluators over the same label / prediction columns:
# one reporting mean squared error, one reporting mean absolute error.
mse_evaluator, mae_evaluator = (
    RegressionEvaluator(
        metricName=metric,
        labelCol="movie_rating",
        predictionCol="prediction",
    )
    for metric in ("mse", "mae")
)

In [None]:
# Search space for ALS: 2 ranks x 2 iteration counts = 4 candidate settings.
parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 20])
    .addGrid(als.maxIter, [10, 15])
    .build()
)

In [None]:
# 3-fold cross-validation over the ALS grid, selecting by MSE.
# The seed is pinned so the assignment of rows to folds is reproducible.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=mse_evaluator,
    numFolds=3,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training split.
cv_model = cv.fit(train_df)

In [None]:
# Model fitted with the best-scoring parameter combination found by the grid search.
best_als = cv_model.bestModel

In [None]:
# Report the winning hyper-parameters. maxIter is read from the parent
# estimator through the private `_java_obj` handle.
print(
    f"Best als model has rank [{best_als.rank}] , max number of iterations [{best_als._java_obj.parent().getMaxIter()}]"
)

In [None]:
# Final ALS estimator configured with the hyper-parameters selected by the
# grid search.
final_als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="movie_rating",
    # Public ALSModel property; avoids reaching through the private Java handle.
    rank=best_als.rank,
    regParam=0.1,
    # maxIter is read from the parent estimator via the private `_java_obj` handle.
    maxIter=best_als._java_obj.parent().getMaxIter(),
    nonnegative=True,
    coldStartStrategy="drop",
    # Pinned so the final fit is reproducible (same value as the train/test split).
    seed=42,
)

In [None]:
# Fit the final ALS model on the full training split.
final_model = final_als.fit(train_df)

In [None]:
# Predict ratings for the held-out split and preview a few rows.
predictions = final_model.transform(test_df)
predictions.show(5)

In [None]:
# Score the ALS predictions on the held-out split.
mae = mae_evaluator.evaluate(predictions)
mse = mse_evaluator.evaluate(predictions)
# The trailing tuple is the cell's displayed output: (MSE, MAE).
mse, mae

In [None]:
user_recommendations = final_model.recommendForAllUsers(5)
# Gives the top 5 recommendations for every user.
# Returns user_id | recommendations [{movie_id, rating}]

In [None]:
# Preview the nested recommendations without truncating the array column.
user_recommendations.show(5, truncate=False)

In [None]:
# Flatten the per-user recommendation array into one row per (user, movie):
# explode the array into a struct column, then pull its fields out.
# NOTE: `user_recommendations` is rebound in place, so this cell cannot be
# re-run without first re-running the cell that creates it.
user_recommendations = (
    user_recommendations
    .select("user_id", F.explode("recommendations").alias("rec"))
    .select("user_id", "rec.movie_id", "rec.rating")
)

In [None]:
# Attach the movie title to every recommended movie_id.
user_recommendations = user_recommendations.join(movies_df, on="movie_id", how="inner")

In [None]:
# Preview the flattened recommendations together with their titles.
user_recommendations.show(10, truncate=False)

KNN

In [None]:
# Collect both Spark splits to the driver as pandas frames for the Surprise models.
train_pd, test_pd = train_df.toPandas(), test_df.toPandas()

In [None]:
# Wrap the training frame as a Surprise dataset with ratings on a 1-5 scale.
# `train_pd` carries its columns in (user_id, movie_id, movie_rating) order.
reader = Reader(rating_scale=(1, 5))
data = Dataset.load_from_df(train_pd, reader)

In [None]:
# Candidate neighbourhood sizes for the KNN grid search.
knn_parameter_grid = {"k": [20, 40, 60]}

In [None]:
# 3-fold grid search over k for KNNBasic, scored by MSE, using all CPU cores.
# NOTE(review): the fold shuffling does not appear to be seeded here — confirm
# whether run-to-run variation in the selected k matters.
gs = GridSearchCV(KNNBasic, knn_parameter_grid, measures=["mse"], cv=3, n_jobs=-1)
gs.fit(data)

In [None]:
# Estimator configured with the k that achieved the best MSE in the grid search.
best_knn = gs.best_estimator["mse"]
print("Best K parameter:", gs.best_params["mse"]["k"])

In [None]:
# Build a trainset from all of `data` (the training split) and fit the selected KNN on it.
trainset = data.build_full_trainset()
best_knn.fit(trainset)

In [None]:
# Predict every held-out (user, movie) pair with the fitted KNN model and
# collect the estimates alongside the true ratings.
y_true = []
y_pred = []

# itertuples yields lightweight namedtuples; iterrows would build a pandas
# Series for every row, which is much slower.
for row in test_pd.itertuples(index=False):
    y_pred.append(best_knn.predict(row.user_id, row.movie_id).est)
    y_true.append(row.movie_rating)

In [None]:
# KNN test-set metrics. Model-prefixed names (matching nmf_mse / nmf_mae) keep
# this cell from overwriting the ALS `mse` / `mae` computed earlier.
knn_mse = mean_squared_error(y_true, y_pred)
knn_mae = mean_absolute_error(y_true, y_pred)

print("KNN Evaluation Results")
print(f"MSE: {knn_mse:.4f}")
print(f"MAE: {knn_mae:.4f}")

NMF

In [None]:
# Grid for NMF: number of latent factors and the user / item regularisation terms.
nmf_param_grid = {
    "n_factors": [20, 40],
    "reg_pu": [0.06, 0.1],
    "reg_qi": [0.06, 0.1],
    # Single-value entry: pins NMF's random initialisation so every candidate
    # (and the selected estimator) is fitted reproducibly. The grid size is unchanged.
    "random_state": [42],
}
# 3-fold grid search scored by MSE, using all CPU cores.
# NOTE(review): the fold shuffling itself does not appear to be seeded — confirm.
gs_nmf = GridSearchCV(
    NMF,
    nmf_param_grid,
    measures=["mse"],
    cv=3,
    n_jobs=-1
)
gs_nmf.fit(data)

In [None]:
# Estimator configured with the parameter combination that achieved the best MSE.
best_nmf = gs_nmf.best_estimator["mse"]

# Displayed as the cell output: the selected parameter values.
gs_nmf.best_params["mse"]

In [None]:
# Fit the selected NMF model on the full training set.
best_nmf.fit(trainset)

In [None]:
# Predict every held-out (user, movie) pair with the fitted NMF model and
# collect the estimates alongside the true ratings.
y_true_nmf = []
y_pred_nmf = []

# itertuples yields lightweight namedtuples; iterrows would build a pandas
# Series for every row, which is much slower.
for row in test_pd.itertuples(index=False):
    y_pred_nmf.append(best_nmf.predict(row.user_id, row.movie_id).est)
    y_true_nmf.append(row.movie_rating)

In [None]:
# NMF test-set metrics, printed one per line to four decimals.
nmf_mse, nmf_mae = (
    mean_squared_error(y_true_nmf, y_pred_nmf),
    mean_absolute_error(y_true_nmf, y_pred_nmf),
)

print(
    "NMF Evaluation Results",
    f"MSE: {nmf_mse:.4f}",
    f"MAE: {nmf_mae:.4f}",
    sep="\n",
)