In [None]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 44.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=bce199c69675a95fd3357cadf7edcbcc83d09c

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import time
import findspark
import pyspark
from pyspark.mllib.recommendation import ALS


In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# getOrCreate() hands back the already-running session when this cell is re-run,
# instead of failing because only one SparkContext may be active per process.
spark = SparkSession.builder.master('local').getOrCreate()
# Kept for code that expects a SparkContext handle.
sc = spark.sparkContext

In [None]:
# Load the four CSV files, taking column names from each header row. No schema is
# inferred, so every column is read as a string and is cast explicitly further down.
movies, ratings, links, tags = [
    spark.read.load(path, format='csv', header=True)
    for path in ("movies.csv", "ratings.csv", "links.csv", "tags.csv")
]

In [None]:
# Preview the first five rows of the movies table.
movies.show(n=5)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Preview the first five rows of the ratings table.
ratings.show(n=5)


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
# Preview the first five rows of the links table.
links.show(n=5)


+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [None]:
# Preview the first five rows of the tags table.
tags.show(n=5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [None]:
# `genres` holds pipe-separated genre lists, so this counts each distinct
# combination (e.g. "Comedy|Drama"), not individual genres. Most frequent first.
movies.groupBy("genres").count().sort("count", ascending=False).show()

+--------------------+-----+
|              genres|count|
+--------------------+-----+
|               Drama| 1053|
|              Comedy|  946|
|        Comedy|Drama|  435|
|      Comedy|Romance|  363|
|       Drama|Romance|  349|
|         Documentary|  339|
|Comedy|Drama|Romance|  276|
|      Drama|Thriller|  168|
|              Horror|  167|
|     Horror|Thriller|  135|
|         Crime|Drama|  134|
|Crime|Drama|Thriller|  125|
|           Drama|War|  114|
|        Comedy|Crime|  101|
|       Action|Comedy|   92|
|            Thriller|   84|
|     Children|Comedy|   74|
|       Comedy|Horror|   69|
|Action|Adventure|...|   66|
|Action|Crime|Thri...|   66|
+--------------------+-----+
only showing top 20 rows



In [None]:
# `rating` is still a string column at this point (the CSV was read without
# schema inference). Sort with key=float so the values are ordered numerically
# rather than lexicographically. The printed list still shows the string values.
print('Distinct values of ratings:')
distinct_ratings = [row['rating'] for row in ratings.select('rating').distinct().collect()]
print(sorted(distinct_ratings, key=float))

Distinct values of ratings:
['0.5', '1.0', '1.5', '2.0', '2.5', '3.0', '3.5', '4.0', '4.5', '5.0']


In [None]:
# Smallest number of ratings given by any one user / received by any one movie.
# Column names match the schema casing exactly ("userId"). Other casings only
# resolve while spark.sql.caseSensitive is false.
min_ratings_per_user = ratings.groupBy("userId").count().agg({"count": "min"}).first()[0]
min_ratings_per_movie = ratings.groupBy("movieId").count().agg({"count": "min"}).first()[0]
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(min_ratings_per_user))
print('Minimum number of ratings per movie is {}'.format(min_ratings_per_movie))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [None]:
# Count movies that received exactly one rating. Filtering on the per-movie count
# gives 0 when no such movie exists, rather than reporting the size of whichever
# rating-count bucket happens to be smallest.
movies_rated_once = (
    ratings.groupBy("movieId").count()
    .withColumnRenamed("count", "n_ratings")
    .where("n_ratings = 1")
    .count()
)
n_rated_movies = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(movies_rated_once, n_rated_movies))

3446 out of 9724 movies are rated by only one user


In [None]:
# Users appearing in either the ratings or the tags file, deduplicated.
print("Number of users:",
      ratings.select('userId').unionByName(tags.select('userId')).dropDuplicates().count())

Number of users: 610


In [None]:
# Distinct users that appear in the ratings file.
print("Number of users who rated movies:",
      ratings.select('userId').dropDuplicates().count())

Number of users who rated movies: 610


In [None]:
# Movies appearing in either the ratings or the tags file, deduplicated.
print("Number of movies:",
      ratings.select('movieId').unionByName(tags.select('movieId')).dropDuplicates().count())

Number of movies: 9742


In [None]:
# Distinct movies that appear in the ratings file.
print("Number of rated movies:",
      ratings.select('movieId').dropDuplicates().count())

Number of rated movies: 9724


In [None]:
# Keep only the user, movie and rating columns used by the models below.
ratings = ratings.select(["userId", "movieId", "rating"])

In [None]:
# Inspect column types before casting: the CSV was loaded without a schema,
# so the columns are still strings here.
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)



In [None]:
# Cast the string columns to numeric types: integer user/movie ids and float ratings.
# withColumn replaces each column in place, so column order is unchanged.
df = (
    ratings
    .withColumn('userId', ratings.userId.cast('int'))
    .withColumn('movieId', ratings.movieId.cast('int'))
    .withColumn('rating', ratings.rating.cast('float'))
)

In [None]:
# Confirm the casts: userId/movieId should now be integer and rating float.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
# 60/20/20 train/validation/test split. The fixed seed makes the split repeatable.
train, validation, test = df.randomSplit(weights=[0.6, 0.2, 0.2], seed=0)


In [None]:
# Row counts of the train, validation and test splits, in that order.
print("The number of ratings in each set: {}, {}, {}".format(
    *(split.count() for split in (train, validation, test))))

The number of ratings in each set: 60435, 20052, 20349


In [None]:
# Baseline model: predict each movie's mean rating in the training split.
# Aggregate only `rating`. GroupedData.mean() with no arguments would also
# average every other numeric column (userId), which is discarded anyway.
mean_rating = (
    train.groupBy('movieId')
    .avg('rating')
    .withColumnRenamed('avg(rating)', 'prediction')
)
mean_rating.show(5)

+-------+------------------+
|movieId|        prediction|
+-------+------------------+
|   1580|3.4257425742574257|
|   3175|3.5869565217391304|
|   1088|              3.34|
|  32460| 4.333333333333333|
|  44022|               3.0|
+-------+------------------+
only showing top 5 rows



In [None]:
# Temp views stay registered so the data can still be queried with spark.sql.
test.createOrReplaceTempView("test")
mean_rating.createOrReplaceTempView("mean_rating")

# Attach each movie's baseline prediction to the test ratings: all test columns
# plus `prediction`. This is an inner join, so test rows whose movie never occurs
# in `train` get no prediction and are dropped. The baseline RMSE therefore
# covers only movies seen in training.
# Joining on the column name (USING join) avoids ambiguous references, since
# test and mean_rating both derive from the same DataFrame.
sqlDF = (
    test.join(mean_rating, on='movieId', how='inner')
    .select(*test.columns, 'prediction')
)
sqlDF.show(5)

+------+-------+------+------------------+
|userId|movieId|rating|        prediction|
+------+-------+------+------------------+
|   607|   1580|   3.0|3.4257425742574257|
|   599|   1580|   3.0|3.4257425742574257|
|   580|   1580|   4.0|3.4257425742574257|
|   579|   1580|   4.0|3.4257425742574257|
|   559|   1580|   5.0|3.4257425742574257|
+------+-------+------+------------------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# Shared scorer: root-mean-squared error of the `prediction` column
# against the `rating` label column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)


def RMSE(predictions):
    """Return the RMSE of a predictions DataFrame.

    ``predictions`` must contain a ``rating`` column (ground truth) and a
    ``prediction`` column. Scoring is delegated to the module-level ``evaluator``.
    """
    score = evaluator.evaluate(predictions)
    return score

In [None]:
# Test RMSE of the per-movie mean-rating baseline.
baseline_test_rmse = RMSE(sqlDF)
print('Using the mean rating of each movie as the prediction, the testing RMSE is ' + str(baseline_test_rmse))


Using the mean rating of each movie as the prediction, the testing RMSE is 0.9738656835815092


In [None]:
from pyspark.ml.recommendation import ALS

def GridSearch(train, valid, num_iterations, reg_param, n_factors):
    """Fit one ALS model per (rank, regParam) pair and keep the best on ``valid``.

    Every candidate's validation RMSE is printed. After the search, the chosen
    model's training RMSE is printed next to its validation RMSE.

    Args:
        train: DataFrame with userId, movieId and rating columns used for fitting.
        valid: DataFrame with the same columns, used to score each candidate.
        num_iterations: ``maxIter`` passed to every ALS model.
        reg_param: iterable of ``regParam`` values to try (inner loop).
        n_factors: iterable of ``rank`` values (number of latent factors) to try
            (outer loop).

    Returns:
        The fitted ALS model with the lowest validation RMSE.

    Raises:
        ValueError: if no candidate was evaluated (an empty grid) or every
            validation RMSE was NaN, so there is no best model to return.
    """
    min_rmse = float('inf')
    best_n = -1
    best_reg = 0
    best_model = None
    for n in n_factors:
        for reg in reg_param:
            # coldStartStrategy="drop" discards validation rows whose user or movie
            # does not appear in `train`, instead of emitting NaN predictions.
            als = ALS(rank=n,
                      maxIter=num_iterations,
                      seed=0,
                      regParam=reg,
                      userCol="userId",
                      itemCol="movieId",
                      ratingCol="rating",
                      coldStartStrategy="drop")
            model = als.fit(train)
            predictions = model.transform(valid)
            rmse = RMSE(predictions)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(n, reg, rmse))
            # A NaN RMSE never compares less than anything, so it is never selected.
            if rmse < min_rmse:
                min_rmse = rmse
                best_n = n
                best_reg = reg
                best_model = model

    if best_model is None:
        raise ValueError('GridSearch found no usable model: the parameter grid is empty '
                         'or every validation RMSE was NaN')

    pred = best_model.transform(train)
    train_rmse = RMSE(pred)
    print('\nThe best model has {} latent factors and regularization = {}:'.format(best_n, best_reg))
    print('training RMSE is {}; validation RMSE is {}'.format(train_rmse, min_rmse))
    return best_model

In [None]:
# Coarse search: 4 ranks x 5 regParam values = 20 ALS fits.
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

# perf_counter() is monotonic and intended for measuring elapsed time.
# time.time() can jump if the system clock is adjusted mid-run.
start_time = time.perf_counter()
final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)
print('Total Runtime: {:.2f} seconds'.format(time.perf_counter() - start_time))

6 latent factors and regularization = 0.05: validation RMSE is 0.9774929307576413
6 latent factors and regularization = 0.1: validation RMSE is 0.9129091171232059
6 latent factors and regularization = 0.2: validation RMSE is 0.8951553364405104
6 latent factors and regularization = 0.4: validation RMSE is 0.9694803184332064
6 latent factors and regularization = 0.8: validation RMSE is 1.1934058861934433
8 latent factors and regularization = 0.05: validation RMSE is 0.9911454460293756
8 latent factors and regularization = 0.1: validation RMSE is 0.9168968721788989
8 latent factors and regularization = 0.2: validation RMSE is 0.8984989583931352
8 latent factors and regularization = 0.4: validation RMSE is 0.9702570888476013
8 latent factors and regularization = 0.8: validation RMSE is 1.1934001728703625
10 latent factors and regularization = 0.05: validation RMSE is 0.9978579793437548
10 latent factors and regularization = 0.1: validation RMSE is 0.9176672176926329
10 latent factors and r

In [None]:

# Second, narrower pass with more ALS iterations, centred on the coarse-search
# optimum (regParam around 0.2).
num_iterations = 15
ranks = [7, 8, 9]
reg_params = [0.1, 0.2, 0.3]

final_model = GridSearch(train, validation, num_iterations,
                         reg_param=reg_params, n_factors=ranks)

7 latent factors and regularization = 0.1: validation RMSE is 0.9171975613567398
7 latent factors and regularization = 0.2: validation RMSE is 0.8987595837337011
7 latent factors and regularization = 0.3: validation RMSE is 0.926549261122086
8 latent factors and regularization = 0.1: validation RMSE is 0.9166986190332322
8 latent factors and regularization = 0.2: validation RMSE is 0.8977974062879039
8 latent factors and regularization = 0.3: validation RMSE is 0.926399403602508
9 latent factors and regularization = 0.1: validation RMSE is 0.9144883067966653
9 latent factors and regularization = 0.2: validation RMSE is 0.8987557279069118
9 latent factors and regularization = 0.3: validation RMSE is 0.9265441457847975

The best model has 8 latent factors and regularization = 0.2:
traning RMSE is 0.6761050651985206; validation RMSE is 0.8977974062879039


In [None]:
# Score the held-out test split with the model selected on the validation split.
pred_test = final_model.transform(test)
test_rmse = RMSE(pred_test)
print('The testing RMSE is ' + str(test_rmse))

The testing RMSE is 0.8973276428793233
