<h3>Imports<h3>

In [64]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *

<h3>Configs<h3>

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

conf = SparkConf() \
.setAppName('app') \
.setMaster("local[*]")

sc = SparkContext.getOrCreate(conf=conf)

<h3>Read Data<h3>

In [3]:
moviesDf  = spark.read.format("csv").load("movies.csv",  header=True).cache()
ratingsDf = spark.read.format("csv").load("ratings.csv", header=True).cache()

In [92]:
metadata_all = moviesDf.join(ratingsDf, moviesDf.movieId == ratingsDf.movieId) \
    .select(ratingsDf.userId, moviesDf.movieId, ratingsDf.rating, moviesDf.genres, moviesDf.title )

In [4]:
movie_ratings = moviesDf.join(ratingsDf, moviesDf.movieId == ratingsDf.movieId) \
    .select(ratingsDf.userId, moviesDf.movieId, ratingsDf.rating)

<h3>Type Casting<h3>

In [6]:
movie_ratings = movie_ratings.withColumn("userId",movie_ratings.userId.cast("int"))
movie_ratings = movie_ratings.withColumn("movieId",movie_ratings.movieId.cast("int"))
movie_ratings = movie_ratings.withColumn("rating",movie_ratings.rating.cast("double"))

In [7]:
movie_ratings.dtypes

[('userId', 'int'), ('movieId', 'int'), ('rating', 'double')]

<h3>Sparsity Func<h3>

In [9]:
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select("rating").count()

    # Count the number of distinct userIds and distinct movieIds
    total_elements = ratings.select("userId").distinct().count() * ratings.select("movieId").distinct().count()

    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero *1.0)/total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    


In [10]:
get_mat_sparsity(movie_ratings)

The ratings dataframe is  98.34% sparse.


<h3>Train and Test<h3>

In [13]:
(train, test) = movie_ratings.randomSplit([0.8, 0.2], seed = 2020)

<h3>ALS Configrations<h3>

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

als = ALS(
         userCol="userId", 
         itemCol="movieId",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [17]:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

<h3>Param Grid Configartions<h3>

In [18]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

<h3>Regression Evulator Configrations<h3>

In [19]:
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


<h3>Cross Validator<h3>

In [20]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

<h3>Fit Model<h3>

In [21]:
model = cv.fit(train)

<h3>Select Best Model<h3>

In [25]:
best_model = model.bestModel

<h3>Model Result<h3>

In [30]:
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.9277270814484029


<h3>Recommend For All Users 5 Movie<h3>

In [34]:
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{8675, 4.540138}...|
|     3|[{8675, 5.6698346...|
|     5|[{5570, 4.627322}...|
|     6|[{115170, 4.80307...|
|     9|[{8675, 5.1065087...|
|    12|[{8675, 4.3357286...|
|    13|[{8675, 5.090038}...|
|    15|[{2647, 4.8586903...|
|    16|[{31116, 4.559248...|
|    17|[{31116, 4.812059...|
|    19|[{8675, 5.0115213...|
|    20|[{8675, 5.3883734...|
|    22|[{53318, 4.798818...|
|    26|[{8675, 5.0120754...|
|    27|[{8675, 5.3096685...|
|    28|[{3637, 4.3964314...|
|    31|[{7335, 4.8536677...|
|    34|[{8675, 4.5817814...|
|    35|[{8675, 4.9572916...|
|    37|[{8675, 4.8997564...|
+------+--------------------+
only showing top 20 rows



<h3>Explode movieId and rating<h3>

In [63]:
nrecommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
nrecommendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1|   8675| 4.540138|
|     1|   7338| 4.540138|
|     1|   4710| 4.405482|
|     1|   4337| 4.259174|
|     1|  31116| 4.193966|
|     3|   8675|5.6698346|
|     3|   7338|5.6698346|
|     3|  31116|5.5037985|
|     3|   1428| 5.225751|
|     3|    951|5.1894836|
+------+-------+---------+



<h3>Recommend One User<h3>

In [109]:
nrecommendations.join(moviesDf, on='movieId').filter('userId = 1').show()

+-------+------+--------+--------------------+----+-----------------+
|movieId|userId|  rating|               title|year|           genres|
+-------+------+--------+--------------------+----+-----------------+
|   8675|     1|4.540138| b'Enemy Below, The'|1957| Action|Drama|War|
|   7338|     1|4.540138|      b'Richard III'|1955|        Drama|War|
|   4710|     1|4.405482|    b'Shootist, The'|1976|    Drama|Western|
|   4337|     1|4.259174|b'Sand Pebbles, The'|1966|Drama|Romance|War|
|  31116|     1|4.193966|    b'Sergeant York'|1941|        Drama|War|
+-------+------+--------+--------------------+----+-----------------+



<h3>User Ratings<h3>

In [106]:
data.groupBy('genres').count().sort(col("count").desc())

genres,count
Crime|Drama,10
Action|Adventure|...,7
Drama,6
Action|Adventure|...,6
Horror,6
Action|Crime|Thri...,5
Action|Crime|Dram...,4
Crime|Drama|Thriller,4
Crime|Film-Noir|M...,4
Action|Sci-Fi|Thr...,4


: 