# Importing the modules

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [2]:
#SparkSession creation
spark = SparkSession.builder.appName("ALS CF").getOrCreate()

In [3]:
#Loading rating Data 
ratings = (
    spark.read.csv(
        path="movielens/ratings.csv",
        sep= ",",
        quote='"',
        header=True,
        schema="userId INT, movieID INT, rating DOUBLE, timestamp INT",
     )
     .select("userId","movieID","rating")
     .cache()
)

#Loading movie data
movies = (
    spark.read.csv(
        path="movielens/movies.csv",
        sep= ",",
        quote='"',
        header=True,
        schema="movieID INT, title STRING, genres STRING",
    )
    .withColumn("release_year",f.regexp_extract(f.col("title"),"\s?\((\d{4})\)",1))
    .withColumn("title",f.regexp_replace(f.col("title"),"\s?\((\d{4})\)",""))
    .withColumn("genres",f.split(f.col("genres"),"\|"))
    .cache()
)

In [4]:
movies.show(5,False)
ratings.show(5,False)

+-------+---------------------------+-------------------------------------------------+------------+
|movieID|title                      |genres                                           |release_year|
+-------+---------------------------+-------------------------------------------------+------------+
|1      |Toy Story                  |[Adventure, Animation, Children, Comedy, Fantasy]|1995        |
|2      |Jumanji                    |[Adventure, Children, Fantasy]                   |1995        |
|3      |Grumpier Old Men           |[Comedy, Romance]                                |1995        |
|4      |Waiting to Exhale          |[Comedy, Drama, Romance]                         |1995        |
|5      |Father of the Bride Part II|[Comedy]                                         |1995        |
+-------+---------------------------+-------------------------------------------------+------------+
only showing top 5 rows

+------+-------+------+
|userId|movieID|rating|
+------+-------+--

In [5]:
als = ALS(
          userCol="userId",
          itemCol="movieID",
          ratingCol="rating",
          coldStartStrategy="drop",     #default: Nan Now drop
          implicitPrefs="True"          #default: False(explicit Feedback)
)

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [1,5,10])
    .addGrid(als.maxIter,[20])
    .addGrid(als.regParam,[0.05,0.1])
    .build()
)

crossvalidator = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=evaluator,
    numFolds=2
)


(training_data,validation_data) = ratings.randomSplit([8.0,2.0])
crossval_model = crossvalidator.fit(training_data)

#Selecting the best model
model = crossval_model.bestModel

#Statistic about out trained model
predictions = model.transform(validation_data).na.drop()
print(f"Root Mean Square Error for Best Model: ({model}):  {evaluator.evaluate(predictions)}")

Root Mean Square Error for Best Model: (ALSModel: uid=ALS_69a8413310ea, rank=1):  0.8812238821543542


In [6]:
#Let's have a look at our data
predictions.toPandas()

Unnamed: 0,userId,movieID,rating,prediction
0,597,471,2.0,4.020075
1,602,471,4.0,3.499872
2,462,471,2.5,3.153466
3,610,471,4.0,3.832452
4,411,471,4.0,3.492277
...,...,...,...,...
19504,357,79008,4.0,4.149889
19505,496,84374,3.0,2.746061
19506,484,84374,4.0,3.266302
19507,563,84374,2.5,2.948870


# Using the model to show predictions

<h3>Method 1:</h3>
Pre-Compute ratings per movie/user and simply filter based on results

<h3>Top 5 movies for each user,for all users</h3>
"We can use the .recommendForAllUsers method to return "top" numItems items recommended for each user,for all users"


In [7]:
USER_ID = 50

rec_all_users = model.recommendForAllUsers(5).cache()
rec_all_users.show(5,False)

recommendation_for_user_1 = (
    rec_all_users.filter(f"userId == {USER_ID}")
    #Use explode to convert the arrays to the rows with structs
    .withColumn("rec",f.explode("recommendations"))
    #Select the column we want from resulting struct
    .select(
        "userId",
        f.col("rec").movieID.alias("movieID"),
        f.col("rec").rating.alias("rating"),
    )
    #Join movies dataframe and select only the columns we want
    .join(movies,"movieID")
    .orderBy("rating",ascending=False)
    .select("movieID","title","release_year")
)

recommendation_for_user_1.show(5,False)

+------+------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                 |
+------+------------------------------------------------------------------------------------------------+
|471   |[[6835, 7.32343], [5746, 7.32343], [40491, 7.225782], [136850, 7.0887785], [5764, 6.5910873]]   |
|463   |[[6835, 7.8463063], [5746, 7.8463063], [40491, 7.741686], [136850, 7.594901], [5764, 7.0616755]]|
|496   |[[6835, 6.8111854], [5746, 6.8111854], [40491, 6.720367], [136850, 6.5929465], [5764, 6.130067]]|
|148   |[[6835, 7.284841], [5746, 7.284841], [40491, 7.1877074], [136850, 7.051426], [5764, 6.556357]]  |
|540   |[[6835, 8.467546], [5746, 8.467546], [40491, 8.354643], [136850, 8.196236], [5764, 7.6207914]]  |
+------+------------------------------------------------------------------------------------------------+
only showing top 5 rows

+-------+------------

# Method 2
Similar to method 1,but more on the fly. Brings with it the same drawbacks however

<h3>Top 5 Movies for User Subset</h3>
We can use the .recommendForUserSubset method to return top numItems items recommended for a subset of user

In [8]:
USER_ID = 50

subset = ratings.filter(f"userId == {USER_ID}").select("userId")
rec_subset = model.recommendForUserSubset(subset,5)
rec_subset.show(1,False)

recommendations_for_user_2 = (
    rec_subset
    #Use explode to convert the arrays to the rows with structs
    .withColumn("rec",f.explode("recommendations"))
    #Select the column we want from resulting struct
    .select(
        "userId",
        f.col("rec").movieID.alias("movieID"),
        f.col("rec").rating.alias("rating"),
    )
    #Join movies dataframe and select only the columns we want
    .join(movies,"movieID")
    .orderBy("rating",ascending=False)
    .select("userId","movieID","title","release_year","rating")
)

recommendations_for_user_2.show(5,False)

+------+-----------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                |
+------+-----------------------------------------------------------------------------------------------+
|50    |[[6835, 5.690754], [5746, 5.690754], [40491, 5.6148753], [136850, 5.508415], [5764, 5.1216784]]|
+------+-----------------------------------------------------------------------------------------------+

+------+-------+------------------------------------------------+------------+---------+
|userId|movieID|title                                           |release_year|rating   |
+------+-------+------------------------------------------------+------------+---------+
|50    |6835   |Alien Contamination                             |1980        |5.690754 |
|50    |5746   |Galaxy of Terror (Quest)                        |1981        |5.690754 |
|50    |40491  |Match Factory

# Method 3
Manually prepare the data and apply the model to it

In [14]:
USER_ID = 50

movies_to_be_rated = (
    ratings
    #Select all the movieID that this user has not yet rated
    .filter(f"userId=={USER_ID}")
    .select("movieID").distinct()
    #Add userId back to the data
    .withColumn("userId",f.lit(USER_ID))
)

#Apply the predictions
user_movie_predictions = model.transform(movies_to_be_rated)

#Extract the recommendations
recommendations_for_user_3 = (
    user_movie_predictions
    .dropna()
    .orderBy("prediction",ascending=False)
    .limit(5)
    .join(movies,"movieID")
    .select("userId","movieID","title","release_year",f.col("prediction").alias("rating"))
)

recommendations_for_user_3.show(5,False)

+------+-------+-----------------------------------------------------------------------------+------------+---------+
|userId|movieID|title                                                                        |release_year|rating   |
+------+-------+-----------------------------------------------------------------------------+------------+---------+
|50    |1232   |Stalker                                                                      |1979        |3.5559518|
|50    |6666   |Discreet Charm of the Bourgeoisie, The (Charme discret de la bourgeoisie, Le)|1972        |3.6596885|
|50    |8235   |Safety Last!                                                                 |1923        |3.554216 |
|50    |26326  |Holy Mountain, The (Montaña sagrada, La)                                     |1973        |4.1546607|
|50    |168492 |Call Me by Your Name                                                         |2017        |3.6118174|
+------+-------+----------------------------------------