In [1]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=b610881791329d6cf004d8ee7b031e446005572f8e95b8b82498648377fcaa91
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
from pyspark.sql.types import *
import pandas as pd
pd.options.display.max_columns = None
from pyspark.sql.functions import *

In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

<hr style="border:1px solid blue"></hr>

### LOADING DATA

#### Download Dataset & Load into HDFS

```
sudo su 
cd 
mkdir mov
cd mov

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
unzip ml-latest.zip
rm -f ml-latest.zip

hdfs dfs -mkdir /user/root/data/
hdfs dfs -mkdir /user/root/data/MOV/
hdfs dfs -mkdir /user/root/data/MOV/CSV

hdfs dfs -put ml-latest/* /user/root/data/MOV/CSV
```


In [4]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
!unzip ml-latest.zip
!rm -f ml-latest.zip

--2023-04-09 01:40:05--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip’


2023-04-09 01:40:09 (72.7 MB/s) - ‘ml-latest.zip’ saved [277113433/277113433]

Archive:  ml-latest.zip
   creating: ml-latest/
  inflating: ml-latest/links.csv     
  inflating: ml-latest/tags.csv      
  inflating: ml-latest/genome-tags.csv  
  inflating: ml-latest/ratings.csv   
  inflating: ml-latest/README.txt    
  inflating: ml-latest/genome-scores.csv  
  inflating: ml-latest/movies.csv    


#### Define schema for Movies Dataset


In [5]:
moviesStruct = [StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)]

moviesSchema = StructType(moviesStruct)

#### Read Movies dataframe from HDFS

In [6]:
# Read movies from HDFS as CSV (FIRST TIME ONLY)

moviesDF = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(moviesSchema) \
    .load("/content/ml-latest/movies.csv")
#   .load("hdfs:///user/root/data/MOV/CSV/movies.csv")

In [7]:
moviesDF

DataFrame[movieId: int, title: string, genres: string]

In [8]:
display(moviesDF)

DataFrame[movieId: int, title: string, genres: string]

#### Write Movies Dataframe to Parquet File

In [9]:
#moviesDF.write.parquet("hdfs:///user/root/data/MOV/PARQUET/movies.parquet")
moviesDF.write.parquet("/content/ml-latest/movies.parquet")

#### Reload Movies Dataframe From Parquet File 

In [10]:
# (everytime after the first export to PARQUET)

# moviesDF = spark.read.parquet("hdfs:///user/root/data/MOV/PARQUET/movies.parquet")
moviesDF = spark.read.parquet("/content/ml-latest/movies.parquet")

#### Make Use of Caching Features

In [11]:
# caching might be aof great help for crossvalidation -among others
# read caching comment for ratingsDF below

moviesDF.cache()

DataFrame[movieId: int, title: string, genres: string]

#### Define schema for Ratings Dataset

In [12]:
# Define schema for ratings dataset
ratingsStruct = [StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", IntegerType(), True)]

ratingsSchema = StructType(ratingsStruct)

#### Read Ratings Dataframe from HDFS

In [13]:
# Read ratings from HDFS (FIRST TIME ONLY)
ratingsDF = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(ratingsSchema) \
    .load("/content/ml-latest/ratings.csv")
#    .load("hdfs:///user/root/data/MOV/CSV/ratings.csv")

ratingsDF.limit(10).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,307,3.5,1256677221
1,1,481,3.5,1256677456
2,1,1091,1.5,1256677471
3,1,1257,4.5,1256677460
4,1,1449,4.5,1256677264
5,1,1590,2.5,1256677236
6,1,1591,1.5,1256677475
7,1,2134,4.5,1256677464
8,1,2478,4.0,1256677239
9,1,2840,3.0,1256677500


#### Write Ratings Dataframe to Parquet File

In [14]:
# ratingsDF.write.parquet("hdfs:///user/root/data/MOV/PARQUET/ratings.parquet")
ratingsDF.write.parquet("/content/ml-latest/ratings.parquet")

#### Reload Ratings Dataframe From Parquet File 

In [15]:
# LOAD RATINGS From Parquet File (everytime after the first export to PARQUEY)

# ratingsDF = spark.read.parquet("hdfs:///user/root/data/MOV/PARQUET/ratings.parquet").drop("timestamp")
ratingsDF = spark.read.parquet("/content/ml-latest/ratings.parquet").drop("timestamp")

#### Make Use of Caching Features

In [16]:
# Caching might be of great help - especially for crossvalidation -among others
# it is recommended for RDD re-use in iterative machine learning applications
# Check the size of your data on disk, and the total memory available to spark
# to see how much of your data fits into memory
# If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. 

ratingsDF.cache()

DataFrame[userId: int, movieId: int, rating: double]

<hr style="border:1px solid blue"></hr>

### Data Exploration & Filtering

#### Select Users Dataframe

In [17]:
# Unique Users Id :
usersDF = ratingsDF.select("userId").distinct()

In [18]:
# Total User Count

usersDF.count()

283228

In [19]:
usersDF.columns

['userId']

#### Group Rating Count by Users

In [20]:
ratingsDF.groupBy("userId").count().show(10, False)

+------+-----+
|userId|count|
+------+-----+
|148   |48   |
|463   |92   |
|471   |363  |
|496   |31   |
|833   |17   |
|1088  |5    |
|1238  |5    |
|1342  |3    |
|1580  |33   |
|1591  |52   |
+------+-----+
only showing top 10 rows



In [21]:
# Summary 
# Got 1000209 ratings from 6040 users on 3883 movies.
print("Got {} ratings from {} users on {} movies.".format(ratingsDF.count(), usersDF.count(), moviesDF.count()))

Got 27753444 ratings from 283228 users on 58098 movies.


In [22]:
ratingsDF.columns

['userId', 'movieId', 'rating']

#### Select Max User ID 
This will be useful for user incrementation

In [23]:
ratingsDF.agg({"userId": "max"}).collect()[0][0]

283228

In [24]:
ratingsDF.schema

StructType([StructField('userId', IntegerType(), True), StructField('movieId', IntegerType(), True), StructField('rating', DoubleType(), True)])

#### Create Ratings View

In [25]:
ratingsDF.createTempView("RATING")

#### Use SQL to select Ratings for a specific User

In [27]:
#%%sql
#select * from RATING where userId = 283228

In [28]:
#%%sql
#select count(*) from RATING where userId = 283228

#### Create a Sample Dataframe 
(example)

In [29]:
sampleDF = moviesDF.sample(fraction=0.001)

<hr style="border:1px solid blue"></hr>

### Exploring Movies Dataset 

#### Select Comedy Movies Only

In [30]:
moviesDF.filter(moviesDF.genres.contains('Comedy')).limit(10).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,3,Grumpier Old Men (1995),Comedy|Romance
2,4,Waiting to Exhale (1995),Comedy|Drama|Romance
3,5,Father of the Bride Part II (1995),Comedy
4,7,Sabrina (1995),Comedy|Romance
5,11,"American President, The (1995)",Comedy|Drama|Romance
6,12,Dracula: Dead and Loving It (1995),Comedy|Horror
7,18,Four Rooms (1995),Comedy
8,19,Ace Ventura: When Nature Calls (1995),Comedy
9,20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller


#### Display the TOP rated Comedy Movies (not aggregated)


In [31]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .sort(col("rating").desc()) \
    .show(20, truncate=False)

+-------+----------------------------------------------------+---------------------------------------------------+------+------+
|movieId|title                                               |genres                                             |userId|rating|
+-------+----------------------------------------------------+---------------------------------------------------+------+------+
|31878  |Kung Fu Hustle (Gong fu) (2004)                     |Action|Comedy                                      |4     |5.0   |
|1234   |Sting, The (1973)                                   |Comedy|Crime                                       |10    |5.0   |
|34162  |Wedding Crashers (2005)                             |Comedy|Romance                                     |4     |5.0   |
|608    |Fargo (1996)                                        |Comedy|Crime|Drama|Thriller                        |4     |5.0   |
|296    |Pulp Fiction (1994)                                 |Comedy|Crime|Drama|Thriller        

#### Count of Comedy Movies Grouped By Rating 

In [32]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("rating")).count().orderBy("rating") \
    .show(20, truncate=False)

+------+-------+
|rating|count  |
+------+-------+
|0.5   |187410 |
|1.0   |386705 |
|1.5   |188873 |
|2.0   |773957 |
|2.5   |555232 |
|3.0   |2149866|
|3.5   |1222753|
|4.0   |2544453|
|4.5   |722715 |
|5.0   |1267220|
+------+-------+



#### MOST Rated Comedy Movies - No matter the rating

In [33]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("movieId")).count().orderBy("count", ascending=False) \
    .show(20, truncate=False)

+-------+-----+
|movieId|count|
+-------+-----+
|356    |97040|
|296    |92406|
|1      |68469|
|1270   |57492|
|608    |54500|
|588    |51827|
|380    |50864|
|4306   |46826|
|344    |45608|
|1580   |44287|
|1197   |42878|
|1136   |40866|
|1265   |40836|
|6539   |39674|
|367    |38699|
|153    |38647|
|597    |38504|
|500    |38484|
|231    |37823|
|4973   |37167|
+-------+-----+
only showing top 20 rows



#### Most Rated Comedy Movies, grouped by Movie & Rating


In [34]:
moviesDF.filter(moviesDF.genres.contains('Comedy')) \
    .join(ratingsDF, "movieId") \
    .groupBy(col("movieId"),col("title"), col("rating")).count().orderBy("count", ascending=False) \
    .show(20, truncate=False)

+-------+--------------------------------------+------+-----+
|movieId|title                                 |rating|count|
+-------+--------------------------------------+------+-----+
|296    |Pulp Fiction (1994)                   |5.0   |37458|
|356    |Forrest Gump (1994)                   |5.0   |32009|
|356    |Forrest Gump (1994)                   |4.0   |27799|
|1      |Toy Story (1995)                      |4.0   |22710|
|296    |Pulp Fiction (1994)                   |4.0   |22604|
|1270   |Back to the Future (1985)             |4.0   |19223|
|608    |Fargo (1996)                          |5.0   |18616|
|588    |Aladdin (1992)                        |4.0   |17150|
|153    |Batman Forever (1995)                 |3.0   |16892|
|608    |Fargo (1996)                          |4.0   |16635|
|1      |Toy Story (1995)                      |5.0   |16497|
|380    |True Lies (1994)                      |3.0   |16389|
|380    |True Lies (1994)                      |4.0   |15726|
|1197   

In [35]:
from pyspark.sql.functions import *

#### Top Rated Comedy Movies with most ratings

In [36]:
genre = "Comedy"

In [37]:
genreMovieDF = moviesDF.filter(moviesDF.genres.contains(genre))
    
moviesByRating_counts = genreMovieDF \
        .join(ratingsDF, "movieId") \
        .groupBy("movieId").count().alias("count").orderBy(desc("count"))
        #.groupBy("movieId").count().alias("ratings count").orderBy(desc("count"))

movieByRating_Full = genreMovieDF.join(moviesByRating_counts, "movieId") \
                                  .dropDuplicates().orderBy(desc("count"))
    
movieByRating_Full.show(20)

+-------+--------------------+--------------------+-----+
|movieId|               title|              genres|count|
+-------+--------------------+--------------------+-----+
|    356| Forrest Gump (1994)|Comedy|Drama|Roma...|97040|
|    296| Pulp Fiction (1994)|Comedy|Crime|Dram...|92406|
|      1|    Toy Story (1995)|Adventure|Animati...|68469|
|   1270|Back to the Futur...|Adventure|Comedy|...|57492|
|    608|        Fargo (1996)|Comedy|Crime|Dram...|54500|
|    588|      Aladdin (1992)|Adventure|Animati...|51827|
|    380|    True Lies (1994)|Action|Adventure|...|50864|
|   4306|        Shrek (2001)|Adventure|Animati...|46826|
|    344|Ace Ventura: Pet ...|              Comedy|45608|
|   1580|Men in Black (a.k...|Action|Comedy|Sci-Fi|44287|
|   1197|Princess Bride, T...|Action|Adventure|...|42878|
|   1136|Monty Python and ...|Adventure|Comedy|...|40866|
|   1265|Groundhog Day (1993)|Comedy|Fantasy|Ro...|40836|
|   6539|Pirates of the Ca...|Action|Adventure|...|39674|
|    367|    M

In [38]:
movieByRating_Full.count()

14961

In [39]:
movieByRating_Full.coalesce(1) \
      .write \
      .option("header","true") \
      .option("sep",",") \
      .mode("overwrite") \
      .csv("movie_output.csv")
#      .csv("file:///path/output/file") \

#### Get User Input

In [41]:
4
5# This Cell does not work on Jupyter unfortunately 
# due to Pyspark limited Python libraries integration
# This need py4j integration and a call to scala/java
# code for that is provided as a standalone pyton program


for row in sampleDF.rdd.collect(): 
    print("Please rate the following movie (1-5 (best), or 0 if not seen):\n" + row.title + ":")
    rate = int(input())

Please rate the following movie (1-5 (best), or 0 if not seen):
Mina Tannenbaum (1994):
5
Please rate the following movie (1-5 (best), or 0 if not seen):
Bang (1995):
4
Please rate the following movie (1-5 (best), or 0 if not seen):
Map of the World, A (1999):
4
Please rate the following movie (1-5 (best), or 0 if not seen):
Orphans of the Storm (1921):
4
Please rate the following movie (1-5 (best), or 0 if not seen):
Lucía, Lucía (Hija del caníbal, La) (2003):
5
Please rate the following movie (1-5 (best), or 0 if not seen):
Blood of Heroes, The (Salute of the Jugger, The) (1989):
4
Please rate the following movie (1-5 (best), or 0 if not seen):
Italian Job, The (1969):
1
Please rate the following movie (1-5 (best), or 0 if not seen):
Crimson Gold (Talaye sorgh) (2003):
4
Please rate the following movie (1-5 (best), or 0 if not seen):
Damned, The (La Caduta degli dei) (1969):
1
Please rate the following movie (1-5 (best), or 0 if not seen):
To Hell and Back (1955):
2
Please rate the f

In [42]:
joinDf = moviesDF.join(ratingsDF, "movieId")
joinDf.show(10, False)

+-------+------------------------------------------------+--------------------------------+------+------+
|movieId|title                                           |genres                          |userId|rating|
+-------+------------------------------------------------+--------------------------------+------+------+
|307    |Three Colors: Blue (Trois couleurs: Bleu) (1993)|Drama                           |1     |3.5   |
|481    |Kalifornia (1993)                               |Drama|Thriller                  |1     |3.5   |
|1091   |Weekend at Bernie's (1989)                      |Comedy                          |1     |1.5   |
|1257   |Better Off Dead... (1985)                       |Comedy|Romance                  |1     |4.5   |
|1449   |Waiting for Guffman (1996)                      |Comedy                          |1     |4.5   |
|1590   |Event Horizon (1997)                            |Horror|Sci-Fi|Thriller          |1     |2.5   |
|1591   |Spawn (1997)                         

In [43]:
ratingsDF.distinct().groupBy("rating").count()

DataFrame[rating: double, count: bigint]

<hr style="border:1px solid blue"></hr>

### MODELING PART

### Model Training

### Splitting data into training & test sets

In [44]:

trainingDF,testDF = ratingsDF.randomSplit([0.8, 0.2], seed=12345)

# Got 1000209 ratings from 6040 users on 3883 movies.
#print("Training {}, test {}.".format(trainingDF.count(), testDF.count()))

### Timing Utility

In [45]:
from time import time
from datetime import timedelta

class T():
    def __enter__(self):
        self.start = time()
    def __exit__(self, type, value, traceback):
        self.end = time()
        elapsed = self.end - self.start
        print(str(timedelta(seconds=elapsed)))

In [46]:
trainingDF.cache()

DataFrame[userId: int, movieId: int, rating: double]

In [47]:
testDF.cache()

DataFrame[userId: int, movieId: int, rating: double]

In [48]:
trainingDF.select("userId").distinct().count()

281940

In [49]:
testDF.select("userId").distinct().count()

264084

At this point it is interesting to see that there might be users on the test dataset  
that have no occurence in the training dataset, as the figures below can possibly suggest

### Training ALS model on the data


In [50]:

from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5,
          regParam=0.01, 
          implicitPrefs=False, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating", 
          coldStartStrategy="drop")

In [51]:
# Fit Model (just a test - DO NOT RUN if you still need to cross-validate)

with T():
    model = als.fit(trainingDF)

0:04:38.872982


<hr style="border:1px solid blue"></hr>

### Cross Validation & Model Selection

In [52]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# WARNING : THIS CAN ESCALATE VERY QUICKLY, EVEN WITH ONLY TWO MODELS TO BE TESTED
# UNLESS YOU ARE ABLE TO NARROW DOWN AND LOWER ITERATIONS CONSIDERABLY, 
# YOU WOULD BETTER AVOID THIS STEP ALL TOGETHER

# NOTE THAT USING AN RMSE MINIMIZING LOOP CAN ALSO DO THE JOB

#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [10, 50, 100, 150]) \
#            .addGrid(als.regParam, [.01, .05, .1, .15]) \
#            .build()

#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [50, 100]) \
#            .addGrid(als.regParam, [.05, .1]) \
#            .build()

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [50]) \
            .addGrid(als.regParam, [.05]) \
            .build()

In [53]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 

print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  1


In [54]:
cv = CrossValidator(estimator=als, \
                    estimatorParamMaps=param_grid, \
                    evaluator=evaluator, \
                    numFolds=5 , \
                    parallelism=4) # this last parallelism param is crucial for increasing performance

### Fit The Cross Validator 
#### *(Optional - VERY Time Consuming)*

In [78]:
# WARNING : Running this from a notebook hooked to Spark through Livy 
# will end up in a timeout after waiting for a long time
# the job will still be running but it will be abruptly be killed due to a timeout setting
# you need to setup livy.server.session.timeout and extend it to 1h  in livy.conf on the server, and restart livy
# livy.server.session.state-retain.sec = 600s => move to 3600s 
# location : /etc/livy/conf/livy.conf

# check spark web UI
# check generated DAGs
# check memory usage

#cvModel = cv.fit(trainingDF)

<hr style="border:1px solid blue"></hr>

### Model Evaluation

In [57]:
# Evaluate the model by computing the RMSE on the Rating Predictions established for test data

predictions = model.transform(testDF)


In [58]:
predictions.columns

['userId', 'movieId', 'rating', 'prediction']

In [59]:
trainingDF.filter(testDF.userId == "12").show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    12|     45|   3.0|
|    12|    105|   3.0|
|    12|    186|   2.5|
|    12|    596|   2.0|
|    12|   1090|   3.5|
|    12|   1094|   2.0|
|    12|   1380|   3.0|
|    12|   1639|   2.5|
|    12|   2001|   3.0|
|    12|   2012|   1.5|
|    12|   2100|   2.5|
|    12|   2302|   2.5|
|    12|   2353|   2.5|
|    12|   2406|   3.0|
|    12|   2502|   3.5|
|    12|   2657|   2.0|
|    12|   3897|   3.0|
+------+-------+------+



In [60]:
testDF.filter(testDF.userId == "12").show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    12|     48|   2.5|
+------+-------+------+



In [61]:
testDF.show(10, False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |1591   |1.5   |
|2     |2108   |3.5   |
|2     |2243   |4.5   |
|2     |2915   |3.5   |
|3     |1645   |4.0   |
|4     |11     |3.5   |
|4     |20     |2.5   |
|4     |70     |3.0   |
|4     |186    |2.0   |
|4     |204    |3.5   |
+------+-------+------+
only showing top 10 rows



In [62]:
testDF.groupBy("userId").count().show(10, False)

+------+-----+
|userId|count|
+------+-----+
|148   |10   |
|463   |19   |
|471   |68   |
|496   |4    |
|833   |3    |
|1088  |1    |
|1238  |1    |
|1580  |6    |
|1591  |11   |
|1645  |39   |
+------+-----+
only showing top 10 rows



In [63]:
with T():
    predictions.show(10, False)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|1     |1591   |1.5   |2.2372599 |
|3     |1645   |4.0   |3.7062025 |
|4     |1591   |3.5   |2.5644405 |
|4     |3997   |2.0   |2.0732276 |
|19    |1088   |3.0   |3.2564762 |
|26    |3175   |5.0   |3.7172058 |
|36    |6620   |4.5   |3.8806386 |
|46    |1580   |4.0   |3.2685893 |
|51    |1580   |3.0   |3.9766283 |
|67    |1580   |1.0   |3.3921664 |
+------+-------+------+----------+
only showing top 10 rows

0:01:40.047394


In [64]:
predictions.groupBy("userId").count().show(10, False)

+------+-----+
|userId|count|
+------+-----+
|6336  |211  |
|11141 |67   |
|16339 |48   |
|18051 |110  |
|20924 |35   |
|21700 |27   |
|24171 |45   |
|28088 |65   |
|28146 |10   |
|31261 |1    |
+------+-----+
only showing top 10 rows



### Evaluate using Regression Evaluator

In [65]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Lower values of RMSE indicate better fit

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8419904615630303


<hr style="border:1px solid blue"></hr>

### Model Utilization

#### Generate recommendations for each user

In [66]:
# CAUTION : This takes too much time to compute
# Use subsets instead (cell below)

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

#### Generate recommendations for a subset of user

In [67]:
# Generate top 10 movie recommendations for a specified set of users
users = ratingsDF.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies
movies = ratingsDF.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [68]:
userSubsetRecs.show(10, False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                            |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471   |[{141532, 8.394004}, {119165, 8.369208}, {154876, 8.369208}, {159467, 8.289714}, {183917, 7.811831}, {135310, 7.691615}, {177921, 7.691615}, {143833, 7.691615}, {178353, 7.691615}, {186233, 7.691615}]   |
|463   |[{182521, 8.265824}, {74404, 7.719043}, {173945, 7.282906}, {92046, 6.9142375}, {58207, 6.8270464}, {159467, 6.8158584}, {178397, 6.6647997}

In [69]:
userSubsetRecs.limit(10).toPandas()

Unnamed: 0,userId,recommendations
0,471,"[(141532, 8.394003868103027), (119165, 8.36920..."
1,463,"[(182521, 8.265824317932129), (74404, 7.719042..."
2,148,"[(173945, 10.885973930358887), (83569, 10.2956..."


<hr style="border:1px solid blue"></hr>

### Data Formatting

In [70]:
# Get model recommendation for users based on their rating 
# You will notice that the Returned Dataframe has nested values 
# DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]
# You will also notice that predicted ratings are all over the place
# This is not strictly an issue as it is not used "directly" and it can be ignored. 
# For more info see: https://stackoverflow.com/q/29051520/426332

recoDF = userSubsetRecs.limit(10)
#.toPandas()

#### Data Formatting & Join for staging & presentation

In [71]:

# Denormalize Recommendation Dataframe & explode the inner list so it can be selectable as a column   
# and filter out the rating colums as we do not need it

recoDF.select(col("userId"),explode(col("recommendations"))) \
      .select(col("userId"),col("col.movieId")) \
      .join(moviesDF,"movieId") \
      .orderBy("userId") \
      .select("userId", "movieId", "title") \
      .toPandas()

Unnamed: 0,userId,movieId,title
0,148,77344,Chizuko's Younger Sister (Futari) (1991)
1,148,83569,Eros Plus Massacre (Erosu purasu Gyakusatsu) (...
2,148,141532,Retrieval (2006)
3,148,142562,The Ceremony (2014)
4,148,156684,A Good American (2014)
5,148,159213,The Chronicles of Evil (2015)
6,148,159305,Lezione ventuno (2008)
7,148,167106,Breaking a Monster (2015)
8,148,173945,The Wearing of the Grin (1951)
9,148,178397,Jai Ho (2014)


In [72]:
movieSubSetRecs.show(10, False)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                                           |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471    |[{143251, 7.6272025}, {95438, 6.987266}, {154312, 6.835834}, {80897, 6.7905974}, {80160, 6.771138}, {170242, 6.6508036}, {103220, 6.61769}, {46397, 6.6120667}, {1730, 6.6093507}, {153630, 6.586931}]    |
|1591   |[{279618, 7.1030264}, {143251, 6.734741}, {200417, 6.6052356}, {237193, 6.472636}, {21742, 6.439523}, {35476, 6.249999}, {47480, 6.0774145}

In [73]:
from pyspark.sql.functions import col

# Recommendations for a specific user
aUserId = 12
recommandations = userRecs.filter(col("userId") == aUserId)
recommandations.show(1, False)

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                           |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|12    |[{152003, 7.955855}, {141532, 7.6469603}, {169682, 7.5903697}, {178397, 7.372107}, {182521, 7.1692166}, {52108, 7.095045}, {188923, 7.034578}, {159467, 7.033757}, {154876, 7.028566}, {119165, 7.028566}]|
+------+------------------------------------------------------------------------------------------------------------------------------------------------

In [75]:
from pyspark.sql.functions import explode

# Let's flatten the movie recommandations and look in detail
userRecommandations = recommandations.select(
  explode(col("recommendations").movieId).alias("movieId")
)

print("Recommandations for user {} :".format(aUserId))

moviesDF.join(userRecommandations, "movieId").show(10, False)

Recommandations for user 12 :
+-------+--------------------------------------------------------+------------------+
|movieId|title                                                   |genres            |
+-------+--------------------------------------------------------+------------------+
|152003 |Greenery Will Bloom Again (2014)                        |War               |
|141532 |Retrieval (2006)                                        |(no genres listed)|
|169682 |Fare (2017)                                             |Drama|Thriller    |
|178397 |Jai Ho (2014)                                           |Action|Drama      |
|182521 |Thakara (1979)                                          |(no genres listed)|
|52108  |It's Impossible to Learn to Plow by Reading Books (1988)|Drama             |
|188923 |49 Pulses (2017)                                        |(no genres listed)|
|159467 |Fifi Howls from Happiness (2013)                        |Documentary       |
|154876 |Manufractur (19

In [77]:
# Ratings from the user

ratingsDF.filter(col("userId") == aUserId) \
    .join(moviesDF, "movieId") \
    .sort(col("rating").desc()) \
    .show(100, truncate=False)

+-------+------+------+-------------------------------------+----------------------------------------+
|movieId|userId|rating|title                                |genres                                  |
+-------+------+------+-------------------------------------+----------------------------------------+
|1090   |12    |3.5   |Platoon (1986)                       |Drama|War                               |
|2502   |12    |3.5   |Office Space (1999)                  |Comedy|Crime                            |
|1380   |12    |3.0   |Grease (1978)                        |Comedy|Musical|Romance                  |
|45     |12    |3.0   |To Die For (1995)                    |Comedy|Drama|Thriller                   |
|2001   |12    |3.0   |Lethal Weapon 2 (1989)               |Action|Comedy|Crime|Drama               |
|2406   |12    |3.0   |Romancing the Stone (1984)           |Action|Adventure|Comedy|Romance         |
|105    |12    |3.0   |Bridges of Madison County, The (1995)|Drama|Romanc

<hr style="border:1px solid blue"></hr>

### Log Model Experiments with MLFlow


#### Make SURE to read carefully the MLFLOW REAMDE FILE first


In [79]:
# import mlflow
# import mlflow.spark

In [80]:
#not necessary as it defaults to /mlruns
# mlflow.set_tracking_uri("file:///path/to/mlruns")

In [81]:
# mlflow.set_tracking_uri("file:///mlruns")

In [82]:
# mlflow.start_run()

In [83]:
# mlflow.spark.log_model(model, "ALSmodel_Lite8")

In [84]:
# print("Model saved in run %s" % mlflow.active_run().info.run_uuid)

In [85]:
# mlflow.end_run()

**Run MLFlow UI to check your model**
```
mlflow ui --backend-store-uri /mlrun --host 0.0.0.0&
```

go to http://YOUR.IP.ADD.RESS:5000 :

### Serve Model with MLFlow

```console
# Serve & Curl Request Examples  
(Make SURE to read carefully the MLFLOW REAMDE FILE first)

mlflow models serve -m /mlruns/0/6c8050941d0744b8ac3652ff22d40983/artifacts/ALSmodel_Lite2 -h 0.0.0.0 --port 9999 --no-conda


curl -X POST localhost:9999/invocations -H 'Content-Type: application/json; format=pandas-split' -d '{"columns":["userId","movieId","rating"], "data":[[1311,144210,5,1604450652]]}'  

curl --request POST http://localhost:9999/invocations --header 'Content-Type: application/json; format=pandas-split' --data @mlf_data.json

curl -X POST http://localhost:9999/invocations -H 'Content-Type: application/json; format=pandas-split' -d @mlf_data.json
```

