In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.sql.functions import explode, col

import codecs
import sys

In [4]:
spark = SparkSession.builder.appName("ALSMovieRecNotebook").getOrCreate()

In [5]:
ratingsSchema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', IntegerType(), True),
    StructField('timeStamp', LongType(), True)
])

In [8]:
data = spark.read.option('sep', '::').schema(ratingsSchema).csv('ml-1m/ratings.dat')
data.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timeStamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
+------+-------+------+---------+
only showing top 5 rows



In [10]:
movieSchema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('genres', StringType(), True)
])

In [11]:
movies = spark.read.option('sep', '::').schema(movieSchema).csv('ml-1m/movies.dat')
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [19]:
# Splitting data into training and test set
(train, test) = data.randomSplit([0.7, 0.3], seed = 42)

In [13]:
# Setting up ALS Model
USERID = 'userId'
MOVIEID = 'movieId'
RATING = 'rating'

als = ALS(
    maxIter = 5,
    regParam = 0.01,
    nonnegative = True,
    implicitPrefs = False,
    userCol = USERID,
    itemCol = MOVIEID,
    ratingCol = RATING
)

In [14]:
# Hyperparameter Tuning
param_grid = ParamGridBuilder().addGrid(als.rank, [10,50,100,150]).addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15]).build()

In [15]:
# Evaluator
eval = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')

In [16]:
# Cross Validation
cross_val = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = eval, numFolds = 5)

In [20]:
model = cross_val.fit(train)

In [21]:
best_model = model.bestModel

In [23]:
# view the parameters of the best model
print('Rank:', best_model._java_obj.parent().getRank())
print('Reg Param:', best_model._java_obj.parent().getRegParam())
print('Max Iteration:', best_model._java_obj.parent().getMaxIter())

Rank: 10
Reg Param: 0.01
Max Iteration: 5


In [24]:
# Get predictions for the test data using the best model
predictions = best_model.transform(test)
predictions = predictions.na.drop()

In [25]:
# Evaluate Performance of model using RMSE
rmse = eval.evaluate(predictions)
print(f"RMSE: {rmse}")

RMSE: 0.8912259133404188


In [42]:
# Making recommendations for users
num_recommendations = 5
userID = 1
userSchema = StructType(
    [StructField('userId', IntegerType(), True)]
)
users = spark.createDataFrame([[userID,]], userSchema)

recommendations = best_model.recommendForUserSubset(users, num_recommendations)



In [44]:
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{138, 9.847504},...|
+------+--------------------+



In [50]:
recommendations = recommendations.withColumn('rec_explode', explode('recommendations'))\
    .select('userId', col('rec_explode.movieId'), col('rec_explode.rating'))

recommendations.show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1|    138| 9.847504|
|     1|   1846| 8.216328|
|     1|   3853|7.0898976|
|     1|   1773|7.0604815|
|     1|   2994|  6.51245|
+------+-------+---------+



In [52]:
movies.show(1)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Animation|Childre...|
+-------+----------------+--------------------+
only showing top 1 row



In [53]:
recommendations.join(movies, on = 'movieId').show()

+-------+------+---------+--------------------+------------+
|movieId|userId|   rating|               title|      genres|
+-------+------+---------+--------------------+------------+
|    138|     1| 9.847504|Neon Bible, The (...|       Drama|
|   1846|     1| 8.216328| Nil By Mouth (1997)|       Drama|
|   3853|     1|7.0898976|Tic Code, The (1998)|       Drama|
|   1773|     1|7.0604815|   Tokyo Fist (1995)|Action|Drama|
|   2994|     1|  6.51245|    City, The (1998)|       Drama|
+-------+------+---------+--------------------+------------+



In [54]:
data.join(movies, on = 'movieId').filter('userId = 1').sort('rating', ascending = False).show(5)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timeStamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|   1035|     1|     5|978301753|Sound of Music, T...|             Musical|
|   1836|     1|     5|978300172|Last Days of Disc...|               Drama|
|   3105|     1|     5|978301713|   Awakenings (1990)|               Drama|
|   2355|     1|     5|978824291|Bug's Life, A (1998)|Animation|Childre...|
|   1270|     1|     5|978300055|Back to the Futur...|       Comedy|Sci-Fi|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows

