In [1]:
#imports
from pyspark.sql import SparkSession


In [2]:
# Create (or reuse) the Spark session used by every later cell
spark = (
    SparkSession.builder
    .appName('movie')
    .getOrCreate()
)

23/09/11 16:17:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Read the ratings CSV: the first row supplies column names and
# inferSchema lets Spark type the columns (see printSchema below)
data = spark.read.csv(
    'movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)
data.head()


Row(userId=196, title='Kolya (1996)', rating=3)

In [4]:
data.printSchema()  # show the column names/types produced by inferSchema

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



The `title` column is stored as a string, but ALS needs numeric item IDs, so we convert it to a numeric index with `StringIndexer`.

In [5]:
from pyspark.ml.feature import StringIndexer, IndexToString
# Learn a title -> numeric index mapping and add it as the `num_title` column.
# `model` is kept because its `labels` are used later to map indices back to titles.
stringIndexer = StringIndexer(outputCol='num_title', inputCol='title')
model = stringIndexer.fit(data)
indexed_df = model.transform(data)


In [7]:
indexed_df.show(5)  # first 5 rows, including the new num_title column
indexed_df.printSchema()  # num_title is added as a double column

+------+------------+------+---------+
|userId|       title|rating|num_title|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
+------+------------+------+---------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- num_title: double (nullable = false)



In [8]:
#ALS (Alternating least squares) will be used 
from pyspark.ml.recommendation import ALS

In [9]:
# Split the data 70% / 30% into training and test sets.
# A fixed seed makes the split, and therefore the RMSE reported below,
# reproducible on Restart & Run All.
train_set, test_set = indexed_df.randomSplit([0.7, 0.3], seed=42)

In [12]:
# Create the ALS model.
# - userCol/itemCol/ratingCol map to the userId, indexed title and rating columns
# - nonnegative=True constrains the latent factors to be non-negative
# - coldStartStrategy='drop' drops rows whose prediction would be NaN
#   (users/films not seen during fit), so the RMSE below is not NaN
# - seed fixes the random factor initialisation so the model is reproducible
recommendation = ALS(maxIter=10, regParam=0.01, userCol='userId', ratingCol='rating', itemCol='num_title',
                     nonnegative=True, coldStartStrategy='drop', seed=42)

In [13]:
# Fit the ALS model on the training set only; the test set is held out for evaluation
rec_model = recommendation.fit(train_set)

23/09/11 16:25:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/09/11 16:25:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [14]:
# Score the held-out test set; this adds a `prediction` column next to the true `rating`
predictions = rec_model.transform(test_set)

predictions.show(5)

+------+--------------------+------+---------+----------+
|userId|               title|rating|num_title|prediction|
+------+--------------------+------+---------+----------+
|   148|Around the World ...|     4|    540.0| 2.7131686|
|   148|         Babe (1995)|     4|     96.0| 4.3552203|
|   148|  Being There (1979)|     5|    290.0| 4.5945396|
|   148|       Brazil (1985)|     4|    109.0| 5.3253975|
|   148|     Fantasia (1940)|     5|    153.0| 5.7669916|
+------+--------------------+------+---------+----------+
only showing top 5 rows



In [18]:
# Evaluate the model with RegressionEvaluator: RMSE between `rating` and `prediction`
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` (not `eval`) so the Python builtin eval() is not shadowed
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName='rmse')

# Apply the evaluation to the test-set predictions above
rmse = evaluator.evaluate(predictions)
print('RMSE value:',rmse)

RMSE value: 1.0425300215296913


In [28]:
# To build top-N recommendations for a user we first need every distinct film index
# (one row per num_title); unrated films are picked from this list below.
unique_film_list = indexed_df.select('num_title').dropDuplicates()
film_total = unique_film_list.count()
print('Total uniqued film', film_total)

Total uniqued film 1664


In [66]:
# Recommend the top-N films that a given user has not rated yet,
# ranked by the ALS predicted rating.
from pyspark.sql.functions import col, split, lit

def top_n_movies(userId, n):
    """Show and return the top ``n`` recommended films that ``userId`` has not rated.

    Candidates are all films in ``unique_film_list`` with no rating from
    ``userId`` in ``indexed_df``. Each candidate is scored with ``rec_model``,
    and the ``n`` highest predictions are kept. Indices are mapped back to
    titles with the labels of the fitted StringIndexer (``model``).

    Parameters
    ----------
    userId : int
        User to recommend for; cast with ``int()`` before being added as the
        ALS user column.
    n : int
        Number of films to return.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``title`` and ``prediction``, ordered by ``prediction``
        descending. The table is also printed untruncated via ``show``.
    """
    # Films this user has already rated
    watched_film = indexed_df.filter(col('userId') == userId).select('num_title')

    # left_anti keeps only films with no match in watched_film, i.e. the unrated
    # films. unique_film_list is already distinct, so no extra distinct() is needed.
    remaining_film = (
        unique_film_list
        .join(watched_film, on='num_title', how='left_anti')
        .withColumn('userId', lit(int(userId)))
    )

    # Score every unrated film for this user and keep the n best
    recommendations = (
        rec_model.transform(remaining_film)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # Add the title column back; `model.labels` comes from the StringIndexer fit above
    movie_title = IndexToString(inputCol='num_title', outputCol='title', labels=model.labels)
    final_recommendations = movie_title.transform(recommendations).select('title', 'prediction')

    # .show() returns None, so show the table and return the DataFrame itself
    final_recommendations.show(n, False)
    return final_recommendations
    
    

In [67]:
top_n_movies(userId=45, n=15);  # trailing ';' suppresses display of the return value

+-----------------------------------------------------------+----------+
|title                                                      |prediction|
+-----------------------------------------------------------+----------+
|Paradise Lost: The Child Murders at Robin Hood Hills (1996)|7.8412967 |
|City of Industry (1997)                                    |6.974758  |
|Vanya on 42nd Street (1994)                                |6.3698115 |
|War Room, The (1993)                                       |6.253435  |
|Angel Baby (1995)                                          |6.174093  |
|Paths of Glory (1957)                                      |6.028388  |
|Dangerous Beauty (1998)                                    |5.9814606 |
|Widows' Peak (1994)                                        |5.9168186 |
|Persuasion (1995)                                          |5.8225775 |
|Jane Eyre (1996)                                           |5.8200393 |
|Pather Panchali (1955)                            