## SparkSession Object 

In [1]:
# Import SparkSession and start (or reuse) the session that every later
# cell runs against.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

## Read the dataset 

In [2]:
# Read the ratings CSV into a Spark DataFrame: the first row supplies the
# column names and the column types are inferred from the data.
# NOTE(review): a title with accented characters shows up with replacement
# characters in the final recommendations output -- the file may not be
# UTF-8; confirm and pass `encoding=` here if so.

df = spark.read.csv(
    './Data/movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Report the dataset dimensions as (number of rows, number of columns).

row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

(100000, 3)


In [4]:
# Inspect the column types in case any of them need to be changed.
# Both the user id and the movie title are needed in numeric form; `title`
# is a string here, so it is converted to a numeric index further down.

df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [5]:
# Count the ratings submitted by each user and list the ten most active users.

(
    df.groupBy('userId')
    .count()
    .orderBy('count', ascending=False)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [6]:
# Same per-user counts, least active users first.
# The most active user has rated 737 movies, and every user has rated at
# least 20 movies.

(
    df.groupBy('userId')
    .count()
    .orderBy('count', ascending=True)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|572   |20   |
|93    |20   |
|685   |20   |
|631   |20   |
|596   |20   |
|34    |20   |
|300   |20   |
|926   |20   |
+------+-----+
only showing top 10 rows



In [7]:
# Count the ratings received by each movie and list the ten most-rated titles.
# The most-rated movie is Star Wars (1977) with 583 ratings; every movie in
# the dataset has been rated by at least one user.

(
    df.groupBy('title')
    .count()
    .orderBy('count', ascending=False)
    .show(n=10, truncate=False)
)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



## Feature Engineering 

In [8]:
# Convert the movie title column from categorical to numerical values using
# StringIndexer. The indexer is created by naming the input column and the
# output column, fitted on the ratings, and then applied to them.

# Import only the column helpers this notebook uses (col, lit, rand): a
# wildcard import of pyspark.sql.functions shadows Python built-ins such as
# sum, min, max and round.
from pyspark.sql.functions import col, lit, rand
from pyspark.ml.feature import StringIndexer, IndexToString

stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")
# `model` keeps the title <-> index mapping; it is reused at the end of the
# notebook to turn indices back into titles.
model = stringIndexer.fit(df)
indexed = model.transform(df)


In [9]:
# Preview the first 25 rows, now carrying the numeric `title_new` column.
indexed.show(n=25)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
|    35|Kolya (1996)|     2|    287.0|
|   354|Kolya (1996)|     5|    287.0|
|   199|Kolya (1996)|     5|    287.0|
|   113|Kolya (1996)|     2|    287.0|
|     1|Kolya (1996)|     5|    287.0|
|   173|Kolya (1996)|     5|    287.0|
|   360|Kolya (1996)|     4|    287.0|
|   234|Kolya (1996)|     4|    287.0|
|    14|Kolya (1996)|     4|    287.0|
|   309|Kolya (1996)|     4|    287.0|
|   331|Kolya (1996)|     4|    287.0|
|    21|Kolya (1996)|     3|    287.0|
|   111|Kolya (1996)|    

In [10]:
# Count ratings per movie index and list the ten most-rated indices.
(
    indexed.groupBy('title_new')
    .count()
    .orderBy('count', ascending=False)
    .show(n=10, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



## Splitting the Dataset

In [11]:
# Split the indexed ratings roughly 75/25 into a training set (to fit the
# model) and a test set (to check its accuracy).
# A fixed seed makes the random split repeatable across "Restart & Run All";
# without it the split sizes and every downstream metric change on each run.

train, test = indexed.randomSplit([0.75, 0.25], seed=42)

In [12]:
 train.count()  # number of rows in the training split

75093

In [13]:
test.count()  # number of rows in the test split

24907

## Build and Train 

In [14]:
# Import ALS from the PySpark ML library and fit the recommender on the
# training split.

from pyspark.ml.recommendation import ALS

rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,          # keep the model from producing negative ratings
    coldStartStrategy="drop",  # drop rows that would otherwise get a NaN prediction
    seed=42,                   # fixed seed so refitting gives repeatable results
)

rec_model = rec.fit(train)

## Prediction on test data 


In [15]:
# Score the test split with the fitted model; the RMSE of these predictions
# is computed with RegressionEvaluator further down.

predicted_ratings = rec_model.transform(dataset=test)

In [16]:
predicted_ratings.printSchema()  # the test columns plus the model's `prediction` column

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [17]:
# Show ten predictions in random order for a quick visual check.
predicted_ratings.orderBy(rand()).show(n=10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   141|River Wild, The (...|     4|    213.0| 3.9563932|
|   311|Blues Brothers, T...|     3|     64.0|  4.284288|
|   328| M. Butterfly (1993)|     2|    967.0| 1.3779681|
|    90| Pulp Fiction (1994)|     5|     12.0| 3.6245391|
|   562|Butch Cassidy and...|     4|    101.0| 3.8343136|
|   450|Bye Bye, Love (1995)|     4|   1005.0|  4.301024|
|   936|Joe's Apartment (...|     1|    643.0| 2.8558931|
|    87|Blues Brothers, T...|     5|     64.0| 4.6309505|
|   311|Don Juan DeMarco ...|     4|    435.0| 2.8511434|
|   721|          187 (1997)|     3|    684.0| 3.8394866|
+------+--------------------+------+---------+----------+
only showing top 10 rows



## Evaluation metrics


In [20]:
# Measure the model's performance on unseen (test) data using the
# root-mean-square error between the actual and the predicted rating.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predicted_ratings)


In [21]:
print(rmse)  # RMSE of the predictions on the test split

1.0219132500803982


## Recommend top films that users might like

In [22]:
# Collect every distinct movie index; the dataset has 1664 distinct movies.
unique_movies = indexed.select('title_new').distinct()
unique_movies.count()

1664

In [23]:
 a = unique_movies.alias('a')  # alias 'a' = all movies, so its columns can be told apart after the join below

In [24]:
# The active user: the one we want to recommend new movies to.
user_id = 85

In [25]:
# Collect the movie indices the active user has already rated (i.e. seen).
watched_movies = (
    indexed.filter(indexed['userId'] == user_id)
    .select('title_new')
    .distinct()
)

In [27]:
watched_movies.count()  # how many distinct movies the active user has rated

287

In [28]:
# Alias 'b' = movies the active user has already rated.
b = watched_movies.alias('b')


In [29]:
# Left-join all movies (alias 'a') against the watched ones (alias 'b'):
# rows where b.title_new is null are the movies the user has not rated yet.
# We recommend from those remaining 1664 - 287 = 1,377 items.
# The join condition uses alias-qualified column names: `a.title_new` and
# `b.title_new` both trace back to the same `indexed.title_new` column, so
# comparing them as DataFrame attributes leans on Spark's self-join
# auto-resolution to tell the two sides apart.
total_movies = a.join(b, col("a.title_new") == col("b.title_new"), how='left')

In [30]:
 total_movies.show(n=10, truncate=False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|305.0    |305.0    |
|596.0    |null     |
|299.0    |null     |
|769.0    |null     |
|692.0    |null     |
|934.0    |null     |
|1051.0   |null     |
|496.0    |null     |
|558.0    |558.0    |
|170.0    |null     |
+---------+---------+
only showing top 10 rows



In [31]:
# Keep only the movies with no match on the watched side of the join,
# i.e. the ones the active user has not rated yet.
remaining_movies = (
    total_movies
    .filter(col("b.title_new").isNull())
    .select(a.title_new)
    .distinct()
)

In [32]:
remaining_movies.count()  # number of movies the active user has not rated yet

1377

In [33]:
# Attach the active user's id to every candidate movie so each row has both
# the user column and the item column the recommender was configured with.
user_id_column = lit(int(user_id))
remaining_movies = remaining_movies.withColumn("userId", user_id_column)

In [34]:
remaining_movies.show(n=10, truncate=False)

+---------+------+
|title_new|userId|
+---------+------+
|596.0    |85    |
|299.0    |85    |
|769.0    |85    |
|692.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|496.0    |85    |
|170.0    |85    |
|184.0    |85    |
|576.0    |85    |
+---------+------+
only showing top 10 rows



## Make a prediction using the recommender model 

In [35]:
# Score every not-yet-rated movie for the active user with the recommender
# built earlier, then rank the movies from highest to lowest predicted rating.

scored_movies = rec_model.transform(remaining_movies)
recommendations = scored_movies.orderBy('prediction', ascending=False)

In [36]:
recommendations.show(n=5, truncate=False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1277.0   |85    |5.95487   |
|1411.0   |85    |5.816051  |
|1120.0   |85    |5.5455956 |
|1353.0   |85    |5.1029277 |
|926.0    |85    |5.053426  |
+---------+------+----------+
only showing top 5 rows



In [37]:
# Movie indices 1277 and 1411 have the highest predicted rating for user 85.
# Map each index back to its title using the labels learned by the fitted
# StringIndexer model, adding a `title` column.
movie_title = IndexToString(
    inputCol="title_new",
    outputCol="title",
    labels=model.labels,
)
final_recommendations = movie_title.transform(recommendations)

In [38]:
final_recommendations.show(n=10, truncate=False)


+---------+------+----------+---------------------------------------+
|title_new|userId|prediction|title                                  |
+---------+------+----------+---------------------------------------+
|1277.0   |85    |5.95487   |Mina Tannenbaum (1994)                 |
|1411.0   |85    |5.816051  |Boys, Les (1997)                       |
|1120.0   |85    |5.5455956 |Crooklyn (1994)                        |
|1353.0   |85    |5.1029277 |C'est arriv� pr�s de chez vous (1992)  |
|926.0    |85    |5.053426  |Ma vie en rose (My Life in Pink) (1997)|
|1207.0   |85    |5.0477386 |Aparajito (1956)                       |
|1347.0   |85    |5.0462446 |Angel Baby (1995)                      |
|1463.0   |85    |4.6538796 |American Dream (1990)                  |
|1542.0   |85    |4.6372633 |Brothers in Trouble (1995)             |
|1470.0   |85    |4.6372633 |Butcher Boy, The (1998)                |
+---------+------+----------+---------------------------------------+
only showing top 10 