In [None]:
from google.colab import drive

# Attach Google Drive to the runtime so files under MyDrive can be read by path.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
# A Colab runtime is not guaranteed to ship PySpark. Install the version this
# notebook's outputs were produced with (3.4.1), but only when it is missing, so
# "Restart & Run All" works on a fresh runtime and is a no-op otherwise.
import importlib.util
import subprocess
import sys

if importlib.util.find_spec('pyspark') is None:
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'pyspark==3.4.1'])

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=eebe9456c819725e063a5c95f232e75516d26a970b900712e70497df8d38f6f0
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import os
import random
import sklearn
import matplotlib.pyplot as plt
import pyspark

from pyspark.sql import SparkSession
from pyspark.ml import pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

First, we create a Spark session with `SparkSession.builder`. The session is the entry point to Spark's DataFrame and ML APIs, so this step must run before any other Spark operation.

In [None]:
# Build the Spark session (or reuse one that is already running) that all later
# DataFrame and ML calls go through.
spark = (
    SparkSession.builder
    .appName('recommender_system')
    .getOrCreate()
)

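The `DataFrame.toPandas()` method collects a Spark DataFrame to the driver as a pandas DataFrame, which the notebook renders as a table. We only call it on a small `limit(...)` slice, because collecting the full dataset could exhaust driver memory.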


In [None]:
# Ratings table: one row per (userId, title, rating). The first line holds the
# column names, and Spark infers each column's type from the data.
ratings_csv = '/content/drive/MyDrive/Colab Notebooks/11 - Kaggle Practicing/4 - Big Data with Spark (Movie Recommendation)/movie_ratings_df.csv'
df = spark.read.csv(ratings_csv, header=True, inferSchema=True)

# Preview just the first five rows as a pandas table.
df.limit(5).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5
3,154,Kolya (1996),3
4,306,Kolya (1996),5


Our task: given a user, predict how they would rate the movies they have not yet watched, and return a list of recommended movies for them.


We use `printSchema()` to get a quick overview of each column's data type.

In [None]:
# Print each column's name, inferred type and nullability for the raw ratings table.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
from pyspark.ml.feature import IndexToString, StringIndexer

# ALS needs numeric item ids, so encode every distinct movie title as a number
# stored in a new 'title_new' column.
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')
model = stringIndexer.fit(df)    # learns the title -> index mapping (kept in model.labels)
indexed = model.transform(df)    # df plus the numeric 'title_new' column

# Spot-check that each row now carries a numeric title index.
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [None]:
# Verify that the indexer appended 'title_new' as a numeric (double) column.
indexed.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)



In [None]:
from pyspark.ml.recommendation import ALS

# Fixed seed so the train/test split and the ALS factorisation are reproducible on
# "Restart & Run All". randomSplit draws a fresh seed on every call when none is
# given, and pinning ALS's seed removes any dependence on its default.
SEED = 42

# Hold out 25% of the ratings to measure how well the model generalises.
train, test = indexed.randomSplit([0.75, 0.25], seed=SEED)

# Alternating Least Squares matrix factorisation over (userId, title_new, rating).
# coldStartStrategy='drop' discards predictions for users/movies that never
# appeared in `train`; otherwise they come back as NaN and the RMSE becomes NaN.
# NOTE: predictions are not clipped to the rating scale, so values above the
# highest observed rating can appear.
rec = ALS(maxIter=10,
          regParam=0.01,
          userCol='userId',
          itemCol='title_new',
          ratingCol='rating',
          nonnegative=True,
          coldStartStrategy='drop',
          seed=SEED)

# Fit on the training ratings only.
rec_model = rec.fit(train)

# Predict ratings for the held-out (user, movie) pairs and preview a few.
predicted_ratings = rec_model.transform(test)
predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,Amadeus (1984),1,50.0,3.404798
1,148,Back to the Future (1985),3,20.0,3.710519
2,148,Dr. Strangelove or: How I Learned to Stop Worr...,5,123.0,3.747349
3,148,Forrest Gump (1994),5,27.0,2.285439
4,148,Gone with the Wind (1939),5,162.0,3.471748


## Evaluate the Model on the Test Set

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error between actual and predicted ratings on the held-out
# test set. This is an error measure, not an accuracy: lower is better.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(predicted_ratings)
print(rmse)

1.0186297375163298


With the model trained, we can now recommend the top movies a user is likely to enjoy.


In [None]:
# Every distinct (indexed) movie id in the dataset: the candidate pool for recommendations.
unique_movies = indexed.select('title_new').distinct()

#create function to recommend top 'n' movies to any particular user
def top_movies(user_id, n, show=True):
    """Recommend the top ``n`` movies that ``user_id`` has not rated yet.

    The candidates are all distinct movies in ``indexed`` minus those this user has
    already rated. Each candidate is scored with the trained ALS model
    (``rec_model``), and the highest predicted ratings are kept.

    Parameters
    ----------
    user_id : int
        Id of the user to recommend for (compared against the ``userId`` column).
    n : int
        Maximum number of recommendations to return.
    show : bool, default True
        If True, also print the recommendations as an untruncated table. This
        preserves the function's original behaviour.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title_new``, ``userId``, ``prediction`` and ``title``, sorted by
        ``prediction`` descending, with at most ``n`` rows. The model was trained
        with ``coldStartStrategy='drop'``, so the result is empty for a user the
        model never saw. Predicted ratings are not clipped to the rating scale.
    """
    # Movies this user has already rated; these must not be recommended again.
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    # Anti-join keeps only candidate movies with no match in watched_movies, i.e.
    # the movies the user has not rated yet. ALS then needs the user id on each row.
    remaining_movies = (
        unique_movies
        .join(watched_movies, on='title_new', how='left_anti')
        .withColumn('userId', lit(int(user_id)))
    )

    # Score every remaining movie for this user and keep the n best predictions.
    recommendations = (
        rec_model.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # Map numeric movie ids back to titles using the fitted StringIndexer's labels.
    movie_title = IndexToString(inputCol='title_new', outputCol='title', labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    # DataFrame.show() only prints and returns None, so return the DataFrame itself
    # so that callers can actually use the recommendations.
    if show:
        final_recommendations.show(n, truncate=False)
    return final_recommendations

In [None]:
# Example: top-5 recommendations for the user with id 60 (the trailing ';'
# suppresses echoing the cell's return value).
top_movies(user_id=60, n=5);

+---------+------+----------+--------------------------------------+
|title_new|userId|prediction|title                                 |
+---------+------+----------+--------------------------------------+
|1347.0   |60    |6.181151  |Angel Baby (1995)                     |
|1411.0   |60    |6.019464  |Boys, Les (1997)                      |
|1289.0   |60    |5.851451  |World of Apu, The (Apur Sansar) (1959)|
|1277.0   |60    |5.8318987 |Mina Tannenbaum (1994)                |
|1288.0   |60    |5.8098354 |Whole Wide World, The (1996)          |
+---------+------+----------+--------------------------------------+

