In [1]:
!pip3 install pyspark



In [2]:
!pip install --upgrade pip



In [1]:
#spark session and SQL imports

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean,col,when,lit,split,regexp_extract

In [2]:
#SQL features and ml evaluation import

from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Creating Spark Session

In [3]:
# Build (or reuse, if one is already running) the Spark session used by every later cell
spark = (
    SparkSession.builder
    .appName('recommender_system')
    .getOrCreate()
)

23/02/20 11:42:46 WARN Utils: Your hostname, Poojas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.130.201.5 instead (on interface en0)
23/02/20 11:42:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/20 11:42:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read the ratings CSV into a Spark DataFrame; the following cells display it.
# header=True takes column names from the first row; inferSchema=True lets Spark
# infer column types (see the printSchema output below).
RATINGS_CSV = 'movie_ratings_df.csv'
df = spark.read.csv(RATINGS_CSV, inferSchema=True, header=True)

                                                                                

#### displaying data using limit & show methods in PySpark

In [5]:
# Peek at the first five rows, converted to pandas for a rich table display
df.limit(num=5).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5
3,154,Kolya (1996),3
4,306,Kolya (1996),5


In [6]:
# Same five rows, printed by Spark itself as an ASCII table
df.show(n=5)

+------+------------+------+
|userId|       title|rating|
+------+------------+------+
|   196|Kolya (1996)|     3|
|    63|Kolya (1996)|     3|
|   226|Kolya (1996)|     5|
|   154|Kolya (1996)|     3|
|   306|Kolya (1996)|     5|
+------+------------+------+
only showing top 5 rows



In [7]:
# One record per block instead of one per line
df.show(vertical=True, n=3)

-RECORD 0--------------
 userId | 196          
 title  | Kolya (1996) 
 rating | 3            
-RECORD 1--------------
 userId | 63           
 title  | Kolya (1996) 
 rating | 3            
-RECORD 2--------------
 userId | 226          
 title  | Kolya (1996) 
 rating | 5            
only showing top 3 rows



### Goal: Given a user, predict and return a list of movie recommendations for that user to watch

In [8]:
# Print the column names and types that inferSchema=True produced for the CSV

df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



#### Since title is stored as a string, we need to convert it to numeric values to work with the PySpark MLlib library


In [9]:
from pyspark.ml.feature import IndexToString

#### Creating a StringIndexer estimator

In [10]:
# Estimator that maps each distinct title string to a numeric index column
stringIndex = StringIndexer(
    inputCol='title',
    outputCol='title_new_indexed',
)

#### Fitting the StringIndexer on the DataFrame. fit() returns a StringIndexerModel, which is a transformer

In [11]:
# Learn the title -> index mapping; the fitted StringIndexerModel exposes it via .labels
model = stringIndex.fit(dataset=df)

                                                                                

#### Using the returned transformer to transform the dataframe. It returns a new dataframe

In [12]:
# New DataFrame: the original columns plus the numeric title_new_indexed column
indexed_df = model.transform(dataset=df)

#### Displaying the first and last rows of the transformed dataframe

In [13]:
# Last three rows, collected to the driver as Row objects
indexed_df.tail(num=3)

[Row(userId=655, title='Girls Town (1996)', rating=3, title_new_indexed=1568.0),
 Row(userId=655, title='Silence of the Palace, The (Saimt el Qusur) (1994)', rating=3, title_new_indexed=1628.0),
 Row(userId=655, title='Dadetown (1995)', rating=3, title_new_indexed=1549.0)]

In [14]:
# First three rows, collected to the driver as Row objects
indexed_df.head(n=3)

[Row(userId=196, title='Kolya (1996)', rating=3, title_new_indexed=287.0),
 Row(userId=63, title='Kolya (1996)', rating=3, title_new_indexed=287.0),
 Row(userId=226, title='Kolya (1996)', rating=5, title_new_indexed=287.0)]

#### Splitting the data into train and test

In [15]:
# 75/25 train/test split. A fixed seed makes the split (and therefore the RMSE
# reported below) reproducible on Restart & Run All.
train, test = indexed_df.randomSplit([0.75, 0.25], seed=42)

In [16]:
from pyspark.ml.recommendation import ALS


#### Creating an ALS model for recommender system

In [17]:
# ALS matrix-factorisation estimator over (userId, title_new_indexed) -> rating.
als_model = ALS(maxIter=10, regParam=0.01,
                userCol='userId',
                itemCol='title_new_indexed',
                ratingCol='rating',
                nonnegative=True,
                # drop rows whose prediction is NaN (users/items unseen during training)
                coldStartStrategy='drop',
                # fixed seed so the learned factors are reproducible across runs
                seed=42)

#### Training the model with the training data

In [18]:
# Fit the ALS estimator on the training split; fit() returns the trained ALSModel
recommender = als_model.fit(dataset=train)

                                                                                

23/02/20 11:43:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/20 11:43:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


#### Getting the predictions on the test data

In [19]:
# Score the held-out split; PySpark ML models predict through transform(), which adds a 'prediction' column
predictions = recommender.transform(dataset=test)

In [20]:
# First five scored test rows, true rating next to the predicted one
predictions.limit(num=5).toPandas()

                                                                                

Unnamed: 0,userId,title,rating,title_new_indexed,prediction
0,148,2001: A Space Odyssey (1968),5,59.0,3.161632
1,148,Amadeus (1984),1,50.0,3.644425
2,148,Being There (1979),5,290.0,2.904923
3,148,"Close Shave, A (1995)",5,302.0,4.215456
4,148,Cold Comfort Farm (1995),5,262.0,4.560448


#### Calculating RMSE

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

In [22]:
# Root-mean-square error between the observed 'rating' and the model's 'prediction'
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [23]:
# Compute the metric over all test predictions
rmse = evaluator.evaluate(dataset=predictions)

                                                                                

In [24]:
# Test-set RMSE, in the same units as the 'rating' column
rmse

1.025680050443128

#### Recommending movies

##### Creating a DataFrame of distinct movie indices (without repetition)

In [25]:
# One row per distinct movie index: the candidate pool for recommendations
unique_movies = indexed_df.select('title_new_indexed').dropDuplicates()

In [47]:
# Function that recommends the top-n movies a given user has not rated yet.

def top_movies(user_id, n):
    """Show and return the ``n`` highest-predicted movies that ``user_id`` has not rated.

    Uses notebook globals from earlier cells: ``unique_movies`` (distinct
    ``title_new_indexed`` values), ``indexed_df`` (ratings with the indexed
    title column), ``recommender`` (the fitted ALS model) and ``model`` (the
    fitted StringIndexerModel whose ``labels`` map indices back to titles).

    Parameters
    ----------
    user_id : int
        User to recommend for; cast to ``int`` before use.
    n : int
        Number of recommendations to display and return.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title_new_indexed``, ``userId``, ``prediction``, ``title``,
        ordered by ``prediction`` descending. ``prediction`` is the raw ALS
        score and is not clipped to the rating range. The ALS estimator was
        built with ``coldStartStrategy='drop'``, so users or movies unseen in
        training produce no rows (an unknown user yields an empty frame).
    """
    user_id = int(user_id)

    # Movies this user has already rated; these must not be recommended again.
    watched_movies = (
        indexed_df
        .filter(indexed_df['userId'] == user_id)
        .select('title_new_indexed')
    )

    # left_anti keeps only candidate movies with no match in watched_movies.
    # Joining on the column name avoids the ambiguous self-join condition that
    # arises when both sides derive from indexed_df and are compared via
    # DataFrame attributes, and it never duplicates rows, so no distinct() is needed.
    remaining_movies = (
        unique_movies
        .join(watched_movies, on='title_new_indexed', how='left_anti')
        .withColumn('userId', lit(user_id))
    )

    recommendation = (
        recommender.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # Map the numeric index back to the original title string.
    movie_title = IndexToString(inputCol='title_new_indexed', outputCol='title',
                                labels=model.labels)
    final_recommendations = movie_title.transform(recommendation)

    final_recommendations.show(n, False)
    # Return the DataFrame itself (show() returns None) so callers can reuse it.
    return final_recommendations
    
    
    

In [50]:
# Recommend five unseen movies for user 150 (the trailing ';' suppresses the cell's Out[] value)
top_movies(user_id=150, n=5);

+-----------------+------+----------+---------------------------+
|title_new_indexed|userId|prediction|title                      |
+-----------------+------+----------+---------------------------+
|1220.0           |150   |8.406602  |Horse Whisperer, The (1998)|
|1173.0           |150   |8.3096    |True Crime (1995)          |
|1124.0           |150   |6.777126  |Fresh (1994)               |
|845.0            |150   |6.7490425 |Pillow Book, The (1995)    |
|1122.0           |150   |6.35596   |Faithful (1996)            |
+-----------------+------+----------+---------------------------+

