In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS
import time
import math
from als import train_ALS, recommendation

In [2]:
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

In [3]:
movies = spark.read.load('movies.csv', format='csv', header=True, inferSchema=True)
ratings = spark.read.load('ratings.csv', format='csv', header=True, inferSchema=True)
links = spark.read.load('links.csv', format='csv', header=True, inferSchema=True)
tags = spark.read.load('tags.csv', format='csv', header=True, inferSchema=True)

In [6]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [7]:
print('Distinct values of ratings:')
print(sorted(ratings.select('rating').distinct().rdd.map(lambda r: r[0]).collect()))

Distinct values of ratings:
[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]


In [8]:
tmp1 = ratings.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 1
Minimum number of ratings per movie is 1


In [9]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

10155 out of 53889 movies are rated by only one user


In [10]:
tmp1 = movies.select('movieID').distinct().count()
tmp2 = ratings.select('movieID').distinct().count()
print('We have a total of {} distinct movies that are rated by users in ratings table'.format(tmp2))
print('We have {} movies that are not rated yet'.format(tmp1-tmp2))

We have a total of 53889 distinct movies that are rated by users in ratings table
We have 4209 movies that are not rated yet


In [11]:
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
print('List movies that are not rated yet: ')
spark.sql(
    "SELECT movieId, title "
    "FROM movies "
    "WHERE movieId NOT IN (SELECT distinct(movieId) FROM ratings)"
).show(10)

List movies that are not rated yet: 
+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|  25817|Break of Hearts (...|
|  26361|Baby Blue Marine ...|
|  27153|Can't Be Heaven (...|
|  27433|        Bark! (2002)|
|  31945|Always a Bridesma...|
|  52696|Thousand and One ...|
|  58209|Alex in Wonder (S...|
|  60234|   Shock, The (1923)|
|  69565|Bling: A Planet R...|
|  69834|       Agency (1980)|
+-------+--------------------+
only showing top 10 rows



In [13]:
print('Counts of movies per genre')
splitter = UserDefinedFunction(lambda x: x.split('|'), ArrayType(StringType()))
movies.select('movieID', explode(splitter("genres")).alias("genres")) \
    .groupby('genres') \
    .count() \
    .sort(desc('count')) \
    .show()

Counts of movies per genre
+------------------+-----+
|            genres|count|
+------------------+-----+
|             Drama|24144|
|            Comedy|15956|
|          Thriller| 8216|
|           Romance| 7412|
|            Action| 7130|
|            Horror| 5555|
|       Documentary| 5118|
|             Crime| 5105|
|(no genres listed)| 4266|
|         Adventure| 4067|
|            Sci-Fi| 3444|
|           Mystery| 2773|
|          Children| 2749|
|         Animation| 2663|
|           Fantasy| 2637|
|               War| 1820|
|           Western| 1378|
|           Musical| 1113|
|         Film-Noir|  364|
|              IMAX|  197|
+------------------+-----+



In [12]:
rating = sc.textFile('ratings.csv')
head = rating.take(1)[0]
ratings = rating.filter(lambda x: x!=head).map(lambda x: x.split(",")).filter(lambda x: int(x[3]) > 1366677221).map(
    lambda x: (int(x[0]), int(x[1]), float(x[2]))).cache()

ratings.take(5)

[(13, 204, 5.0),
 (13, 216, 4.0),
 (13, 362, 4.0),
 (13, 720, 4.0),
 (13, 838, 3.5)]

In [13]:
train, valid, test = ratings.randomSplit([7, 2, 1], seed=42)

train.cache()
valid.cache()
test.cache()

PythonRDD[293] at RDD at PythonRDD.scala:53

In [18]:
ranks = [8, 10, 12, 14, 16, 18, 20]
regs = [0.001, 0.01, 0.05, 0.1, 0.2]

start_time = time.time()
final_model = train_ALS(train, valid, 10, regs, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

8 latent factors and regularization = 0.001: validation RMSE is 0.9043014519077007
8 latent factors and regularization = 0.01: validation RMSE is 0.8293024842044121
8 latent factors and regularization = 0.05: validation RMSE is 0.802750217604632
8 latent factors and regularization = 0.1: validation RMSE is 0.8102747314409071
8 latent factors and regularization = 0.2: validation RMSE is 0.8574665846463414
10 latent factors and regularization = 0.001: validation RMSE is 0.9127782733639884
10 latent factors and regularization = 0.01: validation RMSE is 0.826914767146492
10 latent factors and regularization = 0.05: validation RMSE is 0.796888229507744
10 latent factors and regularization = 0.1: validation RMSE is 0.8035750643905293
10 latent factors and regularization = 0.2: validation RMSE is 0.8538400389066517
12 latent factors and regularization = 0.001: validation RMSE is 0.9239643918401101
12 latent factors and regularization = 0.01: validation RMSE is 0.8288236826156852
12 latent fac

In [20]:
test_d = test.map(lambda p: (p[0], p[1]))
pred_t = final_model.predictAll(test_d).map(lambda x: ((x[0], x[1]), x[2]))
combine_t = test.map(lambda x: ((x[0], x[1]), x[2])).join(pred_t)

MSE_test = combine_t.map(lambda x: (x[1][0] - x[1][1])**2).mean()
err_test = math.sqrt(MSE_test)
print('The out-of-sample RMSE of rating predictions is', round(err_test, 4))

The out-of-sample RMSE of rating predictions is 0.7864


In [42]:
favorites = ['Hunger Game','Harry Porter','Iron Man']

recommends = recommendation(
    model_best_params={'iterations': 10, 'rank': 20, 'lambda_': 0.05}, 
    ratings=ratings, 
    movies=movies, 
    favs=favorites, 
    n=10, 
    spark_context=sc)

print('Recommendations for {}:'.format(my_favorite_movies[0]))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))

Recommendations for Iron Man:
1: Nobody Will Speak of Us When We're Dead (Nadie hablará de nosotras cuando hayamos muerto) (1995)
2: Went the Day Well? (1942)
3: Harakiri (1919)
4: 24 City (Er shi si cheng ji) (2008)
5: Dinosaur Island (1994)
6: Future My Love (2012)
7: A martfüi rém (2016)
8: Countdown (2004)
9: Rumble: The Indians Who Rocked the World (2017)
10: Whitney Cummings: Money Shot (2010)


In [28]:
recommends

[]