### Loading and parsing datasets

Now we are ready to read in each of the files and create an RDD consisting of parsed lines.  

Each line in the ratings dataset (`ratings.csv`) is formatted as:  

`userId,movieId,rating,timestamp`  

Each line in the movies (`movies.csv`) dataset is formatted as:  

`movieId,title,genres`  

Where *genres* has the format:  

`Genre1|Genre2|Genre3...`

The tags file (`tags.csv`) has the format:  

`userId,movieId,tag,timestamp`  

And finally, the `links.csv` file has the format:  

`movieId,imdbId,tmdbId`  

The format of these files is uniform and simple, so we can use Python [`split()`](https://docs.python.org/2/library/stdtypes.html#str.split) to parse their lines once they are loaded into RDDs. Parsing the movies and ratings files yields two RDDs:  

* For each line in the ratings dataset, we create a tuple of `(UserID, MovieID, Rating)`. We drop the *timestamp* because we do not need it for this recommender.  
* For each line in the movies dataset, we create a tuple of `(MovieID, Title)`. We drop the *genres* because we do not use them for this recommender.  

Let's load the raw data. Each file includes a header row that must be kept out of the data; in the cells below, `header=True` tells the CSV reader to treat the first line as column names.  
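The parsing and header filtering described above can be sketched in plain Python, without Spark. This is a minimal illustration, not the notebook's own loading code: the function names `parse_rating_line` and `parse_movie_line` are hypothetical, and the movie line is a made-up sample. It uses `csv.reader` for the movies file on the assumption that titles containing commas are quoted, in which case a bare `split(",")` would break them apart.

```python
import csv
from io import StringIO


def parse_rating_line(line):
    """Return (user_id, movie_id, rating); the trailing timestamp is dropped."""
    user_id, movie_id, rating, _timestamp = line.split(",")
    return int(user_id), int(movie_id), float(rating)


def parse_movie_line(line):
    """Return (movie_id, title); the genres field is dropped.

    csv.reader keeps a quoted title that contains commas in one piece.
    """
    movie_id, title, _genres = next(csv.reader(StringIO(line)))
    return int(movie_id), title


# Sample lines in the ratings.csv format, header first.
raw_ratings = [
    "userId,movieId,rating,timestamp",
    "1,31,2.5,1260759144",
    "1,1029,3.0,1260759179",
]

# Filter out the header before parsing, as the text describes.
header = raw_ratings[0]
ratings = [parse_rating_line(line) for line in raw_ratings if line != header]

# Hypothetical movie line with a quoted, comma-containing title.
movie = parse_movie_line('99,"Example, The (2000)",Comedy|Drama')

print(ratings)
print(movie)
```

With an RDD of raw lines, the same functions could be passed to `filter` and `map`; the notebook cells that follow instead rely on `spark.read.csv`, which handles the header and quoting itself.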

In [2]:
import pyspark as ps
from pyspark.sql.types import *

spark = ps.sql.SparkSession.builder \
    .master("local[4]") \
    .appName("case-study") \
    .getOrCreate()

In [3]:
movies_full_df = spark.read.csv('data/movies/movies.csv', inferSchema=True, header=True)
ratings_full_df = spark.read.csv('data/movies/ratings.csv', inferSchema=True, header=True)
# tags_full_df = spark.read.csv('data/movies/tags.csv', inferSchema=True, header=True)
# links_full_df = spark.read.csv('data/movies/links.csv', inferSchema=True, header=True)

In [4]:
print(movies_full_df.take(5))
print(ratings_full_df.take(5))
# print(tags_full_df.take(5))
# print(links_full_df.take(5))
# ratings_full_df.printSchema()

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'), Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'), Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'), Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'), Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]
[Row(userId=1, movieId=31, rating=2.5, timestamp=1260759144), Row(userId=1, movieId=1029, rating=3.0, timestamp=1260759179), Row(userId=1, movieId=1061, rating=3.0, timestamp=1260759182), Row(userId=1, movieId=1129, rating=2.0, timestamp=1260759185), Row(userId=1, movieId=1172, rating=4.0, timestamp=1260759205)]


In [5]:
with_replacement = False
movies_df = movies_full_df.sample(with_replacement, 0.1)

In [6]:
print(type(movies_full_df))
movies_full_df.show()
print(type(movies_df))
movies_df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino

In [7]:
movies_raw_df = movies_full_df
ratings_raw_df = ratings_full_df
# tags_df = tags_full_df
# links_df = links_full_df

In [8]:
# drop the genres column so we are only working with movie_id, title
movies_df = movies_raw_df.drop('genres')

# drop the timestamp
ratings_df = ratings_raw_df.drop('timestamp')

## Selecting ALS parameters using the small dataset

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [10]:
ratings_df.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows



In [11]:
# Split the ratings into training (80%) and test (20%) sets
(training, test) = ratings_df.randomSplit([0.8, 0.2])

In [12]:
# Build the recommendation model using ALS on the training data
als = ALS(seed=5, maxIter=5, regParam=0.1, rank=4, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

In [13]:
predictions = model.transform(test)
predictions.show()
# show() and printSchema() print directly and return None, so they are not wrapped in print()
predictions.describe().show()

# evaluator=BinaryClassificationEvaluator(labelCol='rating', rawPredictionCol='prediction')
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
predictions.printSchema()
# ALS yields NaN predictions for users or movies absent from the training split; drop them before scoring
rmse = evaluator.evaluate(predictions.dropna())
print("Root-mean-square error = " + str(rmse))

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   534|    463|   4.0|  3.768993|
|   242|    463|   4.0|  3.821826|
|    85|    471|   3.0|  3.279749|
|   460|    471|   5.0|   3.88757|
|   602|    471|   3.0| 4.1085205|
|    92|    471|   4.0| 3.8203726|
|   309|    471|   4.0| 4.1995983|
|   358|    471|   5.0| 3.8729005|
|   487|    471|   4.0| 3.9180155|
|   529|    471|   4.0| 3.6788464|
|   311|    471|   0.5|  2.716748|
|   399|    471|   5.0| 3.4869518|
|   296|    833|   4.5| 3.3651454|
|   412|    833|   1.0| 2.1878226|
|   212|   1088|   3.5|  3.218675|
|   500|   1088|   4.0| 3.2485583|
|   582|   1088|   3.5| 3.4516826|
|   607|   1088|   2.0| 3.4723363|
|   505|   1088|   4.0| 2.9556832|
|   264|   1088|   4.0| 3.8859043|
+------+-------+------+----------+
only showing top 20 rows

+-------+-----------------+------------------+------------------+----------+
|summary|           userId|           movieId|           

In [14]:
als = ALS(userCol="userId",itemCol="movieId", ratingCol="rating")
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

seed = 5
iterations = [10, 20]
regularization_parameter = [0.05, 0.1, 0.2, 0.5]
ranks = [4, 8, 12]

paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, regularization_parameter) \
    .addGrid(als.maxIter, iterations) \
    .addGrid(als.seed, [seed]) \
    .addGrid(als.rank, ranks) \
    .build()

crossval = CrossValidator(estimator=als,
                          evaluator = evaluator,
                          estimatorParamMaps=paramGrid,
                          numFolds=2) 
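To get a feel for the cost of this search, the grid can be enumerated in plain Python. The sketch below mirrors the value lists in the cell above and counts how many ALS fits the cross-validation loop performs. It is an illustration of the arithmetic only, not of what Spark does internally. The names `combinations` and `fits_during_search` are introduced here for illustration.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder cell above.
regularization_parameter = [0.05, 0.1, 0.2, 0.5]
iterations = [10, 20]
seeds = [5]
ranks = [4, 8, 12]
num_folds = 2

# ParamGridBuilder builds the Cartesian product of all value lists.
combinations = list(product(regularization_parameter, iterations, seeds, ranks))

# Each combination is fitted once per fold during the search.
fits_during_search = len(combinations) * num_folds

print(len(combinations))
print(fits_during_search)
```

Adding one more value to any list multiplies the number of fits, so it helps to keep the grid small while experimenting on the small dataset.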

In [15]:
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
test.createOrReplaceTempView('test')
tdf = spark.sql('''
                SELECT DISTINCT userId
                FROM test''')
tdf.show()

+------+
|userId|
+------+
|   148|
|   463|
|   471|
|   496|
|   243|
|   392|
|   540|
|   623|
|    31|
|   516|
|    85|
|   137|
|   251|
|   451|
|   580|
|    65|
|   458|
|    53|
|   255|
|   481|
+------+
only showing top 20 rows



In [16]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

In [17]:
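# Note: as written, `predictions` was last assigned from the initial `model` (In [13]),
# not from cvModel, so this RMSE does not yet reflect the cross-validated model.
rmse = evaluator.evaluate(predictions.dropna())
print("Root-mean-square error = " + str(rmse))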

Root-mean-square error = 0.9156753265989203
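For reference, the RMSE reported above is the square root of the mean squared difference between actual and predicted ratings. The following plain-Python sketch shows that calculation, including dropping NaN predictions the way `predictions.dropna()` does before evaluation. The function name `rmse_of` and the NaN row are hypothetical. The first three pairs are taken from the prediction table shown earlier.

```python
import math


def rmse_of(pairs):
    """Root-mean-square error over (actual, predicted) pairs, ignoring NaN predictions."""
    valid = [(actual, pred) for actual, pred in pairs if not math.isnan(pred)]
    squared_errors = [(actual - pred) ** 2 for actual, pred in valid]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


sample = [
    (4.0, 3.768993),
    (4.0, 3.821826),
    (3.0, 3.279749),
    (2.0, float("nan")),  # hypothetical row for a user or movie unseen in training
]

print(rmse_of(sample))
```

An RMSE of about 0.9 therefore means that predictions are off by a little under one star on a typical rating.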


In [18]:
best_model = cvModel.bestModel
best_model

ALS_484b8caf75bd85b536e0

In [29]:
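predictions = cvModel.transform(test)
# Note: these recommendations come from the initial `model`, not from cvModel.bestModel
user_recommendations = model.recommendForAllUsers(10)
item_recommendations = model.recommendForAllItems(10)
# show() prints the table and returns None, so wrapping it in print() also prints "None"
print(user_recommendations.show())
print(item_recommendations.show())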

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[7669,5.406508],...|
|   463|[[67504,5.430828]...|
|   496|[[8535,5.9998264]...|
|   148|[[83411,6.0360246...|
|   540|[[7096,6.4437385]...|
|   392|[[92494,5.4114256...|
|   243|[[83411,5.4076853...|
|   623|[[83411,6.1725845...|
|    31|[[83411,5.9359956...|
|   516|[[67504,5.2222137...|
|   580|[[83411,5.1590877...|
|   251|[[83411,6.4844513...|
|   451|[[5114,5.969368],...|
|    85|[[2570,6.077694],...|
|   137|[[65188,5.665559]...|
|    65|[[4731,6.2018056]...|
|   458|[[7669,5.546337],...|
|   481|[[67504,6.1706157...|
|    53|[[87522,5.7504325...|
|   255|[[83411,6.0636168...|
+------+--------------------+
only showing top 20 rows

None
+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[46,5.601162], [...|
|   5300|[[566,4.5389132],...|
|   6620|[[179,4.920983], ...|
|   7340|[[156,4.7629623],...|
|  32460|[[298,4.8513107],...|
|

In [30]:
# More grid searching
# Scratch notes: filling NaNs and finding recommendations for a single user

als = ALS(userCol="userId",itemCol="movieId", ratingCol="rating")
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

training = training.fillna(3)
test = test.fillna(3)

seed = 5
iterations = [5, 10]
regularization_parameter = [0.1, 0.2]
ranks = [4, 8]

paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, regularization_parameter) \
    .addGrid(als.maxIter, iterations) \
    .addGrid(als.seed, [seed]) \
    .addGrid(als.rank, ranks) \
    .build()

crossval = CrossValidator(estimator=als,
                          evaluator = evaluator,
                          estimatorParamMaps=paramGrid,
                          numFolds=10)

new_cvModel = crossval.fit(training)

In [31]:
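new_predictions = new_cvModel.transform(test)
# Note: this evaluates `predictions` (from cvModel, In [29]), not `new_predictions`;
# pass new_predictions.dropna() instead to score the new model.
rmse = evaluator.evaluate(predictions.dropna())
print(rmse)

grid_search_best_model = new_cvModel.bestModel
print(grid_search_best_model)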

0.9537639387657043
ALS_4f7794d4eb4c9942343d


In [32]:
movies_user = ratings_df.join(movies_df, ['movieId'])
movies_user.show()

+-------+------+------+--------------------+
|movieId|userId|rating|               title|
+-------+------+------+--------------------+
|     31|     1|   2.5|Dangerous Minds (...|
|   1029|     1|   3.0|        Dumbo (1941)|
|   1061|     1|   3.0|     Sleepers (1996)|
|   1129|     1|   2.0|Escape from New Y...|
|   1172|     1|   4.0|Cinema Paradiso (...|
|   1263|     1|   2.0|Deer Hunter, The ...|
|   1287|     1|   2.0|      Ben-Hur (1959)|
|   1293|     1|   2.0|       Gandhi (1982)|
|   1339|     1|   3.5|Dracula (Bram Sto...|
|   1343|     1|   2.0|    Cape Fear (1991)|
|   1371|     1|   2.5|Star Trek: The Mo...|
|   1405|     1|   1.0|Beavis and Butt-H...|
|   1953|     1|   4.0|French Connection...|
|   2105|     1|   4.0|         Tron (1982)|
|   2150|     1|   3.0|Gods Must Be Craz...|
|   2193|     1|   2.0|       Willow (1988)|
|   2294|     1|   2.0|         Antz (1998)|
|   2455|     1|   2.5|     Fly, The (1986)|
|   2968|     1|   1.0| Time Bandits (1981)|
|   3671| 

In [33]:
movies_user.createOrReplaceTempView('movies_user_sql')

distinct_user_id = spark.sql('''
                            SELECT DISTINCT userId FROM movies_user_sql ORDER BY userId''')
distinct_user_id.show()
print(distinct_user_id.count())

distinct_movie_id = spark.sql('''
                            SELECT DISTINCT movieId, title FROM movies_user_sql ORDER BY movieId''')
print(distinct_movie_id.count())
distinct_movie_id.show()

userid_rdd = distinct_user_id.rdd
movie_rdd = distinct_movie_id.rdd

+------+
|userId|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
|    20|
+------+
only showing top 20 rows

671
9066
+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      8| Tom and Huck (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
+-------+--------------------+
only 

In [34]:
rdd = movies_user.rdd
rdd.take(5)

[Row(movieId=31, userId=1, rating=2.5, title='Dangerous Minds (1995)'),
 Row(movieId=1029, userId=1, rating=3.0, title='Dumbo (1941)'),
 Row(movieId=1061, userId=1, rating=3.0, title='Sleepers (1996)'),
 Row(movieId=1129, userId=1, rating=2.0, title='Escape from New York (1981)'),
 Row(movieId=1172, userId=1, rating=4.0, title='Cinema Paradiso (Nuovo cinema Paradiso) (1989)')]

In [35]:
user_id = 1

movies_rated = movies_user.where(movies_user.userId==user_id)
movies_rated.show(25)


+-------+------+------+--------------------+
|movieId|userId|rating|               title|
+-------+------+------+--------------------+
|     31|     1|   2.5|Dangerous Minds (...|
|   1029|     1|   3.0|        Dumbo (1941)|
|   1061|     1|   3.0|     Sleepers (1996)|
|   1129|     1|   2.0|Escape from New Y...|
|   1172|     1|   4.0|Cinema Paradiso (...|
|   1263|     1|   2.0|Deer Hunter, The ...|
|   1287|     1|   2.0|      Ben-Hur (1959)|
|   1293|     1|   2.0|       Gandhi (1982)|
|   1339|     1|   3.5|Dracula (Bram Sto...|
|   1343|     1|   2.0|    Cape Fear (1991)|
|   1371|     1|   2.5|Star Trek: The Mo...|
|   1405|     1|   1.0|Beavis and Butt-H...|
|   1953|     1|   4.0|French Connection...|
|   2105|     1|   4.0|         Tron (1982)|
|   2150|     1|   3.0|Gods Must Be Craz...|
|   2193|     1|   2.0|       Willow (1988)|
|   2294|     1|   2.0|         Antz (1998)|
|   2455|     1|   2.5|     Fly, The (1986)|
|   2968|     1|   1.0| Time Bandits (1981)|
|   3671| 

In [36]:
movies_rated.createOrReplaceTempView('movies_rated_sql')
distinct_movie_id.createOrReplaceTempView('d_movie_id')

movies_not_rated = spark.sql('''
                            SELECT d_movie_id.movieId, d_movie_id.title
                            FROM d_movie_id
                            LEFT JOIN movies_rated_sql
                            ON d_movie_id.movieId = movies_rated_sql.movieId
                            WHERE movies_rated_sql.userId IS NULL
                            ''')
movies_not_rated.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|    148|Awfully Big Adven...|
|    463|Guilty as Sin (1993)|
|    471|Hudsucker Proxy, ...|
|    496|What Happened Was...|
|    833|High School High ...|
|   1088|Dirty Dancing (1987)|
|   1238|   Local Hero (1983)|
|   1342|     Candyman (1992)|
|   1580|Men in Black (a.k...|
|   1591|        Spawn (1997)|
|   1645|The Devil's Advoc...|
|   1959|Out of Africa (1985)|
|   2122|Children of the C...|
|   2142|American Tail: Fi...|
|   2366|    King Kong (1933)|
|   2659|It Came from Holl...|
|   2866|Buddy Holly Story...|
|   3175| Galaxy Quest (1999)|
|   3794| Chuck & Buck (2000)|
|   3918|Hellbound: Hellra...|
+-------+--------------------+
only showing top 20 rows
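The LEFT JOIN with `WHERE ... IS NULL` above is an anti-join: it keeps every movie that has no matching row among the user's ratings. The same logic can be sketched in plain Python with a set lookup. The function name `movies_not_rated_by` and the small sample data are illustrative only. The third rating is hypothetical.

```python
def movies_not_rated_by(user_id, ratings, all_movies):
    """Return {movie_id: title} for movies the given user has not rated."""
    rated_ids = {movie_id for uid, movie_id, _rating in ratings if uid == user_id}
    return {mid: title for mid, title in all_movies.items() if mid not in rated_ids}


all_movies = {
    1: "Toy Story (1995)",
    2: "Jumanji (1995)",
    31: "Dangerous Minds (1995)",
    1029: "Dumbo (1941)",
}
ratings = [
    (1, 31, 2.5),
    (1, 1029, 3.0),
    (2, 1, 4.0),  # hypothetical rating by another user
]

print(movies_not_rated_by(1, ratings, all_movies))
```

In the DataFrame API, a join with `how='left_anti'` expresses the same idea without the explicit NULL check.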



In [37]:
pred_drop_nan = predictions.dropna()
pred_drop_nan.show()

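pred_drop_nan.createOrReplaceTempView('predictions_sql')
movies_not_rated.createOrReplaceTempView('movies_not_rated_sql')

# Note: this query does not filter on userId, so the predictions listed may belong to other users
top_preds = spark.sql('''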
                        SELECT movies_not_rated_sql.title, predictions_sql.prediction
                        FROM predictions_sql
                        JOIN movies_not_rated_sql
                        ON predictions_sql.movieId=movies_not_rated_sql.movieId
                        ORDER BY predictions_sql.prediction DESC
                        LIMIT 10''')
top_preds.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   534|    463|   4.0| 4.1284094|
|   242|    463|   4.0| 3.7865186|
|    85|    471|   3.0| 3.1321955|
|   460|    471|   5.0| 3.8541195|
|   602|    471|   3.0|  4.327158|
|    92|    471|   4.0| 3.8575947|
|   309|    471|   4.0| 4.2382703|
|   358|    471|   5.0| 3.9551108|
|   487|    471|   4.0| 3.7404592|
|   529|    471|   4.0| 3.8283744|
|   311|    471|   0.5|  2.510409|
|   399|    471|   5.0| 3.5116353|
|   296|    833|   4.5|  4.468324|
|   412|    833|   1.0| 1.4515231|
|   212|   1088|   3.5| 3.2614427|
|   500|   1088|   4.0| 3.3545737|
|   582|   1088|   3.5| 3.3808362|
|   607|   1088|   2.0| 3.4734614|
|   505|   1088|   4.0| 3.0011072|
|   264|   1088|   4.0| 3.8867025|
+------+-------+------+----------+
only showing top 20 rows

+--------------------+----------+
|               title|prediction|
+--------------------+----------+
|Smokey and the Ba...| 6.1444435

In [38]:
user_id = 2

movies_user = ratings_df.join(movies_df, ['movieId'])
movies_user.count()
distinct_movie_id = spark.sql('''
                            SELECT DISTINCT movieId, title
                            FROM movies_user_sql
                            ORDER BY movieId''')
distinct_movie_id.count()
print(user_id)
movies_rated = movies_user.where(movies_user.userId==user_id)
movies_rated.count()
movies_rated.createOrReplaceTempView('movies_rated_sql')
distinct_movie_id.createOrReplaceTempView('d_movie_id')
movies_not_rated = spark.sql('''
                            SELECT d_movie_id.movieId, d_movie_id.title
                            FROM d_movie_id
                            LEFT JOIN movies_rated_sql
                            ON d_movie_id.movieId = movies_rated_sql.movieId
                            WHERE movies_rated_sql.userId IS NULL
                            ''')
print('Movies not rated')
print(movies_not_rated.count())
pred_drop_nan = predictions.dropna()
pred_drop_nan.printSchema()
pred_drop_nan.createOrReplaceTempView('predictions_sql')
movies_not_rated.createOrReplaceTempView('movies_not_rated_sql')

tp = spark.sql('''
                SELECT predictions_sql.prediction, predictions_sql.userId 
                FROM predictions_sql
                WHERE userId=2
                ''')
print(tp.count())
top_preds = spark.sql('''
                        SELECT movies_not_rated_sql.title, predictions_sql.prediction, predictions_sql.userId
                        FROM predictions_sql
                        JOIN movies_not_rated_sql
                        ON predictions_sql.movieId=movies_not_rated_sql.movieId
                        WHERE predictions_sql.userId=2
                        ORDER BY predictions_sql.prediction DESC
                        LIMIT 10''')
top_preds.show()
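# This returns no rows: `predictions` only covers (userId, movieId) pairs from the test set,
# all of which the user has already rated, so none of them appear in movies_not_rated.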

2
Movies not rated
8990
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = true)

22
+-----+----------+------+
|title|prediction|userId|
+-----+----------+------+
+-----+----------+------+
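The empty result follows from how `predictions` was built: it only holds test-set pairs, and every test-set pair for a user is a movie that user has already rated. A matrix factorization model can still score any (user, movie) pair, because a predicted rating is the dot product of the user's factor vector and the movie's factor vector. The sketch below shows that idea in plain Python with made-up two-dimensional factors. All names and numbers are hypothetical and it does not use Spark's API.

```python
def score(user_factors, item_factors):
    """Predicted rating as the dot product of user and item factor vectors."""
    return sum(u * v for u, v in zip(user_factors, item_factors))


def top_n_unrated(user_factors, item_factor_map, rated_ids, n):
    """Score every movie the user has not rated and return the n highest."""
    candidates = [
        (movie_id, score(user_factors, factors))
        for movie_id, factors in item_factor_map.items()
        if movie_id not in rated_ids
    ]
    return sorted(candidates, key=lambda pair: pair[1], reverse=True)[:n]


# Hypothetical rank-2 factors; a fitted model would supply the real ones.
user_vec = [1.0, 0.5]
item_vecs = {10: [4.0, 0.0], 20: [2.0, 2.0], 30: [1.0, 1.0], 40: [5.0, 2.0]}
already_rated = {40}

print(top_n_unrated(user_vec, item_vecs, already_rated, 2))
```

In the notebook itself, `recommendForAllUsers` (used in In [29]) already produces per-user top-N lists from the fitted factors, so filtering its output for one `userId` is likely a simpler route than joining against `predictions`.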

