In [1]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext
# sc.setCheckpointDir('checkpoint')
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [3]:
# read the data using spark
movies = spark.read.csv('/Users/sachinim/Documents/Projects/als-recommender-pyspark/movies.csv', header=True)
ratings = spark.read.csv('/Users/sachinim/Documents/Projects/als-recommender-pyspark/ratings.csv', header=True)

In [4]:
ratings.show(10)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows



In [5]:
movies.show(10)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [6]:
# print out the schema in the tree format
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
# Convert the data types of Ids and rating values Strings to Integer 
# drop the unwanted column Time stamp
ratings = ratings.\
    withColumn('userId', col('userId').cast('Integer')).\
    withColumn('movieId', col('movieId').cast('Integer')).\
    withColumn('rating', col('rating').cast('float')).\
    drop('timestamp')
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [8]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [9]:
# Calculate Sparsity

# Get the number of ratings
numerator = ratings.select('rating').count()

# Number of distinct UserIds
dis_users = ratings.select('userId').distinct().count()

# Number of distinct MovieIds
dis_movie = ratings.select('movieId').distinct().count()

# denominator equals to number of users multiplied by number of movies
denominator = dis_users * dis_movie

# Divide numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.30% empty.


In [10]:
# Interpret ratings

#Grouping the data by userId and number of ratings
userId_ratings = ratings.groupBy('userId').count().orderBy('count',acending=False)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|   576|   20|
|   569|   20|
|   320|   20|
|   595|   20|
|   189|   20|
|   194|   20|
|   207|   20|
|   257|   20|
|   278|   20|
|   406|   20|
|   442|   20|
|   431|   20|
|    53|   20|
|   147|   20|
|   157|   21|
|    87|   21|
|   324|   21|
|    37|   21|
|   598|   21|
|    26|   21|
+------+-----+
only showing top 20 rows



In [11]:
# grouping the data by movieId and number of ratings

movieId_ratings = ratings.groupBy('movieId').count().orderBy('count',acending=False)
movieId_ratings.show(10)

+-------+-----+
|movieId|count|
+-------+-----+
|  56022|    1|
|  87028|    1|
|  71468|    1|
| 185435|    1|
|    243|    1|
|   1990|    1|
|  45183|    1|
|   5614|    1|
|   1507|    1|
| 100068|    1|
+-------+-----+
only showing top 10 rows



In [12]:
# Build out an ALS Model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [13]:
# Creating testing and training datasets

# train--> 80% test--> 20%
(train,test) = ratings.randomSplit([0.8,0.2], seed=1234)

# create ALS model
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating',nonnegative=True, implicitPrefs = False, coldStartStrategy = "drop")

# Check "ALS" Model created
type(als)

pyspark.ml.recommendation.ALS

In [14]:
# tuning ALS model thru Spark

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [15]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

# Define evaluater as RMSE and print the length of evaluater
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [16]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_e6191336007d


In [17]:
# Checking best model and best model parameters

# fir cross validator to the "train" dataset
model = cv.fit(train)

# Extract best model from the cv model above
best_model = model.bestModel

In [18]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 150
  MaxIter: 10
  RegParam: 0.15


In [19]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8686158604174674


In [20]:
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   597|    471|   2.0|  4.138487|
|   436|    471|   3.0| 3.5936015|
|   218|    471|   4.0| 3.0037208|
|   387|    471|   3.0| 3.0054257|
|   217|    471|   2.0| 2.7966945|
|   287|    471|   4.5| 2.9292014|
|    32|    471|   3.0|  3.704304|
|   260|    471|   4.5| 3.6457002|
|   104|    471|   4.5| 3.5362327|
|   111|   1088|   3.0| 3.2533147|
|   177|   1088|   3.5|  3.539239|
|    41|   1088|   1.5| 2.6881957|
|   387|   1088|   1.5| 2.6142406|
|   594|   1088|   4.5|  4.434395|
|   307|   1088|   3.0| 2.6732225|
|   509|   1088|   3.0| 3.1198432|
|   104|   1088|   3.0|  3.670084|
|   268|   1238|   5.0| 3.7642636|
|   462|   1238|   3.5| 3.5396466|
|   307|   1342|   2.0| 2.2041955|
+------+-------+------+----------+
only showing top 20 rows



In [21]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{3379, 4.764641}...|
|   463|[{3379, 4.9576507...|
|   496|[{3379, 4.4238644...|
|   148|[{33649, 4.490478...|
|   540|[{3379, 5.322866}...|
|   392|[{3379, 4.6531377...|
|   243|[{86237, 5.608533...|
|    31|[{33649, 5.003610...|
|   516|[{4429, 4.815257}...|
|   580|[{3379, 4.720453}...|
+------+--------------------+



In [22]:
nrecommendations = nrecommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

nrecommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|   471|   3379| 4.764641|
|   471| 171495| 4.499823|
|   471|  33649|4.4932084|
|   471|   7096| 4.490959|
|   471|  86781|4.4882207|
|   471|  78836|  4.43601|
|   471|   8477|4.4281893|
|   471|  26073| 4.403054|
|   471| 179135| 4.403054|
|   471| 117531| 4.403054|
+------+-------+---------+



In [23]:
nrecommendations.join(movies, on='movieId').filter('userId = 100').show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  67618|   100| 5.065317|Strictly Sexual (...|Comedy|Drama|Romance|
|  33649|   100|5.0045204|  Saving Face (2004)|Comedy|Drama|Romance|
|   3379|   100|4.9994802| On the Beach (1959)|               Drama|
|  74282|   100| 4.909474|Anne of Green Gab...|Children|Drama|Ro...|
|  42730|   100|4.9013314|   Glory Road (2006)|               Drama|
| 171495|   100|4.8605995|              Cosmos|  (no genres listed)|
|   7121|   100| 4.848839|   Adam's Rib (1949)|      Comedy|Romance|
|   7071|   100|4.8485265|Woman Under the I...|               Drama|
|  84273|   100|4.8485265|Zeitgeist: Moving...|         Documentary|
|  26073|   100|4.8485265|Human Condition I...|           Drama|War|
+-------+------+---------+--------------------+--------------------+



In [24]:
ratings.join(movies, on='movieId').filter('userId = 100').sort('rating', ascending=False).limit(10).show()

+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1101|   100|   5.0|      Top Gun (1986)|      Action|Romance|
|   1958|   100|   5.0|Terms of Endearme...|        Comedy|Drama|
|   2423|   100|   5.0|Christmas Vacatio...|              Comedy|
|   4041|   100|   5.0|Officer and a Gen...|       Drama|Romance|
|   5620|   100|   5.0|Sweet Home Alabam...|      Comedy|Romance|
|    368|   100|   4.5|     Maverick (1994)|Adventure|Comedy|...|
|    934|   100|   4.5|Father of the Bri...|              Comedy|
|    539|   100|   4.5|Sleepless in Seat...|Comedy|Drama|Romance|
|     16|   100|   4.5|       Casino (1995)|         Crime|Drama|
|    553|   100|   4.5|    Tombstone (1993)|Action|Drama|Western|
+-------+------+------+--------------------+--------------------+

