In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [2]:
# Start (or, on re-run, reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName('recommender_system')
    .getOrCreate()
)

24/10/10 15:38:42 WARN Utils: Your hostname, Enchengs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.28.234.104 instead (on interface en0)
24/10/10 15:38:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/10 15:38:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the ratings table: the header row supplies column names and inferSchema types them.
df = spark.read.csv(
    'movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [7]:
# Show the column names and inferred types of the loaded ratings DataFrame.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [10]:
from pyspark.ml.feature import StringIndexer, IndexToString

# Map each movie title to a numeric index; the ALS model below uses this index
# (`digital_title`) as its item column.
# The estimator is bound to `title_indexer` rather than to `StringIndexer`: rebinding the
# class name to an instance made this cell fail when re-run.
title_indexer = StringIndexer(inputCol='title', outputCol='digital_title')
model = title_indexer.fit(df)  # fitted StringIndexerModel; its .labels map indices back to titles

new_df = model.transform(df)
new_df.limit(10).toPandas()

Unnamed: 0,userId,title,rating,digital_title
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0
5,296,Kolya (1996),4,287.0
6,34,Kolya (1996),5,287.0
7,271,Kolya (1996),4,287.0
8,201,Kolya (1996),4,287.0
9,209,Kolya (1996),4,287.0


In [23]:
from pyspark.ml.recommendation import ALS

# Seed for the train/test split and ALS initialisation, so the split, the fitted model and
# the RMSE reproduce on Restart & Run All (given the same input data and partitioning).
SEED = 42

train, test = new_df.randomSplit([0.75, 0.25], seed=SEED)  # 75% train / 25% test

rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='digital_title',
    ratingCol='rating',
    nonnegative=True,
    # drop test rows whose user or movie never appeared in train, instead of predicting NaN
    coldStartStrategy="drop",
    seed=SEED,
)

rec_model = rec.fit(train)  # fit the model on the training set

predicted_ratings = rec_model.transform(test)  # predict ratings for the held-out set
predicted_ratings.limit(10).toPandas()



Unnamed: 0,userId,title,rating,digital_title,prediction
0,148,Amadeus (1984),1,50.0,4.129126
1,148,"Deer Hunter, The (1978)",1,280.0,5.213006
2,148,"Empire Strikes Back, The (1980)",5,18.0,4.165876
3,148,Fantasia (1940),5,153.0,4.73739
4,148,Free Willy (1993),1,761.0,3.512775
5,148,"Grand Day Out, A (1992)",4,494.0,5.011035
6,148,Jaws (1975),3,49.0,3.800406
7,148,Othello (1995),3,453.0,4.710493
8,148,Pink Floyd - The Wall (1982),5,298.0,3.335729
9,148,Psycho (1960),1,79.0,3.447943


In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error between the held-out ratings and the ALS predictions.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

rmse = evaluator.evaluate(predicted_ratings)
print(rmse)
# An RMSE of about 1.02 is reasonable on a 1-5 rating scale
# (the exact value depends on the random train/test split).
# 1.023 is reasonably well in scale of 1 to 5

1.023716715471145


In [30]:
unique_movies = new_df.select('digital_title').distinct()  # every distinct indexed movie id
unique_moives = unique_movies  # backward-compatible alias for the original (misspelled) name


def top_movies(specific_user_id, n, show=True):
    """
    Recommend the top ``n`` movies that ``specific_user_id`` has not rated yet.

    Candidates are all distinct ``digital_title`` values in ``new_df`` minus those the user
    already rated. They are scored with the fitted ALS model ``rec_model`` and mapped back
    to titles with the fitted ``StringIndexerModel`` ``model`` (both notebook globals).

    Parameters
    ----------
    specific_user_id : int
        Value of ``userId`` to recommend for.
    n : int
        Maximum number of recommendations.
    show : bool, default True
        If True, print the recommendations as an untruncated table (the original behaviour).

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``digital_title``, ``userId``, ``prediction``, ``title``, sorted by
        ``prediction`` descending, at most ``n`` rows. Previously the function returned
        ``None``, because it returned the result of ``DataFrame.show``.
    """
    watched_movies = new_df.filter(new_df['userId'] == specific_user_id).select('digital_title')

    # left_anti keeps the rows of unique_movies with no match in watched_movies, i.e. the
    # unwatched movies, each at most once. Joining on the column name avoids resolving the
    # same-lineage `digital_title` column on both sides of a self-join.
    unwatched_movies = (
        unique_movies
        .join(watched_movies, on='digital_title', how='left_anti')
        .withColumn('userId', lit(int(specific_user_id)))
    )

    recommendations = (
        rec_model.transform(unwatched_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    title_transformer = IndexToString(inputCol='digital_title', outputCol='title', labels=model.labels)
    final_recommendations = title_transformer.transform(recommendations)

    if show:
        final_recommendations.show(n, False)
    return final_recommendations









In [31]:
top_movies(specific_user_id=60, n=5);  # trailing ';' keeps Jupyter from echoing the return value

                                                                                

+-------------+------+----------+----------------------+
|digital_title|userId|prediction|title                 |
+-------------+------+----------+----------------------+
|1277.0       |60    |6.0959373 |Mina Tannenbaum (1994)|
|1120.0       |60    |5.8247786 |Crooklyn (1994)       |
|800.0        |60    |5.6524196 |Beautiful Thing (1996)|
|950.0        |60    |5.6385283 |Amateur (1994)        |
|657.0        |60    |5.5957384 |Persuasion (1995)     |
+-------------+------+----------+----------------------+

