In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import mlflow

In [0]:
df = spark.read.format('csv').option('header','true').load("/FileStore/tables/all_netflix.csv")

In [0]:
df.show()

+---+-------+--------+------+----------+
|_c0|Cust_Id|Movie_Id|Rating|      Date|
+---+-------+--------+------+----------+
|  0|1488844|       1|     3|2005-09-06|
|  1| 822109|       1|     5|2005-05-13|
|  2| 885013|       1|     4|2005-10-19|
|  3|  30878|       1|     4|2005-12-26|
|  4| 823519|       1|     3|2004-05-03|
|  5| 893988|       1|     3|2005-11-17|
|  6| 124105|       1|     4|2004-08-05|
|  7|1248029|       1|     3|2004-04-22|
|  8|1842128|       1|     4|2004-05-09|
|  9|2238063|       1|     3|2005-05-11|
| 10|1503895|       1|     4|2005-05-19|
| 11|2207774|       1|     5|2005-06-06|
| 12|2590061|       1|     3|2004-08-12|
| 13|   2442|       1|     3|2004-04-14|
| 14| 543865|       1|     4|2004-05-28|
| 15|1209119|       1|     4|2004-03-23|
| 16| 804919|       1|     4|2004-06-10|
| 17|1086807|       1|     3|2004-12-28|
| 18|1711859|       1|     4|2005-05-08|
| 19| 372233|       1|     5|2005-11-23|
+---+-------+--------+------+----------+
only showing top

In [0]:
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
ratings = (df
    .select(
        'Cust_Id',
        'Movie_Id',
        'Rating',
    )
).cache()

In [0]:
from pyspark.sql import functions as F
ratings_c = ratings.withColumn("Cust_Id",F.col("Cust_Id").cast("integer").alias("Cust_Id")) \
            .withColumn("Movie_Id",F.col("Movie_Id").cast("integer").alias("Movie_Id")) \
            .withColumn("Rating",F.col("Rating").cast("integer").alias("Rating"))

In [0]:
(training, test) = ratings_c.randomSplit([0.8, 0.2])

In [0]:
(training2, test2) = ratings_c.randomSplit([0.2, 0.8])
(training3, test3) = ratings_c.randomSplit([0.4, 0.6])
(training4, test4) = ratings_c.randomSplit([0.6, 0.4])

In [0]:
als = ALS(maxIter=2, regParam=0.01, 
          userCol="Cust_Id",
          itemCol="Movie_Id",
          ratingCol="Rating",
          nonnegative = True,
          coldStartStrategy="drop",
          implicitPrefs=False)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [0]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [0]:
# Confirm cv was built
print(cv)

CrossValidator_e6903097a498


In [0]:
#Fit cross validator to the 'train' dataset
model = cv.fit(training)

#Extract best model from the cv model above
best_model = model.bestModel

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.


In [0]:
print("**Best Model**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 50
  MaxIter: 2
  RegParam: 0.15


In [0]:
#Fit cross validator to the 'train' dataset
model2 = cv.fit(training2)

#Extract best model from the cv model above
best_model2 = model2.bestModel

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.


In [0]:
#Fit cross validator to the 'train' dataset
model3 = cv.fit(training3)

#Extract best model from the cv model above
best_model3 = model3.bestModel

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.


In [0]:
#Fit cross validator to the 'train' dataset
model4 = cv.fit(training4)

#Extract best model from the cv model above
best_model4 = model4.bestModel

MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.


In [0]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.2168230037582293


In [0]:
# View the predictions
test_predictions2 = best_model2.transform(test2)
RMSE2 = evaluator.evaluate(test_predictions2)
print(RMSE2)

1.5146343371585127


In [0]:
# View the predictions
test_predictions3 = best_model3.transform(test3)
RMSE3 = evaluator.evaluate(test_predictions3)
print(RMSE3)

1.3406451812831852


In [0]:
# View the predictions
test_predictions4 = best_model4.transform(test4)
RMSE4 = evaluator.evaluate(test_predictions4)
print(RMSE4)

1.2781943808298617


In [0]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [0]:
test_predictions.show()

+-------+--------+------+----------+
|Cust_Id|Movie_Id|Rating|prediction|
+-------+--------+------+----------+
|   1500|      28|     1| 3.2370677|
|   2225|    4506|     4|  3.386501|
|   2225|    4516|     3|  2.629615|
|   2225|   13382|     1| 2.6649241|
|   3595|   13370|     3| 2.7085943|
|   3595|   13378|     4|  3.523923|
|   4229|   13384|     4|  2.481201|
|   4368|    9229|     4| 3.1785128|
|   4477|       8|     3| 2.8597584|
|   4477|      28|     5| 3.3612454|
|   4477|    9234|     4| 2.6547906|
|   4624|    4506|     4|  5.036746|
|   6056|    4517|     2| 2.2777045|
|   6460|       3|     5|  3.657725|
|   6460|      28|     3|  4.520268|
|   6460|    9216|     4|  1.945454|
|   7116|    4506|     4| 3.1689746|
|   7116|   13384|     3| 2.7011154|
|   7576|   13380|     5| 2.3146744|
|   7594|    9224|     4| 2.0724607|
+-------+--------+------+----------+
only showing top 20 rows



In [0]:
# Generate n Recommendations for all users
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

+-------+--------------------+
|Cust_Id|     recommendations|
+-------+--------------------+
|    183|[{4506, 2.8535624...|
|    296|[{13384, 3.812205...|
|    384|[{28, 4.432577}, ...|
|    756|[{13384, 1.879018...|
|   1034|[{28, 4.7816935},...|
|   1135|[{28, 1.9441148},...|
|   1243|[{28, 4.1046705},...|
|   1331|[{4520, 3.7103972...|
|   1500|[{4520, 3.8322995...|
|   1650|[{28, 3.9628386},...|
+-------+--------------------+



In [0]:
movieRecs = best_model.recommendForAllItems(10)
movieRecs.show()

+--------+--------------------+
|Movie_Id|     recommendations|
+--------+--------------------+
|    9230|[{1302139, 4.5253...|
|      20|[{2268720, 4.7556...|
|    4510|[{1743179, 4.7533...|
|    4520|[{1443113, 6.3639...|
|    4500|[{1065665, 4.4532...|
|   13380|[{1140762, 4.5317...|
|   13370|[{1030924, 5.0554...|
|      10|[{1739627, 4.5221...|
|    9220|[{1618249, 4.4307...|
|      30|[{290716, 5.41505...|
|   13371|[{1770537, 4.5687...|
|    4521|[{931817, 4.67657...|
|    9231|[{1935097, 4.6262...|
|       1|[{1686060, 4.5322...|
|   13381|[{268593, 4.47149...|
|    4511|[{1957883, 4.5383...|
|    9221|[{650676, 4.69163...|
|    9211|[{196716, 4.37343...|
|      21|[{2301402, 4.5666...|
|    4501|[{1935097, 4.3840...|
+--------+--------------------+
only showing top 20 rows



In [0]:
movie_title = spark.read.format('csv').option('header','true').load("/FileStore/tables/netflix_movietitles-1.csv")

In [0]:
movie_title.show()

+--------+------+--------------------+
|Movie_Id|  Year|                Name|
+--------+------+--------------------+
|       1|2003.0|     Dinosaur Planet|
|       2|2004.0|Isle of Man TT 20...|
|       3|1997.0|           Character|
|       4|1994.0|Paula Abdul's Get...|
|       5|2004.0|The Rise and Fall...|
|       6|1997.0|                Sick|
|       7|1992.0|               8 Man|
|       8|2004.0|What the #$*! Do ...|
|       9|1991.0|Class of Nuke 'Em...|
|      10|2001.0|             Fighter|
|      11|1999.0|Full Frame: Docum...|
|      12|1947.0|My Favorite Brunette|
|      13|2003.0|Lord of the Rings...|
|      14|1982.0|  Nature: Antarctica|
|      15|1988.0|Neil Diamond: Gre...|
|      16|1996.0|           Screamers|
|      17|2005.0|           7 Seconds|
|      18|1994.0|    Immortal Beloved|
|      19|2000.0|By Dawn's Early L...|
|      20|1972.0|     Seeta Aur Geeta|
+--------+------+--------------------+
only showing top 20 rows



In [0]:
inner_join = test_predictions.join(movie_title, test_predictions.Movie_Id == movie_title.Movie_Id)
inner_join.show()

+-------+--------+------+----------+--------+------+--------------------+
|Cust_Id|Movie_Id|Rating|prediction|Movie_Id|  Year|                Name|
+-------+--------+------+----------+--------+------+--------------------+
|   1500|      28|     1| 3.2370677|      28|2002.0|     Lilo and Stitch|
|   2225|    4506|     4|  3.386501|    4506|1961.0|Breakfast at Tiff...|
|   2225|    4516|     3|  2.629615|    4516|1999.0|Saturday Night Li...|
|   2225|   13382|     1| 2.6649241|   13382|1986.0|  The Mosquito Coast|
|   3595|   13370|     3| 2.7085943|   13370|2002.0|Justice League: P...|
|   3595|   13378|     4|  3.523923|   13378|1940.0|     His Girl Friday|
|   4229|   13384|     4|  2.481201|   13384|1979.0|   Kramer vs. Kramer|
|   4368|    9229|     4| 3.1785128|    9229|1988.0|              Colors|
|   4477|       8|     3| 2.8597584|       8|2004.0|What the #$*! Do ...|
|   4477|      28|     5| 3.3612454|      28|2002.0|     Lilo and Stitch|
|   4477|    9234|     4| 2.6547906|  

In [0]:
movierec = movieRecs.join(movie_title, movieRecs.Movie_Id == movie_title.Movie_Id)

+--------+--------------------+--------+------+--------------------+
|Movie_Id|     recommendations|Movie_Id|  Year|                Name|
+--------+--------------------+--------+------+--------------------+
|    9230|[{1302139, 4.5253...|    9230|1983.0|   Max Dugan Returns|
|      20|[{2268720, 4.7556...|      20|1972.0|     Seeta Aur Geeta|
|    4510|[{1743179, 4.7533...|    4510|1959.0|     Kaagaz Ke Phool|
|    4520|[{1443113, 6.3639...|    4520|1995.0|    Grumpier Old Men|
|    4500|[{1065665, 4.4532...|    4500|1945.0|Les Dames du Bois...|
|   13380|[{1140762, 4.5317...|   13380|1949.0|           Stray Dog|
|   13370|[{1030924, 5.0554...|   13370|2002.0|Justice League: P...|
|      10|[{1739627, 4.5221...|      10|2001.0|             Fighter|
|    9220|[{1618249, 4.4307...|    9220|2004.0|Kangaroo Jack: G'...|
|      30|[{290716, 5.41505...|      30|2003.0|Something's Gotta...|
|   13371|[{1770537, 4.5687...|   13371|1997.0|Chuck Norris: Pri...|
|    4521|[{931817, 4.67657...|   

In [0]:
movierec.show(10)

+--------+--------------------+--------+------+--------------------+
|Movie_Id|     recommendations|Movie_Id|  Year|                Name|
+--------+--------------------+--------+------+--------------------+
|    9230|[{1302139, 4.5253...|    9230|1983.0|   Max Dugan Returns|
|      20|[{2268720, 4.7556...|      20|1972.0|     Seeta Aur Geeta|
|    4510|[{1743179, 4.7533...|    4510|1959.0|     Kaagaz Ke Phool|
|    4520|[{1443113, 6.3639...|    4520|1995.0|    Grumpier Old Men|
|    4500|[{1065665, 4.4532...|    4500|1945.0|Les Dames du Bois...|
|   13380|[{1140762, 4.5317...|   13380|1949.0|           Stray Dog|
|   13370|[{1030924, 5.0554...|   13370|2002.0|Justice League: P...|
|      10|[{1739627, 4.5221...|      10|2001.0|             Fighter|
|    9220|[{1618249, 4.4307...|    9220|2004.0|Kangaroo Jack: G'...|
|      30|[{290716, 5.41505...|      30|2003.0|Something's Gotta...|
+--------+--------------------+--------+------+--------------------+
only showing top 10 rows

