### Download dataset
<b>Dataset location: </b>http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that the rest of the notebook relies on.
spark = (
    SparkSession.builder
    .appName('Use Collaborative Filtering for movie recommendations')
    .getOrCreate()
)

# Read the ratings CSV, treating its first line as the header row.
rawData = spark.read.load('../JunYang/movieData/ratings.csv',
                          format='csv',
                          header='true')

In [2]:
# Preview the first five rows. The limit is applied in Spark so only those
# rows are converted to pandas, not the whole ratings table.
rawData.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,31,2.5,1260759144
1,1,1029,3.0,1260759179
2,1,1061,3.0,1260759182
3,1,1129,2.0,1260759185
4,1,1172,4.0,1260759205


#### Pick all columns except the timestamp

In [3]:
from pyspark.sql.functions import col

# Keep userId, movieId and rating (timestamp is left out) and cast each one
# to a numeric type.
dataset = rawData.select(
    col('userId').cast('int'),
    col('movieId').cast('int'),
    col('rating').cast('float'),
)

# Preview five rows; limit in Spark first so the full dataset is not
# converted to pandas just to be displayed.
dataset.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,31,2.5
1,1,1029,3.0
2,1,1061,3.0
3,1,1129,2.0
4,1,1172,4.0


#### Check the distribution of rating in the dataset

In [6]:
# Summary statistics of the rating column, computed by pandas.
dataset.select(col('rating')).toPandas().describe()

Unnamed: 0,rating
count,100004.0
mean,3.543608
std,1.058048
min,0.5
25%,3.0
50%,4.0
75%,4.0
max,5.0


#### Split into training and test data sets

In [7]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# samples the same split and the reported metrics stay comparable.
trainingData, testData = dataset.randomSplit([0.8, 0.2], seed=42)

### Define the Collaborative Filtering model
Uses the Alternating Least Squares algorithm to learn the latent factors
* <b>maxIter: </b>The maximum number of iterations to run
* <b>regParam: </b>Specifies the regularization parameter in ALS (defaults to 0.1)
* <b>coldStartStrategy: </b> Strategy for handling unknown or new users/items during prediction (ones that were not encountered in training). Options are 'drop' and 'nan'. We will drop unknown users/items from the predictions

In [8]:
from pyspark.ml.recommendation import ALS

# Configure the ALS estimator through its setters; the values match the
# parameters described in the markdown cell for this section.
als = (
    ALS()
    .setMaxIter(5)
    .setRegParam(0.1)
    .setUserCol('userId')
    .setItemCol('movieId')
    .setRatingCol('rating')
    .setColdStartStrategy('drop')
)

#### Build the ALSModel using the model definition and training data

In [9]:
# Fit the configured ALS estimator on the training split.
model = als.fit(dataset=trainingData)

#### Get the predictions for the test data

In [10]:
# Score the held-out ratings; the model adds a `prediction` column.
predictions = model.transform(testData)

# Preview five rows; limit in Spark so the whole prediction set is not
# converted to pandas for display.
predictions.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,242,463,4.0,3.879403
1,311,463,3.0,3.123105
2,285,471,5.0,3.543239
3,491,471,3.0,3.737683
4,19,471,3.0,3.953464


#### Compare the distribution of values for ratings and predictions

In [11]:
# Side-by-side summary statistics for actual and predicted ratings.
predictions.select(['rating', 'prediction']).toPandas().describe()

Unnamed: 0,rating,prediction
count,19208.0,19208.0
mean,3.557736,3.385517
std,1.05758,0.752815
min,0.5,-0.197148
25%,3.0,2.942681
50%,4.0,3.465712
75%,4.0,3.911253
max,5.0,5.884375


#### Get the Root Mean Square Error on the test data

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the actual `rating` with the model's `prediction`.
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='rating',
                                metricName='rmse')

rmse = evaluator.evaluate(dataset=predictions)
print('RMSE = ', rmse)

RMSE =  0.9339184097772186


#### The ALS model can be used to get predictions for all users
Specify the number of predictions you would like for each user

In [13]:
# Top 3 movie recommendations for every user known to the model.
userRecsAll = model.recommendForAllUsers(numItems=3)
userRecsAll

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

#### View the recommendations
For each userId there is a list of tuples, each holding a movieId and its predicted rating for that user

In [15]:
# Preview five users' recommendations; limit in Spark so only those rows are
# converted to pandas.
userRecsAll.limit(5).toPandas()

Unnamed: 0,userId,recommendations
0,471,"[(121231, 4.998778343200684), (1859, 4.9381785..."
1,463,"[(67504, 4.866992473602295), (83411, 4.8669924..."
2,496,"[(1050, 5.5590620040893555), (26472, 5.3831639..."
3,148,"[(67504, 5.4773383140563965), (83411, 5.477338..."
4,540,"[(5792, 5.982637405395508), (3696, 5.813047885..."


#### Get the top user recommendations for each movie
* The users who are most likely to like a particular movie
* Get the top 3 users

In [19]:
# Top 3 users for every movie known to the model.
movieRecsAll = model.recommendForAllItems(3)

# Preview five movies; limit in Spark so only those rows are converted to
# pandas.
movieRecsAll.limit(5).toPandas()

Unnamed: 0,movieId,recommendations
0,1580,"[(46, 4.901529312133789), (543, 4.781454086303..."
1,5300,"[(46, 5.764642715454102), (622, 5.697394847869..."
2,6620,"[(653, 4.898347854614258), (357, 4.86716842651..."
3,7340,"[(46, 4.980098724365234), (517, 4.800014019012..."
4,32460,"[(156, 5.1172194480896), (298, 4.9101376533508..."


#### Get recommendations for a subset of users
* Start off by creating a list of users who make up our subset
* Convert that list to a dataframe which will be used shortly

In [16]:
from pyspark.sql.types import IntegerType

# Users we want recommendations for.
usersList = [148, 463, 267]

# A bare IntegerType schema produces a single column named `value`;
# rename it to the `userId` column the model expects.
usersDF = spark.createDataFrame(usersList, schema=IntegerType()) \
               .withColumnRenamed('value', 'userId')

usersDF.head(3)

[Row(userId=148), Row(userId=463), Row(userId=267)]

#### Use the recommendForUserSubset function
This gets the recommendations for specific users

In [17]:
# Top 5 movie recommendations, restricted to the users listed in usersDF.
userRecs = model.recommendForUserSubset(dataset=usersDF, numItems=5)
userRecs.toPandas()

Unnamed: 0,userId,recommendations
0,148,"[(67504, 5.4773383140563965), (83411, 5.477338..."
1,463,"[(67504, 4.866992473602295), (83411, 4.8669924..."
2,267,"[(67504, 5.4699273109436035), (83411, 5.469927..."


#### Extract recommendations for specific user
* We get a list comprising a Row object which in turn contains a list of Rows
* To get the movie names from the movieIds, we will need to perform some transformations

In [20]:
# Keep only user 148's row and project its `recommendations` array.
userMoviesList = (
    userRecs
    .where(userRecs['userId'] == 148)
    .select('recommendations')
)

userMoviesList.collect()

[Row(recommendations=[Row(movieId=67504, rating=5.4773383140563965), Row(movieId=83411, rating=5.4773383140563965), Row(movieId=83318, rating=5.4773383140563965), Row(movieId=1859, rating=5.1282806396484375), Row(movieId=32525, rating=5.099239349365234)])]

#### Extract the list of recommendations
We get the list of Rows containing the movieId and rating for the user

In [21]:
# Take the first row and read its `recommendations` field, which is a list
# of Row(movieId, rating).
moviesList = userMoviesList.take(1)[0]['recommendations']
moviesList

[Row(movieId=67504, rating=5.4773383140563965),
 Row(movieId=83411, rating=5.4773383140563965),
 Row(movieId=83318, rating=5.4773383140563965),
 Row(movieId=1859, rating=5.1282806396484375),
 Row(movieId=32525, rating=5.099239349365234)]

#### Create a DataFrame containing the movieId and rating as columns
Use the moviesList created previously

In [22]:
# Turn the list of Row(movieId, rating) into a two-column DataFrame.
moviesDF = spark.createDataFrame(data=moviesList)
moviesDF.toPandas()

Unnamed: 0,movieId,rating
0,67504,5.477338
1,83411,5.477338
2,83318,5.477338
3,1859,5.128281
4,32525,5.099239


#### The movie names are stored in a csv file called movies.csv
Load that into another dataframe

In [23]:
# Read the movie metadata with the SparkSession created at the top of the
# notebook. `sqlContext` is never defined in this notebook, so relying on it
# fails on a fresh kernel unless the environment happens to inject it.
movieData = spark.read.csv('../JunYang/movieData/movies.csv',
                           header=True,
                           ignoreLeadingWhiteSpace=True)

# Preview five rows; limit in Spark so the whole table is not converted to
# pandas for display.
movieData.limit(5).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [24]:
# Attach title and genres to each recommended movieId, highest predicted
# rating first.
recommendedMovies = (
    movieData
    .join(moviesDF, on='movieId')
    .sort('rating', ascending=False)
    .select('title', 'genres', 'rating')
)

recommendedMovies.toPandas()

Unnamed: 0,title,genres,rating
0,Land of Silence and Darkness (Land des Schweig...,Documentary,5.477338
1,Cops (1922),Comedy,5.477338
2,"Goat, The (1921)",Comedy,5.477338
3,Taste of Cherry (Ta'm e guilass) (1997),Drama,5.128281
4,The Earrings of Madame de... (1953),Drama|Romance,5.099239


In [25]:
from pyspark.sql.types import IntegerType

def getRecommendationsForUser(userId, numRecs):
    """Return the top movie recommendations for a single user.

    Relies on the notebook-level ``spark`` session, the fitted ALS ``model``
    and the ``movieData`` DataFrame defined in earlier cells.

    Parameters
    ----------
    userId : int
        Id of the user to get recommendations for.
    numRecs : int
        Maximum number of movies to recommend.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title``, ``genres`` and ``rating`` (the predicted rating),
        ordered by rating descending. The DataFrame is empty when the model
        returns no recommendations for ``userId`` (presumably a user that
        was not in the training data), instead of raising IndexError.
    """
    usersDF = spark.createDataFrame([userId], IntegerType()).toDF('userId')

    userRecs = model.recommendForUserSubset(usersDF, numRecs)

    # Flatten the array of (movieId, rating) structs inside Spark rather than
    # collecting to the driver and indexing row 0, which fails when there is
    # no row. The casts keep the column types the collect/createDataFrame
    # round-trip used to produce (bigint / double).
    moviesDF = userRecs \
        .selectExpr('explode(recommendations) AS rec') \
        .selectExpr('CAST(rec.movieId AS BIGINT) AS movieId',
                    'CAST(rec.rating AS DOUBLE) AS rating')

    recommendedMovies = movieData.join(moviesDF, on=['movieId']) \
        .orderBy('rating', ascending=False) \
        .select('title', 'genres', 'rating')

    return recommendedMovies

In [27]:
recommendationsForUser = getRecommendationsForUser(219, 10)
recommendationsForUser.toPandas()

Unnamed: 0,title,genres,rating
0,Albino Alligator (1996),Crime|Thriller,5.931947
1,Faster Pussycat! Kill! Kill! (1965),Action|Crime|Drama,5.779367
2,The Lair of the White Worm (1988),Comedy|Horror,5.510232
3,Love Is a Many-Splendored Thing (1955),Drama|Romance|War,5.46909
4,Dangerous Beauty (1998),Drama,5.457493
5,"Bad and the Beautiful, The (1952)",Drama,5.339161
6,Benji (1974),Adventure|Children,5.324603
7,Cronos (1993),Drama|Horror,5.322022
8,Shag (1989),Comedy|Drama,5.311676
9,May (2002),Drama|Horror,5.273395
