In [1]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Create my_spark
spark = SparkSession.builder.getOrCreate()
# load ratings data
ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True, inferSchema=True)
print("Number of ratings: %i" % ratings.count())
print("Sample of ratings:")
ratings.show(5)

Number of ratings: 100836
Sample of ratings:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [2]:
# Look at the column names
print (ratings.columns)

['userId', 'movieId', 'rating', 'timestamp']


# Calculate sparsity

In [3]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct users.
num_users = ratings.select("userId").distinct().count()

# Count the number of distinct movies.
num_movies = ratings.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print ("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.30% empty.


# The GroupBy and Filter Methods

In [4]:
# Import the requisite packages
from pyspark.sql.functions import col

# View the ratings dataset
ratings.show()

# Filter out all userIds greater than 100
ratings.filter(col("userId") < 100).show()

# Group data by userId, count ratings
ratings.groupBy("userId").count().show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0

# Summary Statistics

In [5]:
from pyspark.sql.functions import min,avg

# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movieId").count().select(min("count")).show()
# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings.groupBy("movieId").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("userId").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings.groupBy("userId").count().select(avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|10.369806663924312|
+------------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|        20|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|165.30491803278687|
+------------------+



# View Schema

In [6]:
# Use the .printSchema() method to see the datatypes of the ratings dataset.
ratings.printSchema()

# Tell Spark to convert the columns to the proper data types.
ratings = ratings.select(ratings.userId.cast("integer"), ratings.movieId.cast("integer"), ratings.rating.cast("double"))

# Call .printSchema() again to confirm the columns are now in the correct format
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



# Create test/train splits and build your ALS model

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

# Create test and train set
(train, test) = ratings.randomSplit([0.80, 0.20], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False)

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

# tune ALS model

In [8]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid= ParamGridBuilder() 
param_grid=param_grid.addGrid(als.rank, [50, 100])
param_grid=param_grid.addGrid(als.maxIter, [10, 20]) 
param_grid=param_grid.addGrid(als.regParam, [.05, .1]) 
param_grid=param_grid.build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  8


# Build your cross validation pipeline

In [9]:
# Build cross validator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print (cv)

#Fit cross validator to the 'train' dataset
model = cv.fit(train)

CrossValidator_793f8f6cd821


# Best Model and Best Model Parameters

In [16]:
#Extract best model from the cv model above
best_model = model.bestModel

# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**


In [13]:
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

  Rank: 50
  MaxIter: 10
  RegParam: 0.05


# Generate predictions and calculate RMSE

In [14]:
test_predictions = model.transform(test)

In [17]:
# View the predictions 
test_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   385|    471|   4.0| 3.0619082|
|   462|    471|   2.5| 2.7239828|
|   387|    471|   3.0|  2.855985|
|   171|    471|   3.0| 5.0116787|
|    32|    471|   3.0| 4.1348567|
|   469|    471|   5.0| 3.4102752|
|   357|    471|   3.5|  4.144864|
|   191|    496|   5.0|       NaN|
|   132|   1088|   4.0| 2.9508123|
|   563|   1088|   4.0| 3.4017477|
|   594|   1088|   4.5|  4.299285|
|   307|   1088|   3.0| 2.2142334|
|    51|   1088|   4.0| 3.8735592|
|   221|   1088|   3.0| 2.9568038|
|   414|   1088|   3.0| 2.9884179|
|   200|   1088|   4.0| 3.7016778|
|   104|   1088|   3.0| 3.8977597|
|    19|   1238|   3.0| 2.8326402|
|   156|   1238|   4.0|  3.548694|
|   425|   1342|   3.5| 2.1621828|
+------+-------+------+----------+
only showing top 20 rows



In [18]:
# Select the first set of columns
selected1 = test_predictions.select("userId","movieId","rating","prediction")
# Define first filter
predictions = selected1.filter(test_predictions.prediction != 'NaN')

In [19]:
rmse = evaluator.evaluate(predictions)
print('RMSE:', rmse)

RMSE: 0.9315846852540457


# Make Recommendations

In [45]:
# Look at user 60's ratings
print ("User 60's Ratings:")
test.filter(col("userId") == 60).sort("rating",ascending = False).show()

# Look at the movies recommended to user 60
print ("User 60s Recommendations:")
test_predictions.filter(col("userId") == 60).show()

# Look at user 63's ratings
print ("User 63's Ratings:")
test.filter(col("userId") == 63).sort("rating", ascending = False).show()

# Look at the movies recommended to user 63
print ("User 63's Recommendations:")
test_predictions.filter(col("userId") == 63).show()

User 60's Ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    60|    318|   4.0|
|    60|   6016|   4.0|
|    60|    783|   4.0|
|    60|   1203|   3.0|
+------+-------+------+

User 60s Recommendations:
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    60|    318|   4.0|  4.520606|
|    60|   1203|   3.0|  4.465421|
|    60|    783|   4.0| 3.0083368|
|    60|   6016|   4.0|  4.255051|
+------+-------+------+----------+

User 63's Ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    63|    318|   5.0|
|    63|   1220|   5.0|
|    63|   1288|   5.0|
|    63|    344|   5.0|
|    63|   2716|   5.0|
|    63|  33779|   5.0|
|    63|  77455|   5.0|
|    63|  89753|   5.0|
|    63| 111781|   5.0|
|    63| 115617|   5.0|
|    63| 134853|   5.0|
|    63|   2078|   5.0|
|    63|     50|   5.0|
|    63|    260|   5.0|
|    63|   2858|   5.0|
|    63|   1198|   5.0|
|    6

# load raw data from movie

In [22]:
movie_info = spark.read.csv("ml-latest-small/movies.csv", header=True, inferSchema=True)
print("Raw movie data:")
movie_info.show(5, truncate=False)

Raw movie data:
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [49]:
this_user = test.filter(test['userId'] == 200).select('userId', 'movieId')
this_user.show()

+------+-------+
|userId|movieId|
+------+-------+
|   200|     34|
|   200|    435|
|   200|    480|
|   200|    586|
|   200|    852|
|   200|   1028|
|   200|   1036|
|   200|   1088|
|   200|   1198|
|   200|   1250|
|   200|   1391|
|   200|   1500|
|   200|   1517|
|   200|   1569|
|   200|   2012|
|   200|   2248|
|   200|   2502|
|   200|   2581|
|   200|   2991|
|   200|   3253|
+------+-------+
only showing top 20 rows



In [58]:
recommendation_this_user = model.transform(this_user)
recommendation_this_user.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|   200|   1088| 3.7016778|
|   200|   4161| 3.7567112|
|   200|   6934| 3.2413628|
|   200|   6378| 3.6304917|
|   200|  58559| 4.3657455|
|   200|     34| 3.0511289|
|   200|   1198|  4.590148|
|   200|   1500| 3.4712083|
|   200|  30793|  4.346811|
|   200|   3481| 3.6183228|
|   200|   2502|  4.489507|
|   200|   6503| 2.5988638|
|   200|  53996| 3.4248185|
|   200|   4246| 3.4882336|
|   200|   7361| 3.9417608|
|   200|    435|  2.510918|
|   200|   1391| 3.1258621|
|   200|   4979| 3.6850567|
|   200|    852| 2.7579849|
|   200|  56949| 3.7036724|
+------+-------+----------+
only showing top 20 rows



In [60]:
# Join the DataFrames
recommendation_this_user_movies = recommendation_this_user.join(raw_movies,on='movieId',how="leftouter")

# Examine the data again
print(recommendation_this_user_movies.orderBy('prediction', ascending=False).filter(recommendation_this_user_movies.prediction != 'NaN').show())

+-------+------+----------+--------------------+--------------------+
|movieId|userId|prediction|               title|              genres|
+-------+------+----------+--------------------+--------------------+
|   2991|   200| 4.7491813|Live and Let Die ...|Action|Adventure|...|
|   3253|   200| 4.6182733|Wayne's World (1992)|              Comedy|
|   1198|   200|  4.590148|Raiders of the Lo...|    Action|Adventure|
|   1250|   200|  4.532661|Bridge on the Riv...| Adventure|Drama|War|
|   5989|   200|  4.521723|Catch Me If You C...|         Crime|Drama|
|   2502|   200|  4.489507| Office Space (1999)|        Comedy|Crime|
|   4816|   200| 4.4075074|    Zoolander (2001)|              Comedy|
|   4886|   200|  4.383157|Monsters, Inc. (2...|Adventure|Animati...|
|  58559|   200| 4.3657455|Dark Knight, The ...|Action|Crime|Dram...|
|  54503|   200| 4.3490386|     Superbad (2007)|              Comedy|
|  30793|   200|  4.346811|Charlie and the C...|Adventure|Childre...|
|  48738|   200| 4.3