In [1]:
"""
Collaborative Filtering ALS Recommender System using Spark MLlib adapted from
the Spark Summit 2014 Recommender System training example.

Developed By: Pranav Masariya
Supervisor: Dr. Magdalini Eirinaki
"""

import os
import numpy as np
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS as mlals
from pyspark.ml.evaluation import RegressionEvaluator

import math
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Calling spark session to register application
spark = SparkSession \
    .builder \
    .appName("Recom") \
    .config("spark.recom.demo", "1") \
    .getOrCreate()

In [3]:
"""
Loading and Parsing Dataset
    Each line in the ratings dataset (ratings.csv) is formatted as:
         userId,movieId,rating,timestamp
    Each line in the movies (movies.csv) dataset is formatted as:
        movieId,title,genres

""" 

# Load ratings
ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("ratings.csv")

In [4]:
ratings_df

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [5]:
"""
For the simplicity of this tutorial
    For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). 
    We drop the timestamp because we do not need it for this recommender.
"""

#ratings_df = ratings_df.drop('timestamp')
ratings_df.show(30)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
| 12882|      1|   4.0|1147195252|
| 12882|     32|   3.5|1147195307|
| 12882|     47|   5.0|1147195343|
| 12882|     50|   5.0|1147185499|
| 12882|    110|   4.5|1147195239|
| 12882|    150|   3.5|1147195267|
| 12882|    158|   2.0|1147185180|
| 12882|    165|   4.0|1147195325|
| 12882|    260|   4.0|1147195260|
| 12882|    296|   5.0|1147195153|
| 12882|    318|   5.0|1147195162|
| 12882|    356|   5.0|1147185487|
| 12882|    364|   3.5|1147195899|
| 12882|    380|   2.5|1147195276|
| 12882|    457|   4.0|1147195271|
| 12882|    480|   3.5|1147185483|
| 12882|    515|   3.5|1147185231|
| 12882|    527|   4.0|1147195296|
| 12882|    552|   2.5|1147185190|
| 12882|    588|   3.0|1147195313|
| 12882|    589|   2.5|1147185399|
| 12882|    590|   3.0|1147195262|
| 12882|    592|   3.5|1147195242|
| 12882|    593|   4.5|1147195450|
| 12882|    648|   4.0|1147195332|
| 12882|    780|   0

In [6]:
# Load movies
movies_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("movies.csv")

In [7]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [8]:
"""
For each line in the movies dataset, we create a tuple of (MovieID, Title). 
    We drop the genres because we do not use them for this recommender.
"""
movies_df = movies_df.drop('genres')
movies_df.show(5)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows



In [9]:
"""
In order to determine the best ALS parameters, we will use the small dataset. 
We need first to split it into train, validation, and test datasets.
"""
(trainingData,validationData,testData) = ratings_df.randomSplit([0.6,0.2,0.2])

In [10]:
# Prepare test and validation set. They should not have ratings

validation_for_predict = validationData.select('userId','movieId')
test_for_predict = testData.select('userId','movieId')

In [11]:
"""
Spark MLlib library for Machine Learning provides a Collaborative Filtering implementation by 
using Alternating Least Squares. The implementation in MLlib has the following parameters:

    1. numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
    2. rank is the number of latent factors in the model.
    3. iterations is the number of iterations to run.
    4. lambda specifies the regularization parameter in ALS.
    5. implicitPrefs specifies whether to use the explicit 
        feedback ALS variant or one adapted for implicit feedback data.
    6. alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline 
        confidence in preference observations.

"""

seed = 5 #Random seed for initial matrix factorization model. A value of None will use system time as the seed.
iterations = 10
regularization_parameter = 0.1 #run for different lambdas - e.g. 0.01
ranks = [4, 8, 12] #number of features
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

In [12]:
#import os

#os.environ["SPARK_HOME"] = "/home/neil/Desktop/spark-2.2.0-bin-hadoop2.7/"
#os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

In [13]:
# Let us traing our dataset and check the best rank with lowest RMSE
# predictAll method of the ALS takes only RDD format and hence we need to convert our dataframe into RDD
# df.rdd will automatically converts Dataframe into RDD

for rank in ranks:
    model = ALS.train(trainingData, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validationData.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()) # RMSE Error
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
"""
Spark will soon deprecate MLLIb package. 
They are focusing more on ML packages with standard machine learning implementation
Let's see that package also
"""
als =  mlals(maxIter=iterations,rank=4,seed=seed,regParam=regularization_parameter, userCol="userId", itemCol="movieId",ratingCol="rating")
modelML = als.fit(trainingData)
pred = modelML.transform(validationData)
pred = pred.where(pred['prediction'] != 'NaN')
    
# Evaluate the model by computing RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(pred)

print 'RMSE is %s' % rmse

"""
The best part is we do not have to worry about RDD any more with this library
"""

In [21]:
# Let's take test dataset and get ratings
predictions_test = model.predictAll(test_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))

In [22]:
## visualize preditions, here third element is predictions generated by ALS Model
predictions_test.take(3)

[((292, 320), 4.163789684707593),
 ((13, 320), 2.4530965672566305),
 ((118, 320), 4.9492797323585265)]

In [23]:
"""
Let's start recommending movies.
I have written a method to call recommendations for a perticular user from test data

TODO: You need to execute one more step before calling getRecommendations, 
      Think about that step. If you go through the seps below, you will realize it soon.
"""
def getRecommendations(user,testDf,trainDf,model):
    # get all user and his/her rated movies
    userDf = testDf.filter(testDf.userId == user)
    # filter movies from main set which have not been rated by selected user
    # and pass it to model we sreated above
    mov = trainDf.select('movieId').subtract(userDf.select('movieId'))
    
    # Again we need to covert our dataframe into RDD
    pred_rat = model.predictAll(mov.rdd.map(lambda x: (user, x[0]))).collect()
    
    # Get the top recommendations
    recommendations = sorted(pred_rat, key=lambda x: x[2], reverse=True)[:50]
    
    return recommendations

In [24]:
# Assign user id for which we need recommendations
user = 336

# Call getRecommendations method
derived_rec = getRecommendations(user,testData,trainingData,model)

print ("Movies recommended for:%d" % user)



Movies recommended for:336


In [27]:
derived_rec


[Rating(user=336, product=1368, rating=5.1981050690456625),
 Rating(user=336, product=958, rating=4.814280433278696),
 Rating(user=336, product=272, rating=4.709120785820507),
 Rating(user=336, product=1282, rating=4.687403767642065),
 Rating(user=336, product=315, rating=4.654853148942708),
 Rating(user=336, product=64, rating=4.611159440477497),
 Rating(user=336, product=192, rating=4.57882490756566),
 Rating(user=336, product=316, rating=4.555875752573431),
 Rating(user=336, product=357, rating=4.534705183664028),
 Rating(user=336, product=1643, rating=4.524863260473367),
 Rating(user=336, product=127, rating=4.521642274576629),
 Rating(user=336, product=1639, rating=4.497326809163149),
 Rating(user=336, product=954, rating=4.4886930781352214),
 Rating(user=336, product=1512, rating=4.4623095886606015),
 Rating(user=336, product=318, rating=4.456360167609304),
 Rating(user=336, product=114, rating=4.454167268584019),
 Rating(user=336, product=302, rating=4.430053560990084),
 Rating(

In [None]:
# Print the result
# TODO: we can convert derived_rec into a dataframe to present it properly
for i in xrange(len(derived_rec)):
    print i+1
    movies_df.filter(movies_df.movieId==derived_rec[i][1]).select('title').show()