In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer,IndexToString,VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Building the Spark Session
# Befire 2.0.0, the main connection objects were 'SaprkContext,SqlContext, and HiveContext'

spark = SparkSession \
        .builder \
        .appName("Movie_Recommendations") \
        .config('spark.some.config.option','some-value') \
        .getOrCreate()

# here, 'spark' is an object of SparkSession, which has the 'SparkContext' object and can be accessed directly
sc = spark.sparkContext
print(sc.version)

2.0.2


In [3]:
print(sc.defaultParallelism) # number of default number of partitions
print (sc.getConf().toDebugString()) # details about the spark configuration

4
hive.metastore.warehouse.dir=file:/home/ramscrux7757/spark-warehouse
spark.app.id=local-1521775083021
spark.app.name=Movie_Recommendations
spark.driver.host=172.31.60.179
spark.driver.port=50551
spark.executor.id=driver
spark.files=file:/home/ramscrux7757/.ivy2/jars/com.databricks_spark-xml_2.11-0.4.1.jar,file:/home/ramscrux7757/.ivy2/jars/com.databricks_spark-csv_2.11-1.5.0.jar,file:/home/ramscrux7757/.ivy2/jars/com.databricks_spark-avro_2.11-3.2.0.jar,file:/home/ramscrux7757/.ivy2/jars/graphframes_graphframes-0.5.0-spark2.0-s_2.11.jar,file:/home/ramscrux7757/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,file:/home/ramscrux7757/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar,file:/home/ramscrux7757/.ivy2/jars/org.apache.avro_avro-1.7.6.jar,file:/home/ramscrux7757/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar,file:/home/ramscrux7757/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar,file:/home/ramscrux7757/.ivy2/jars/com.thoughtworks.paranamer_parana

In [4]:
movies_data = spark.read \
    .format('com.databricks.spark.csv') \
    .option('header','true') \
    .option('inferSchema', 'true') \
    .load('/home/ramscrux7757/SPARK/ml-latest-small/movies.csv')
    
ratings_data = spark.read \
    .format('com.databricks.spark.csv') \
    .option('header','true') \
    .option('inferSchema', 'true') \
    .load('/home/ramscrux7757/SPARK/ml-latest-small/ratings.csv')

print(movies_data.count(), len(movies_data.columns))
print(ratings_data.count(), len(ratings_data.columns))

(9125, 3)
(100004, 4)


In [5]:
print(movies_data.rdd.getNumPartitions())
print(ratings_data.rdd.getNumPartitions())

1
1


In [None]:
# despite the default parallelism, the data has been placed within 1 executor

In [6]:
# Repartitioning the data
# here, the data has been distributed across '4' executors
repart_movies = movies_data.repartition(4)
repart_ratings = ratings_data.repartition(4)
print(repart_movies.rdd.getNumPartitions())
print(repart_ratings.rdd.getNumPartitions())

4
4


In [7]:
#movies_data.cache()
print(movies_data.printSchema())
print(ratings_data.printSchema())

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

None
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

None


In [8]:
movies_data.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [9]:
ratings_data.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [10]:
# easy to switch between spark (distributed or single) to native Pandas DF
#ratings_data.toPandas().head()
pd.DataFrame(repart_ratings.take(4), columns = repart_ratings.columns)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1029,3.0,1260759179
1,1,1263,2.0,1260759151
2,1,1343,2.0,1260759131
3,1,2105,4.0,1260759139


In [11]:
# Cache - both the repartitioned movies and ratings dataframes 
repart_movies.cache()
repart_ratings.cache()

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [12]:
repart_ratings.describe().show()
# usually, not all these are meaninngful and useful

+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|           100004|            100004|            100004|              100004|
|   mean|347.0113095476181|12548.664363425463| 3.543608255669773|1.1296390869392424E9|
| stddev|195.1638379781962|26369.198968815206|1.0580641091070389|1.9168582602710965E8|
|    min|                1|                 1|               0.5|           789652009|
|    max|              671|            163949|               5.0|          1476640644|
+-------+-----------------+------------------+------------------+--------------------+



In [13]:
# unique user ID's 
print('Total Users:', repart_ratings.select(repart_ratings.userId).distinct().count())
# unique movie ID's
print('Total movies:', repart_ratings.select(repart_ratings.movieId).distinct().count())
# number of movies with at least one rating higher than 4
print('Total most popular Movies:', repart_ratings.filter(repart_ratings.rating > 4).select(repart_ratings.movieId).distinct().count())
# number of different genres
print('Count of different genres:', repart_movies.select(repart_movies.genres).distinct().count())

('Total Users:', 671)
('Total movies:', 9066)
('Total most popular Movies:', 4035)
('Count of different genres:', 902)


In [14]:
# SQL way of querying
repart_ratings.createOrReplaceTempView('ratings')
spark.sql("SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4").show()

+----+
|  nb|
+----+
|4035|
+----+



In [65]:
# Average rating of each movie  -- this lets us to pick the highly rated movies aswell !!!
repart_ratings.groupby('movieId').agg({'rating':'mean'}) \
                .join(repart_movies, repart_ratings.movieId == repart_movies.movieId, 'inner') \
                .drop(repart_movies.movieId) \
                .show()

+-------+------------------+--------------------+--------------------+
|movieId|       avg(rating)|               title|              genres|
+-------+------------------+--------------------+--------------------+
|   3175|3.5076923076923077| Galaxy Quest (1999)|Adventure|Comedy|...|
|  96488|              3.75|Searching for Sug...|         Documentary|
|   1580| 3.663157894736842|Men in Black (a.k...|Action|Comedy|Sci-Fi|
|   7982|3.1666666666666665|Tale of Two Siste...|Drama|Horror|Myst...|
|   1088| 3.358490566037736|Dirty Dancing (1987)|Drama|Musical|Rom...|
|   1238| 4.147058823529412|   Local Hero (1983)|              Comedy|
|   1342|3.0588235294117645|     Candyman (1992)|     Horror|Thriller|
|   6620|3.6470588235294117|American Splendor...|        Comedy|Drama|
|   1645|3.4583333333333335|The Devil's Advoc...|Drama|Mystery|Thr...|
|   3794|               3.4| Chuck & Buck (2000)|        Comedy|Drama|
|   1959|               3.8|Out of Africa (1985)|       Drama|Romance|
|   21

In [15]:
# Checking for the missing values
from pyspark.sql.functions import isnan, when, count, col
repart_ratings.select([count(when(isnan(c),c)).alias(c) for c in repart_ratings.columns]).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [16]:
repart_movies.select([count(when(isnan(c),c)).alias(c) for c in repart_movies.columns]).show()

+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+



In [17]:
# identifying the linar relation between usedId's and movieId's
import seaborn as sns
%matplotlib inline

#ratingsPandas = repart_ratings.toPandas()
#lm = sns.lmplot(x='userId', y='movieId', data=ratingsPandas, fit_reg=False, size=10, 
#                aspect=2, palette=sns.diverging_palette(10, 133, sep=80, n=10))

#axes = lm.axes
#axes[0,0].set_ylim(0,163950)
#axes[0,0].set_xlim(0,675)
#lm

In [18]:
# Building the recommender system
# 3 types of algorithms: user-based, content-based and collaborative filtering
# here, we are using 'collaborative filtering' approach
# alternating least squares (ALS) algorithm provides collaborative filetering between users and products (moves here)

# chekcing the sparcity of the user/product(movies) matrix
spark.sql("""
    SELECT *, 100 * nb_ratings/matrix_size AS percentage
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
""").show()

+--------+---------+----------+-----------+------------------+
|nb_users|nb_movies|nb_ratings|matrix_size|        percentage|
+--------+---------+----------+-----------+------------------+
|     671|     9066|    100004|    6083286|1.6439141608663477|
+--------+---------+----------+-----------+------------------+



In [None]:
# less than 2% of the matrix is filled

In [19]:
# splitting the data
(trainingRatings, testRatings) = repart_ratings.randomSplit([0.8, 0.2])

In [20]:
# Training the model

from pyspark.ml.recommendation import ALS

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)

In [21]:
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   380|    463|   3.0| 968949106| 2.9540255|
|   602|    471|   3.0| 842357922| 4.0983095|
|   274|    471|   5.0|1074104142|  3.724506|
|   440|    471|   3.0| 835337519| 3.4512959|
|   292|    471|   3.5|1140049920| 3.9857833|
|   452|    471|   3.0| 976422396| 3.3047695|
|    19|    471|   3.0| 855192558|  3.776581|
|   309|    471|   4.0|1114565458| 4.0564013|
|    15|    471|   3.0|1166586067|  2.988698|
|   659|    471|   4.0| 853412972| 3.5642576|
|   102|    471|   5.0| 958248997| 4.3238254|
|    73|    471|   4.0|1296460183| 3.8675725|
|   508|    471|   4.0| 844377075|  4.092397|
|   242|    471|   5.0| 956686752| 4.5214157|
|   468|    471|   4.0|1296197444| 3.4027889|
|   497|    496|   2.0| 939767844| 2.7011259|
|   294|    833|   2.0|1047074195| 2.5256567|
|   128|   1088|   5.0|1049690378| 4.9113264|
|   111|   1088|   3.5|1097431651|

In [22]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
evaluator.evaluate(predictions)

nan

In [23]:
predictions.select([count(when(isnan(c),c)).alias(c) for c in predictions.columns]).show()

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     0|      0|     0|        0|       708|
+------+-------+------+---------+----------+



In [24]:
# replacing the Nan's in prediction columns with 'average ratings'

avgRatings = repart_ratings.select('rating').groupBy().avg().first()[0]
print "The average rating in the dataset is: " + str(avgRatings)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

print('By dropping the NaN records:', evaluator.evaluate(predictions.na.drop()))
print('By replacing the NaN\'s with avg:', evaluator.evaluate(predictions.na.fill(avgRatings)))

The average rating in the dataset is: 3.54360825567
('By dropping the NaN records:', 0.9166879407205767)
("By replacing the NaN's with avg:", 0.9284420923478783)


In [32]:
# k-Fold cross-validation

#def kfoldALS(data, k=3, userCol="userId", itemCol="movieId", ratingCol="rating", metricName="rmse"):
#    evaluations = []
#    weights = [1.0] * k
#    splits = data.randomSplit(weights)
#    for i in range(0, k):  
#        testingSet = splits[i]
#        trainingSet = spark.createDataFrame(sc.emptyRDD(), data.schema)
#        for j in range(0, k):
#            if i == j:
#                continue
#            else:
#                trainingSet = trainingSet.union(splits[j])
#        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
#        model = als.fit(trainingSet)
#        predictions = model.transform(testingSet)
#        evaluator = RegressionEvaluator(metricName=metricName, labelCol="rating", predictionCol="prediction")
#        evaluation = evaluator.evaluate(predictions.na.drop())
#        print "Loop " + str(i+1) + ": " + metricName + " = " + str(evaluation)
#        evaluations.append(evaluation)
#    return sum(evaluations)/float(len(evaluations))

In [29]:
#print('RMSE = ', kfoldALS(repart_ratings, k=4))

In [25]:
print(als.explainParams())

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
maxIter: max number of iterations (>= 0). (default: 10)
nonnegative: whether to use nonnegative constraint for least squares (default: False)
numItemBlocks: number of item blocks (default: 10)
numUserBlocks: number of user blocks (default: 10)
predictionCol: prediction column name. (default: prediction)
rank: rank of the factorization (default: 10)
ratingCol: column name for ratings (default: rating, curren

In [26]:
# Parameter tuning
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#(trainingRatings, validationRatings) = repart_ratings.randomSplit([0.8, 0.2])
#evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

paramGrid = ParamGridBuilder() \
        .addGrid(als.rank, [1, 5, 10]) \
        .addGrid(als.maxIter, [10]) \
        .addGrid(als.regParam, [0.05, 0.1, 0.5]) \
        .build()

cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(trainingRatings)
predictions = cvModel.transform(testRatings)

print "The root mean squared error for cross-validated model is: " + str(evaluator.evaluate(predictions.na.fill(avgRatings)))

The root mean squared error for cross-validated model is: 0.980104521746


In [47]:
# Recommended movies for the new users
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = repart_ratings.select("movieId").distinct().withColumn("userId", lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = repart_ratings.filter(repart_ratings.userId == user).select("movieId", "userId")

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)) \
                        .dropna().orderBy("prediction", ascending=False) \
                        .limit(nbRecommendations).select("movieId", "prediction")

    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(repart_movies, predictions.movieId == repart_movies.movieId) \
                                .select(predictions.movieId, repart_movies.title, repart_movies.genres, \
                                        predictions.prediction)

    recommendations.show(truncate=False)
    #print(dataSet.show())
    #print(moviesAlreadyRated.show())
    #print(predictions.show())

In [48]:
print "Recommendations for user 133:"
recommendMovies(cvModel, 133, 10)
#print "Recommendations for user 471:"
#recommendMovies(cvModel, 471, 10)
#print "Recommendations for user 496:"
#recommendMovies(cvModel, 496, 10)

Recommendations for user 133:
+-------+--------------------------------------+-----------------------------+----------+
|movieId|title                                 |genres                       |prediction|
+-------+--------------------------------------+-----------------------------+----------+
|4930   |Funeral in Berlin (1966)              |Action|Drama|Thriller        |6.7820787 |
|4796   |Grass Is Greener, The (1960)          |Comedy|Romance               |6.7820787 |
|4591   |Erik the Viking (1989)                |Adventure|Comedy|Fantasy     |6.7820787 |
|3892   |Anatomy (Anatomie) (2000)             |Horror                       |6.7820787 |
|1563   |Dream With the Fishes (1997)          |Drama                        |6.7820787 |
|1819   |Storefront Hitchcock (1997)           |Documentary|Musical          |6.7820787 |
|3216   |Vampyros Lesbos (Vampiras, Las) (1971)|Fantasy|Horror|Thriller      |4.049766  |
|97957  |Excision (2012)                       |Crime|Drama|Horror|Thr