In [21]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col,explode

ss = SparkSession.builder.appName("Collaborative Filtering - Recommendation Engine").getOrCreate()
moviesDF = ss.read.options(inferSchema='True', header='True').csv('movies.csv')
ratingDF = ss.read.options(inferSchema='True', header='True').csv('ratings.csv')

#ratingDF.printSchema()
#moviesDF.printSchema()

In [22]:
ratings = ratingDF.join(moviesDF,"movieId","left")
(train,test) = ratings.randomSplit([0.8,0.2])

In [23]:
#ratings.count()


In [24]:
#test.count()


In [27]:
#train.count()
display(ratings)

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, title: string, genres: string]

In [32]:
als = ALS(userCol="userId",itemCol ="movieId",ratingCol="rating",nonnegative=True,implicitPrefs=False,coldStartStrategy="Drop")

In [33]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

In [34]:
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction")


In [35]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)


In [36]:
model = cv.fit(train)
best_model = model.bestModel
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)


22/01/25 12:18:50 WARN CacheManager: Asked to cache already cached data.
22/01/25 12:18:50 WARN CacheManager: Asked to cache already cached data.
22/01/25 12:18:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/25 12:18:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

0.8618574095283522


In [38]:
recommendations = best_model.recommendForAllUsers(5)
df = recommendations
df.show()
df2 = df.withColumn("movieid_rating", explode("recommendations"))
df2.show()
display(df2.select("userId", col("movieid_rating.movieId"), col("movieid_rating.rating")))
df2.show()



                                                                                

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{170355, 5.90361...|
|     2|[{170355, 4.79047...|
|     3|[{6835, 4.8407235...|
|     4|[{3851, 4.8461304...|
|     5|[{170355, 4.80633...|
|     6|[{33649, 4.688395...|
|     7|[{5490, 4.6856084...|
|     8|[{170355, 4.78006...|
|     9|[{170355, 4.88653...|
|    10|[{71579, 4.46295}...|
|    11|[{170355, 5.08758...|
|    12|[{67618, 5.578362...|
|    13|[{170355, 5.24418...|
|    14|[{170355, 4.41399...|
|    15|[{60943, 4.366083...|
|    16|[{170355, 4.56995...|
|    17|[{170355, 5.20470...|
|    18|[{170355, 4.99271...|
|    19|[{170355, 4.05804...|
|    20|[{170355, 4.99511...|
+------+--------------------+
only showing top 20 rows





+------+--------------------+-------------------+
|userId|     recommendations|     movieid_rating|
+------+--------------------+-------------------+
|     1|[{170355, 5.90361...| {170355, 5.903616}|
|     1|[{170355, 5.90361...|   {3379, 5.903616}|
|     1|[{170355, 5.90361...| {33649, 5.6599727}|
|     1|[{170355, 5.90361...|   {5490, 5.484412}|
|     1|[{170355, 5.90361...| {132333, 5.484412}|
|     2|[{170355, 4.79047...|  {170355, 4.79047}|
|     2|[{170355, 4.79047...|    {3379, 4.79047}|
|     2|[{170355, 4.79047...|  {33649, 4.760414}|
|     2|[{170355, 4.79047...| {117531, 4.619274}|
|     2|[{170355, 4.79047...|   {7071, 4.619274}|
|     3|[{6835, 4.8407235...|  {6835, 4.8407235}|
|     3|[{6835, 4.8407235...|  {5181, 4.8407235}|
|     3|[{6835, 4.8407235...| {70946, 4.8212543}|
|     3|[{6835, 4.8407235...|   {5919, 4.739641}|
|     3|[{6835, 4.8407235...|   {7991, 4.618826}|
|     4|[{3851, 4.8461304...|  {3851, 4.8461304}|
|     4|[{3851, 4.8461304...|  {5490, 4.7504883}|


                                                                                

DataFrame[userId: int, movieId: int, rating: float]



+------+--------------------+-------------------+
|userId|     recommendations|     movieid_rating|
+------+--------------------+-------------------+
|     1|[{170355, 5.90361...| {170355, 5.903616}|
|     1|[{170355, 5.90361...|   {3379, 5.903616}|
|     1|[{170355, 5.90361...| {33649, 5.6599727}|
|     1|[{170355, 5.90361...|   {5490, 5.484412}|
|     1|[{170355, 5.90361...| {132333, 5.484412}|
|     2|[{170355, 4.79047...|  {170355, 4.79047}|
|     2|[{170355, 4.79047...|    {3379, 4.79047}|
|     2|[{170355, 4.79047...|  {33649, 4.760414}|
|     2|[{170355, 4.79047...| {117531, 4.619274}|
|     2|[{170355, 4.79047...|   {7071, 4.619274}|
|     3|[{6835, 4.8407235...|  {6835, 4.8407235}|
|     3|[{6835, 4.8407235...|  {5181, 4.8407235}|
|     3|[{6835, 4.8407235...| {70946, 4.8212543}|
|     3|[{6835, 4.8407235...|   {5919, 4.739641}|
|     3|[{6835, 4.8407235...|   {7991, 4.618826}|
|     4|[{3851, 4.8461304...|  {3851, 4.8461304}|
|     4|[{3851, 4.8461304...|  {5490, 4.7504883}|


                                                                                