In [1]:
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Data Source
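The `movies.csv` and `ratings.csv` files used below come from the MovieLens datasets published by GroupLens: <https://grouplens.org/datasets/movielens/>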

In [5]:
# Load the movie metadata into a pandas DataFrame. The path is relative, so
# movies.csv must sit in the notebook's working directory.
movies = pd.read_csv("movies.csv")
# Bare expression: renders a preview of the frame as the cell output.
movies

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
...,...,...,...
9737,193581,Black Butler: Book of the Atlantic (2017),Action|Animation|Comedy|Fantasy
9738,193583,No Game No Life: Zero (2017),Animation|Comedy|Fantasy
9739,193585,Flint (2017),Drama
9740,193587,Bungo Stray Dogs: Dead Apple (2018),Action|Animation


In [224]:
# (rows, columns) of the movie metadata frame.
movies.shape

(9742, 3)

In [None]:
# Obtain the Spark entry point explicitly. getOrCreate() returns the already
# running session if there is one, so this is safe to re-run, and it lets the
# notebook work from a fresh kernel instead of relying on a pre-existing
# `spark` variable.
spark = SparkSession.builder.getOrCreate()

# Read the ratings with their header row. No schema is inferred here, so every
# column arrives as a string; a later cell casts them to numeric types.
new_ratings = spark.read.csv("ratings.csv", header=True)

# Keep a 50% sample without replacement; the fixed seed requests the same
# sample on every run.
new_ratings = new_ratings.sample(withReplacement=False, fraction=0.5, seed=1234)

In [230]:
%%time
# Count the total number of ratings in the dataset
numerator = new_ratings.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = new_ratings.select("userId").distinct().count()
num_movies = new_ratings.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is", "%.2f" % sparsity + "% empty.")

The ratings dataframe is 98.90% empty.
CPU times: user 10.1 ms, sys: 4.45 ms, total: 14.5 ms
Wall time: 2.11 s


In [221]:
%%time
from pyspark.sql.functions import max, min, avg
# Min num ratings for movies
print("Movie with the fewest ratings: ")
new_ratings.groupBy("movieId").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
new_ratings.groupBy("movieId").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
new_ratings.groupBy("userId").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
new_ratings.groupBy("userId").count().select(avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+----------------+
|      avg(count)|
+----------------+
|6.71293459437858|
+----------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         5|
+----------+

Avg num ratings per user: 
+-----------------+
|       avg(count)|
+-----------------+
|82.61311475409836|
+-----------------+

CPU times: user 31.6 ms, sys: 18.6 ms, total: 50.1 ms
Wall time: 5.45 s


In [106]:
# Show the schema as it was loaded
new_ratings.printSchema()

# Spark SQL type wanted for each column that is kept. Columns not listed here
# are not selected and therefore disappear from new_ratings.
column_types = (
    ("userId", "integer"),
    ("movieId", "integer"),
    ("rating", "double"),
)

new_ratings = new_ratings.select(
    *[new_ratings[name].cast(dtype) for name, dtype in column_types]
)

# Show the schema again to verify the casts took effect
new_ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [127]:
# Random 80/20 split into training and testing sets; the seed is fixed so the
# split can be requested identically on re-runs.
training, testing = new_ratings.randomSplit(weights=[0.8, 0.2], seed=1234)

In [None]:
%%time
# Import the required functions
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


# Create ALS model
als = ALS(userCol="userId", itemCol="movieId",
          ratingCol="rating", maxIter=20, coldStartStrategy="drop", 
          nonnegative=True, implicitPrefs=False)

# Fit model to training data
model = als.fit(training)  

# Generate predictions on test_data
predictions = model.transform(test)

# Tell Spark how to evaluate predictions
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
# Obtain and print RMSE
rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse}')

In [165]:
%%time
sc.setCheckpointDir('/Users/Roger/Desktop/Brandeis/Spring2020/Self-project/PySpark/ml-1m')
# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", 
            coldStartStrategy="drop", nonnegative = True, 
            implicitPrefs = False)

# Tell Spark what values to try for each hyperparameter
param_grid = ParamGridBuilder()\
                    .addGrid(als.rank, [50, 100, 120])\
                    .addGrid(als.maxIter, [5, 50, 80])\
                    .addGrid(als.regParam, [.05, .1, .5])\
                    .build()

# Tell Spark how to evaluate model performance           
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", 
            predictionCol="prediction")

# Build cross validation step using CrossValidator 
cv_als = CrossValidator(estimator = als,
                    estimatorParamMaps = param_grid,
                    evaluator = evaluator,
                    numFolds = 3)

# Run the cv on the training data                    
new_model = cv_als.fit(training)                    

# Extract best combination of values from cross validation
new_best_model = new_model.bestModel


CPU times: user 12 s, sys: 2.84 s, total: 14.9 s
Wall time: 1h 6min 33s


In [178]:
 # Echo the model selected by cross-validation (renders its repr).
 new_best_model

ALS_73a185fc1ba6

In [232]:
# Report the hyperparameters of the winning model.
print(type(new_best_model))
print("")
print("====Best Model===")

# The values are read from the estimator behind the fitted model, reached
# through the wrapped JVM object (`_java_obj` is a private attribute).
best_estimator = new_best_model._java_obj.parent()

# (label to print, name of the getter on the estimator)
report_lines = (
    ("  Rank:", "getRank"),
    ("  MaxIter:", "getMaxIter"),
    ("  RegParam:", "getRegParam"),
)
for label, getter_name in report_lines:
    print(label, getattr(best_estimator, getter_name)())

<class 'pyspark.ml.recommendation.ALSModel'>

====Best Model===
  Rank: 120
  MaxIter: 50
  RegParam: 0.1


In [169]:
# Score the held-out split with the cross-validated model; this adds a
# `prediction` column next to the true `rating`.
new_test_predictions = new_best_model.transform(dataset=testing)

# Preview the first 20 rows (the default for show()).
new_test_predictions.show(n=20)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   541|    471|   3.0| 3.7734034|
|   599|    833|   1.5| 1.8606207|
|   608|    833|   0.5|  1.914421|
|    84|   1088|   3.0| 2.9149926|
|    10|   1088|   3.0| 2.6746564|
|   116|   1088|   4.5| 3.0357797|
|   587|   1238|   4.0| 3.9014268|
|   223|   1342|   1.0| 1.6475196|
|   232|   1580|   3.5| 3.7188938|
|   111|   1580|   3.0| 2.8687918|
|   474|   1580|   4.5| 3.4266531|
|   542|   1580|   3.5| 3.2883322|
|    45|   1580|   3.0| 3.9989038|
|    82|   1580|   4.0|  3.597784|
|   237|   1580|   4.5| 2.9974847|
|   352|   1580|   2.5| 3.6041574|
|   469|   1580|   3.0| 3.4081657|
|   354|   1580|   3.5|  3.735845|
|    67|   1580|   4.0| 3.2788165|
|   399|   1580|   0.5|  4.124449|
+------+-------+------+----------+
only showing top 20 rows



In [168]:
# RMSE of the cross-validated model on the held-out split, using the
# RegressionEvaluator defined in an earlier cell.
rmse = evaluator.evaluate(dataset=new_test_predictions)
print(f'RMSE: {rmse}')

RMSE: 0.9636748778855049


In [171]:
# Attach movie title/genres to each test prediction and rank by predicted rating.
# NOTE(review): `spark_movies` is not defined in the visible cells; presumably
# it is a Spark DataFrame built from movies.csv. Confirm and define it above.
new_recommendation = (
    new_test_predictions
    .join(spark_movies, on='movieId', how='left')
    .sort('prediction', ascending=False)
)
new_recommendation.show()

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating|prediction|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    318|   162|   5.0|   5.32176|Shawshank Redempt...|         Crime|Drama|
|    318|    52|   5.0| 5.1270366|Shawshank Redempt...|         Crime|Drama|
|    296|   348|   5.0|  5.070769| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    318|   122|   5.0| 5.0326896|Shawshank Redempt...|         Crime|Drama|
|    858|   417|   5.0| 5.0283055|Godfather, The (1...|         Crime|Drama|
|   2959|   296|   5.0| 5.0194983|   Fight Club (1999)|Action|Crime|Dram...|
| 112552|   515|   5.0|  5.001726|     Whiplash (2014)|               Drama|
|   1732|   122|   5.0| 4.9857078|Big Lebowski, The...|        Comedy|Crime|
|   1208|   465|   5.0|  4.984627|Apocalypse Now (1...|    Action|Drama|War|
|    364|    43|   5.0| 4.9801106|Lion King, The (1...|Adventure|Animati...|

In [179]:
# NOTE(review): `original` is not defined in the visible cells; from its
# displayed columns it looks like ratings joined with movie metadata. Confirm.

# User 50's own ratings, highest first
print("User 50's Ratings:")
(
    original
    .where(original.userId == 50)
    .orderBy("rating", ascending=False)
    .show()
)

# Predictions for user 50. new_recommendation is derived from the test split,
# so these rows are movies the user has rated, not unseen titles.
print("User 50s Recommendations:")
new_recommendation.where(new_recommendation.userId == 50).show()


User 50's Ratings:
+-------+------+------+--------------------+--------------------+
|movieId|userId|rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1232|    50|   4.0|      Stalker (1979)|Drama|Mystery|Sci-Fi|
|    541|    50|   4.0| Blade Runner (1982)|Action|Sci-Fi|Thr...|
|   2019|    50|   4.0|Seven Samurai (Sh...|Action|Adventure|...|
|   6975|    50|   3.5|  Funny Games (1997)|Drama|Horror|Thri...|
|  44555|    50|   3.5|Lives of Others, ...|Drama|Romance|Thr...|
|   1278|    50|   3.5|Young Frankenstei...|      Comedy|Fantasy|
|   1198|    50|   3.5|Raiders of the Lo...|    Action|Adventure|
|   2712|    50|   3.5|Eyes Wide Shut (1...|Drama|Mystery|Thr...|
|   8143|    50|   3.5|  Lola Montès (1955)|               Drama|
|    914|    50|   3.0| My Fair Lady (1964)|Comedy|Drama|Musi...|
|   1223|    50|   3.0|Grand Day Out wit...|Adventure|Animati...|
|   1370|    50|   3.0|   Die Hard 2 (1990)|Action|Advent

In [220]:
# Rows belonging to user 50 in each frame.
# NOTE(review): `original` is not defined in the visible cells; confirm its source.
user_50_original = original.where(original.userId == 50)
user_50_recommendation = new_recommendation.where(new_recommendation.userId == 50)

# Anti-join: keep only predictions whose movieId has no match in
# user_50_original, then rank them by predicted rating.
(
    user_50_recommendation
    .join(user_50_original, on='movieId', how='left_anti')
    .orderBy('prediction', ascending=False)
    .show()
)


+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating|prediction|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    750|    50|   4.0|  3.469326|Dr. Strangelove o...|          Comedy|War|
|   7748|    50|   4.0| 3.2748797|Pierrot le fou (1...|         Crime|Drama|
|     32|    50|   3.0|  3.104672|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
|   2160|    50|   4.0| 3.0225413|Rosemary's Baby (...|Drama|Horror|Thri...|
|   1230|    50|   3.5| 2.9624524|   Annie Hall (1977)|      Comedy|Romance|
|    969|    50|   3.5|  2.916551|African Queen, Th...|Adventure|Comedy|...|
| 128620|    50|   3.5| 2.9083743|     Victoria (2015)| Crime|Drama|Romance|
|   8368|    50|   3.0| 2.9052143|Harry Potter and ...|Adventure|Fantasy...|
|    909|    50|   3.0| 2.8207333|Apartment, The (1...|Comedy|Drama|Romance|
|   1079|    50|   2.5| 2.8008797|Fish Called Wanda...|        Comedy|Crime|