In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
from pyspark.sql import functions as F

In [2]:
# Reuse the active Spark session if there is one; otherwise start one named "ALS Model".
session_builder = SparkSession.builder.appName("ALS Model")
spark = session_builder.getOrCreate()

In [3]:
# Both splits are CSV files with a header row; Spark infers the column types.
csv_options = {"header": True, "inferSchema": True}
train = spark.read.csv("train_data.csv", **csv_options)
test = spark.read.csv("test_data.csv", **csv_options)

In [4]:
# Fit an ALS recommender on the training split, then score the held-out test split.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    rank=2,
    maxIter=20,
    regParam=0.13,
    # Rows for which the model cannot produce a prediction (NaN) are dropped from
    # the transform() output, so the evaluators below never see NaN values.
    coldStartStrategy="drop",
    # Pin the random initialisation so the fitted factors -- and every metric and
    # recommendation derived from them -- are repeatable across kernel restarts.
    # Outputs recorded from earlier, unseeded runs may differ slightly.
    seed=42,
)

als_model = als.fit(train)

predictions = als_model.transform(test)

In [5]:
# Two evaluators that differ only in the metric; both compare the "rating"
# label column against the "prediction" column.
eval_columns = {"labelCol": "rating", "predictionCol": "prediction"}
evaluator = RegressionEvaluator(metricName="rmse", **eval_columns)
evaluator_mae = RegressionEvaluator(metricName="mae", **eval_columns)

rmse = evaluator.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)

print(f"RMSE: {rmse}")
print(f"MAE: {mae}")

RMSE: 0.8850576441646054
MAE: 0.6855598945020555


In [6]:
# Clamp predictions into [0, 5]: values below 0 become 0, values above 5 become 5,
# everything else passes through unchanged.
raw_prediction = F.col("prediction")
clamped_prediction = (
    F.when(raw_prediction < 0, 0)
    .when(raw_prediction > 5, 5)
    .otherwise(raw_prediction)
)
predictions_clipped = predictions.withColumn("prediction", clamped_prediction)

In [7]:
# Re-score with the same two evaluators, this time on the clipped predictions.
rmse_clipped = evaluator.evaluate(predictions_clipped)
mae_clipped = evaluator_mae.evaluate(predictions_clipped)

print(f"RMSE: {rmse_clipped}")
print(f"MAE: {mae_clipped}")

RMSE: 0.8849243499345293
MAE: 0.685347302104303


In [8]:
# Top-5 item recommendations for every user; print the first 20 rows in full width.
userRecs = als_model.recommendForAllUsers(numItems=5)
userRecs.show(n=20, truncate=False)

+------+---------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                    |
+------+---------------------------------------------------------------------------------------------------+
|1     |[{167746, 5.792939}, {6666, 5.599677}, {26084, 5.5527563}, {177593, 5.4989057}, {3200, 5.4678726}] |
|2     |[{167746, 4.686495}, {47423, 4.5686626}, {54881, 4.553201}, {130490, 4.5360503}, {8827, 4.5030203}]|
|3     |[{158872, 4.0467963}, {89904, 3.8038788}, {1232, 3.5286481}, {6639, 3.4873235}, {158966, 3.39644}] |
|4     |[{158872, 5.4484115}, {89904, 5.264735}, {158966, 4.99732}, {1232, 4.8181734}, {6639, 4.796327}]   |
|5     |[{167746, 4.5993185}, {6666, 4.4700265}, {26084, 4.431964}, {3201, 4.413035}, {86781, 4.409999}]   |
|6     |[{130490, 5.266449}, {54881, 5.1798124}, {167746, 5.1301174}, {47423, 5.129494}, {8827, 5.0688295}]|
|7     |[{167746, 4

In [9]:
from pyspark.sql import functions as F

# Flatten each user's recommendation array into a single "movieId:rating, ..."
# string so the table can be written out as a flat CSV.
# NOTE(review): this rebinds userRecs, and "recommendations" is a string column
# afterwards -- re-running this cell without first re-running the
# recommendForAllUsers cell will presumably fail inside transform(); confirm.
rec_strings = F.expr("transform(recommendations, x -> concat(x.movieId, ':', x.rating))")
userRecs = userRecs.withColumn("recommendations", F.concat_ws(", ", rec_strings))

# Collect to the driver as a pandas DataFrame and write one CSV file (no index column).
recs_pdf = userRecs.toPandas()
recs_pdf.to_csv("user_recommendations.csv", index=False)