In [1]:
#Instantiate SparkSession
from pyspark.sql import SparkSession
# getOrCreate() returns the already-active SparkSession if there is one,
# otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName('Use Colloborative Filtering for Movie Recommendations')
    .getOrCreate()
)

In [2]:
# Ratings come from the MovieLens "ml-latest-small" archive, staged on GCS with:
#   wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
#   gsutil cp ratings.csv gs://dexdebra-123/datasets
# The header row supplies the column names; no schema inference is requested,
# so every column is loaded as a string (typed in a later cell).
rawdata = (
    spark.read
    .option('header', 'true')
    .csv('gs://dexdebra-123/datasets/ratings.csv')
)

In [3]:
# Preview the raw ratings. limit(5) brings only five rows to the driver instead
# of converting the whole ratings table to pandas just to show its head.
rawdata.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [4]:
# Cast the string columns read from the CSV into typed columns for ALS
# (ALS expects integer user and item ids):
#   userId  -> int   : id of the user who rated
#   movieId -> int   : id of the rated movie
#   rating  -> float : rating given by userId for movieId
# The timestamp column is not used for collaborative filtering and is dropped.
from pyspark.sql.functions  import col
dataset = rawdata.select(*(
    col(colName).cast(sqlType)
    for colName, sqlType in [('userId', 'int'), ('movieId', 'int'), ('rating', 'float')]
))

In [5]:
# Preview the typed ratings; only five rows are transferred to the driver.
dataset.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0
3,1,47,5.0
4,1,50,5.0


In [6]:
# Examine the distribution of the rating values (summary computed in pandas
# after pulling the single rating column to the driver).
ratingsPdf = dataset.select('rating').toPandas()
ratingsPdf.describe()

Unnamed: 0,rating
count,100836.0
mean,3.501557
std,1.042541
min,0.5
25%,3.0
50%,3.5
75%,4.0
max,5.0


In [7]:
# Split the ratings 80/20 into training and test sets. A fixed seed makes the
# split, and therefore every downstream prediction and metric, reproducible
# when the notebook is re-run.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Instantiate the ALS (alternating least squares) estimator used to train the
# model. regParam sets the (weighted) regularization strength, which helps
# prevent overfitting on the training data.
from pyspark.ml.recommendation  import ALS
als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol='userId',    # column holding the user ids
    itemCol='movieId',   # column holding the item ids (movies in this case)
    ratingCol='rating',
    # 'drop' removes rows whose user or movie was not seen during training, so
    # they produce no NaN predictions (which would otherwise make RMSE NaN).
    coldStartStrategy='drop',
    # Fixed seed for the random factor initialisation -> reproducible model.
    seed=42,
)


In [9]:
# Fit the ALS estimator on the training split; the result is the trained model
# used below for predictions and recommendations.
model = als.fit(dataset=trainingData)

In [10]:
# Score the held-out test split with the trained model; transform() adds a
# 'prediction' column next to the actual 'rating'.
predictions = model.transform(testData)
# Only five rows are brought to the driver for a quick look.
predictions.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,597,471,2.0,4.430201
1,91,471,1.0,2.81255
2,182,471,4.5,3.845886
3,218,471,4.0,3.720989
4,474,471,3.0,3.394701


In [11]:
# Compare the distribution of the actual ratings with the distribution of the
# predicted ratings on the test split.
ratingVsPredictionPdf = predictions.select('rating', 'prediction').toPandas()
ratingVsPredictionPdf.describe()

Unnamed: 0,rating,prediction
count,19444.0,19444.0
mean,3.514246,3.359824
std,1.037591,0.735212
min,0.5,0.061381
25%,3.0,2.922843
50%,3.5,3.435321
75%,4.0,3.878765
max,5.0,5.267329


In [13]:
# Evaluate the ALS model with a RegressionEvaluator that reports the
# root-mean-square error between actual and predicted ratings.
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol='rating',           # the original (actual) rating
    predictionCol='prediction',  # the rating predicted by the model
    metricName='rmse',
)


In [14]:
# Root-mean-square error of predicted vs. actual ratings on the test split
# (lower is better).
rmse = evaluator.evaluate(predictions)

# Print one pre-formatted string: it renders identically under Python 2 and 3,
# whereas a two-argument print() shows up as a tuple on a Python 2 kernel.
print('RMSE = {:.4f}'.format(rmse))

('RMSE=', 0.8868207323542452)


In [15]:
# Use the trained model for movie recommendations: the top 3 movies for every
# user the model knows about.
numRecsPerUser = 3
userRecsall = model.recommendForAllUsers(numRecsPerUser)
userRecsall



DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

In [16]:
# This DataFrame holds 3 movie recommendations for every user in the dataset.
# The 'recommendations' column contains a list of (movieId, rating) structs,
# one per recommended movie, where rating is the model's predicted rating.
# head needs to be *called*; limit(5) also avoids collecting every user's row.
userRecsall.limit(5).toPandas()

Unnamed: 0,userId,recommendations
0,471,"[(71899, 5.03348064423), (3379, 4.98671913147)..."
1,463,"[(7842, 5.17101097107), (3379, 4.95255565643),..."
2,496,"[(1354, 5.03072357178), (3134, 5.02348423004),..."
3,148,"[(89904, 4.59168338776), (98491, 4.52277517319..."
4,540,"[(7842, 5.721347332), (1046, 5.21630573273), (..."


In [17]:
# The reverse view: for every movie, the 3 users predicted to rate it highest.
# Each row holds a movieId and a list of (userId, rating) structs.
movieRecsAll = model.recommendForAllItems(3)
# Only five rows are transferred to the driver for display.
movieRecsAll.limit(5).toPandas()


Unnamed: 0,movieId,recommendations
0,1580,"[(276, 4.59262561798), (53, 4.58105754852), (4..."
1,4900,"[(53, 5.29805088043), (12, 5.26811170578), (59..."
2,5300,"[(53, 4.39893436432), (295, 4.37017059326), (1..."
3,6620,"[(418, 5.30183029175), (147, 5.28994083405), (..."
4,7340,"[(543, 4.65297842026), (43, 4.2081861496), (20..."


In [19]:
# In practice we rarely need recommendations for every user in the dataset.
# To get recommendations for a specific set of users, first build a one-column
# DataFrame of their ids (toDF renames Spark's default 'value' column).

from pyspark.sql.types import IntegerType
usersList = [148, 46, 267]
usersDF = spark.createDataFrame(usersList, IntegerType()).toDF('userId')

# Show the users we want recommendations for.
usersDF.take(len(usersList))

[Row(userId=148), Row(userId=46), Row(userId=267)]

In [None]:
# With the users selected, ask the model for the top 5 movie recommendations of
# each of them. Each recommendation is a (movieId, rating) struct, where rating
# is the model's predicted rating for that movie.
numRecsPerSelectedUser = 5
userRecs = model.recommendForUserSubset(usersDF, numRecsPerSelectedUser)
userRecs.toPandas()

Unnamed: 0,userId,recommendations
0,148,"[(89904, 4.59168338776), (98491, 4.52277517319..."
1,267,"[(33649, 5.61535453796), (3200, 5.60720252991)..."
2,46,"[(7842, 5.74811649323), (3379, 5.65375328064),..."


In [22]:
# Keep only user 148 and only the column holding their recommendations.
isUser148 = userRecs.userId == 148
userMoviesList = userRecs.filter(isUser148).select('recommendations')

# The result is a Row whose 'recommendations' field is a list of
# Row(movieId, rating) objects.
userMoviesList.collect()


[Row(recommendations=[Row(movieId=89904, rating=4.591683387756348), Row(movieId=98491, rating=4.522775173187256), Row(movieId=7842, rating=4.520864009857178), Row(movieId=92535, rating=4.457137107849121), Row(movieId=7121, rating=4.424065589904785)])]

In [23]:
# Extract this user's recommendations into a plain Python list of
# Row(movieId, rating) objects.
firstRow = userMoviesList.collect()[0]
moviesList = firstRow['recommendations']
moviesList

[Row(movieId=89904, rating=4.591683387756348),
 Row(movieId=98491, rating=4.522775173187256),
 Row(movieId=7842, rating=4.520864009857178),
 Row(movieId=92535, rating=4.457137107849121),
 Row(movieId=7121, rating=4.424065589904785)]

In [25]:
# Turn the list of Row(movieId, rating) back into a DataFrame with separate
# movieId and rating columns (schema inferred from the Row fields).
moviesDF = spark.createDataFrame(data=moviesList)
moviesDF.toPandas()


Unnamed: 0,movieId,rating
0,89904,4.591683
1,98491,4.522775
2,7842,4.520864
3,92535,4.457137
4,7121,4.424066


In [27]:
# The recommendations identify movies only by movieId. To get the actual titles,
# join them with the MovieLens movie metadata (movies.csv from the same
# ml-latest-small archive), staged on GCS with:
#   wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
#   gsutil cp movies.csv gs://dexdebra-123/datasets
# The CSV is read without schema inference, so every column arrives as a string.
# movieId is cast to int, as in the ratings cell, so joins against the numeric
# movieId from the model compare numbers with numbers instead of relying on
# Spark's implicit string/number coercion.
moviedata = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('gs://dexdebra-123/datasets/movies.csv')
    .withColumn('movieId', col('movieId').cast('int'))
)



In [29]:
# Preview a few movies: movieId, title, and the pipe-separated genres the movie
# belongs to. limit(5) keeps the transfer to the driver small.
moviedata.limit(5).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [31]:
# Join the recommended movie ids with the movie metadata to get the titles and
# genres of the recommended movies, highest predicted rating first. Equal
# ratings are ordered by title so the output order is deterministic.
recommendedMovies = (
    moviedata.join(moviesDF, on=['movieId'])
    .orderBy(col('rating').desc(), col('title'))
    .select('title', 'genres', 'rating')
)
recommendedMovies.toPandas()

Unnamed: 0,title,genres,rating
0,The Artist (2011),Comedy|Drama|Romance,4.591683
1,Paperman (2012),Animation|Comedy|Romance,4.522775
2,Dune (2000),Drama|Fantasy|Sci-Fi,4.520864
3,Louis C.K.: Live at the Beacon Theater (2011),Comedy,4.457137
4,Adam's Rib (1949),Comedy|Romance,4.424066


In [41]:
from pyspark.sql.types import IntegerType

def getRecommendationsForUser(userId,numRecs):
    """Return the top ``numRecs`` movie recommendations for one user, with titles.

    Relies on the notebook-level ``spark`` session, the trained ALS ``model``
    and the ``moviedata`` DataFrame (movieId, title, genres).

    Args:
        userId: id of the user to recommend movies for (an int).
        numRecs: maximum number of recommendations to return.

    Returns:
        A Spark DataFrame with columns ``title``, ``genres`` and ``rating`` (the
        model's predicted rating), ordered by predicted rating, highest first,
        with ties broken by title. If the model returns no recommendations for
        ``userId`` (e.g. the user was absent from the training data), the
        DataFrame is empty.
    """
    from pyspark.sql.functions import col, explode

    usersDF = spark.createDataFrame([userId], IntegerType()).toDF('userId')

    # One row per known user, holding an array of (movieId, rating) structs.
    userRecs = model.recommendForUserSubset(usersDF, numRecs)

    # Flatten the array into (movieId, rating) rows inside Spark rather than
    # collecting to the driver: a user without recommendations now yields an
    # empty result instead of an IndexError from collect()[0].
    moviesDF = (
        userRecs
        .select(explode('recommendations').alias('rec'))
        .select(col('rec.movieId').alias('movieId'),
                col('rec.rating').alias('rating'))
    )

    # Attach titles/genres; sort by predicted rating, then title for stable ties.
    return (
        moviedata.join(moviesDF, on=['movieId'])
        .orderBy(col('rating').desc(), col('title'))
        .select('title', 'genres', 'rating')
    )

In [44]:
# Top 10 recommendations, with titles and genres, for user 148.
recommendationsForUser = getRecommendationsForUser(userId=148, numRecs=10)

In [45]:
# The result holds at most numRecs rows, so converting it to pandas is cheap.
recommendationsForUserPdf = recommendationsForUser.toPandas()
recommendationsForUserPdf

Unnamed: 0,title,genres,rating
0,The Artist (2011),Comedy|Drama|Romance,4.591683
1,Paperman (2012),Animation|Comedy|Romance,4.522775
2,Dune (2000),Drama|Fantasy|Sci-Fi,4.520864
3,Louis C.K.: Live at the Beacon Theater (2011),Comedy,4.457137
4,Adam's Rib (1949),Comedy|Romance,4.424066
5,"Affair to Remember, An (1957)",Drama|Romance,4.406073
6,Waydowntown (2000),Comedy,4.392367
7,Enter the Void (2009),Drama,4.363585
8,"Human Condition III, The (Ningen no joken III)...",Drama|War,4.361141
9,Watermark (2014),Documentary,4.361141
