In [87]:
from pyspark.sql import SparkSession

In [88]:
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [102]:
movies = spark.read.csv("movies.csv",header=True)
ratings = spark.read.csv("ratings.csv",header=True)
ratings = ratings.withColumn("userId",col("userId").cast("int"))
ratings = ratings.withColumn("movieId",col("movieId").cast("int"))
ratings = ratings.withColumn("rating",col("rating").cast("int"))
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|     4|964982703|
|     1|      3|     4|964981247|
|     1|      6|     4|964982224|
|     1|     47|     5|964983815|
|     1|     50|     5|964982931|
|     1|     70|     3|964982400|
|     1|    101|     5|964980868|
|     1|    110|     4|964982176|
|     1|    151|     5|964984041|
|     1|    157|     5|964984100|
|     1|    163|     5|964983650|
|     1|    216|     5|964981208|
|     1|    223|     3|964980985|
|     1|    231|     5|964981179|
|     1|    235|     4|964980908|
|     1|    260|     5|964981680|
|     1|    296|     3|964982967|
|     1|    316|     3|964982310|
|     1|    333|     5|964981179|
|     1|    349|     4|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [103]:
movie_ratings = ratings.join(movies, ['movieId'], 'left')
movie_ratings.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|     4|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|     4|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|     4|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|     5|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|     5|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|     3|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|     5|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|     4|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|     5|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|     5|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [104]:
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select("rating").count()

    # Count the number of distinct userIds and distinct movieIds
    total_elements = ratings.select("userId").distinct().count() * ratings.select("movieId").distinct().count()

    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero *1.0)/total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")

In [105]:
get_mat_sparsity(ratings)

The ratings dataframe is  98.30% sparse.


In [106]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 2020)

In [107]:
from pyspark.sql.functions import lit, col

def get_binary_data(ratings):
    ratings = ratings.withColumn('binary', lit(1))
    userIds = ratings.select(col("userId").cast("int")).distinct()
    movieIds = ratings.select("movieId").distinct()

    user_movie = userIds.crossJoin(movieIds).join(ratings, ['userId', 'movieId'], "left")
    user_movie = user_movie.select(['userId', 'movieId', 'binary']).fillna(0)
    return user_movie

user_movie = get_binary_data(ratings)

In [108]:
user_movie.show()
user_movie.printSchema()

+------+-------+------+
|userId|movieId|binary|
+------+-------+------+
|   148|   1580|     0|
|   463|   1580|     0|
|   471|   1580|     0|
|   496|   1580|     0|
|   243|   1580|     0|
|   392|   1580|     0|
|   540|   1580|     0|
|    31|   1580|     0|
|   516|   1580|     0|
|    85|   1580|     0|
|   137|   1580|     1|
|   251|   1580|     0|
|   451|   1580|     0|
|   580|   1580|     1|
|    65|   1580|     0|
|   458|   1580|     0|
|    53|   1580|     0|
|   255|   1580|     0|
|   481|   1580|     0|
|   588|   1580|     0|
+------+-------+------+
only showing top 20 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- binary: integer (nullable = true)



In [109]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ALS model
als = ALS(
         userCol="userId", 
         itemCol="movieId",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [110]:
# Import the requisite packages
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [111]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

In [112]:
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [113]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [114]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)
#Extract best model from the cv model above
best_model = model.bestModel
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.9133692135300809


In [124]:
# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{171495, 5.88727...|
|     2|[{33649, 4.774651...|
|     3|[{6835, 4.8575363...|
|     4|[{3851, 4.84623},...|
|     5|[{5490, 4.536904}...|
|     6|[{33649, 4.893554...|
|     7|[{5490, 4.321935}...|
|     8|[{171495, 4.86760...|
|     9|[{171495, 4.82675...|
|    10|[{8869, 4.390154}...|
|    11|[{171495, 5.00620...|
|    12|[{67618, 5.795717...|
|    13|[{33649, 5.022939...|
|    14|[{171495, 4.61810...|
|    15|[{27611, 4.407625...|
|    16|[{33649, 4.283776...|
|    17|[{171495, 4.81478...|
|    18|[{171495, 4.63681...|
|    19|[{33649, 4.019057...|
|    20|[{132333, 4.87325...|
+------+--------------------+
only showing top 20 rows



In [125]:
from pyspark.sql.functions import explode

recommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

In [126]:
recommendations.limit(5).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1| 171495|5.8872743|
|     1|  33649|5.7832103|
|     1| 132333|5.6778646|
|     1|   5490|5.6778646|
|     1|  72171|5.6760664|
+------+-------+---------+

