In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_file_paths = (
    os.path.join(folder, file_name)
    for folder, _subfolders, file_names in os.walk('/kaggle/input')
    for file_name in file_names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/movierating/movie_ratings_df.csv


In [2]:
!pip install pyspark 
!pip install sklearn

[0m

In [3]:
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("movie_recommender")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/16 13:28:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Read the ratings CSV from the dataset folder: the first row is the header and
# the column types are inferred from the data.
df = spark.read.csv(
    '/kaggle/input/movierating',
    header=True,
    inferSchema=True,
)

# A bare `df.limit(3)` would only display the schema; toPandas() renders the rows as a table.
df.limit(3).toPandas()


                                                                                

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5


Examining the features in the data-set

In [6]:
# Print the column names and the types Spark inferred when reading the CSV.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



`userId` and `rating` are already integers, but `title` is a string, so we need to convert it into a numeric index that the ML algorithm can use as the item ID.

In [7]:
from pyspark.ml.feature import StringIndexer, IndexToString

In [10]:
# Encode every movie title as a numeric index in a new `title_new` column.
stringIndexer = StringIndexer(
    inputCol="title",
    outputCol="title_new",
)
model = stringIndexer.fit(df)  # fitted indexer; its `labels` are used later to map indices back to titles
indexed = model.transform(df)  # original columns plus `title_new`

indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [13]:
# Use alternating least squares (ALS), a matrix-factorisation method, to predict ratings.
from pyspark.ml.recommendation import ALS

# One fixed seed for both the train/test split and the ALS initialisation, so the
# model, the RMSE and the recommendations do not change from run to run.
SEED = 42

# Hold out 25% of the ratings for evaluation.
train, test = indexed.randomSplit([0.75, 0.25], seed=SEED)

rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    # Drop rows whose prediction would be NaN (user or movie absent from the
    # training split) so they do not poison the evaluation metric.
    coldStartStrategy="drop",
    seed=SEED,
)

rec_model = rec.fit(train)

# Predict ratings for the held-out rows and preview a few of them.
predicted_model = rec_model.transform(test)
predicted_model.limit(5).toPandas()

23/04/16 13:39:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,Around the World in 80 Days (1956),4,540.0,1.958437
1,148,Beauty and the Beast (1991),4,114.0,3.639122
2,148,Being There (1979),5,290.0,3.557612
3,148,Cinderella (1950),3,243.0,3.276735
4,148,Fantasia (1940),5,153.0,4.773097


## Evaluating Trained Model

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

In [17]:
# The regression evaluator calculates the RMSE (root mean-squared error) between
# the predicted and the actual ratings; lower is better.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol='rating',
    predictionCol='prediction',
)

rmse = evaluator.evaluate(predicted_model)
print(rmse)

                                                                                

1.0294612729134853


### Predicting what User Might Like

Create the function `top_movies`, which recommends to a given `userId` the movies they have not rated yet that the model predicts they would rate highest.

In [21]:
# One row per distinct movie index found in the dataset.
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)

def top_movies(user_id, n):
    """Print the top ``n`` predicted movies for ``user_id`` among those they have not rated.

    Relies on notebook-level objects created in earlier cells: ``indexed``
    (ratings with the numeric ``title_new`` column), ``unique_movies`` (distinct
    ``title_new`` values), ``rec_model`` (fitted ALS model) and ``model``
    (fitted StringIndexer, used to map indices back to titles).

    Args:
        user_id: Id of the user to recommend for; converted with ``int()``.
        n: Maximum number of recommendations to display.

    Returns:
        None. The recommendations are printed via ``DataFrame.show``.
    """
    # Normalise once so the filter and the literal column use the same value.
    user_id = int(user_id)

    # Movies this user has already rated.
    watched_movies = indexed.filter(indexed["userId"] == user_id).select("title_new")

    # Anti-join: keep only the movies with no matching row among the watched ones.
    # `unique_movies` is already distinct, so no further de-duplication is needed.
    unwatched_movies = unique_movies.join(watched_movies, on="title_new", how="left_anti")

    # The model needs a (userId, title_new) pair for every candidate movie.
    candidates = unwatched_movies.withColumn("userId", lit(user_id))

    # Score every candidate, then keep the n highest predictions.
    recommendations = (
        rec_model.transform(candidates)
        .orderBy("prediction", ascending=False)
        .limit(n)
    )

    # Convert the numeric movie index back to the original title string.
    movie_title = IndexToString(
        inputCol="title_new", outputCol="title", labels=model.labels
    )
    final_recommendations = movie_title.transform(recommendations)

    # show() prints the table (untruncated) and returns None.
    final_recommendations.show(n, False)

In [23]:
# Show the 4 highest-predicted movies that user 60 has not rated yet.
top_movies(60, 4)

                                                                                

+---------+------+----------+----------------------+
|title_new|userId|prediction|title                 |
+---------+------+----------+----------------------+
|1347.0   |60    |6.6166396 |Angel Baby (1995)     |
|1411.0   |60    |5.980855  |Boys, Les (1997)      |
|1277.0   |60    |5.9031873 |Mina Tannenbaum (1994)|
|1057.0   |60    |5.8961306 |Safe (1995)           |
+---------+------+----------+----------------------+

