### ALS - Movies Recommendation
Please experiment with the ALS algorithm. You can use the following notebook as a starting template: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/42740061589303/2103584303593622/8105921225255291/latest.html

Try to train and run the model on the 20 Million movie ratings dataset instead of the 1 Million one. Files:

/databricks-datasets/cs110x/ml-20m/data-001/movies.csv

/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv

Test with various values of:

- rank
- regularization parameter
- number of iterations

Compare the models and find the best one based on the error value (i.e., RMSE).

The documentation of the algorithm can be found at: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation

Prepare a table with 10 of your own ratings for movies that you select, and run the code that generates recommendations (predicted ratings) based on those 10 rated movies.

From the obtained results, show the 50 movies with the highest predicted ratings.
For each of the top 50 recommended movies, show the average rating across all users in the dataset next to the rating predicted by ALS.

#### 1. Try to train and run the model on the 20 Million movie ratings dataset instead of the 1 Million one.

In [0]:
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0


In [0]:
%fs
ls /databricks-datasets/cs110x/ml-20m/data-001

path,name,size
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/README.txt,README.txt,8964
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv,links.csv,569517
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv.gz,links.csv.gz,245973
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv,movies.csv,1397542
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv.gz,movies.csv.gz,498839
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv,ratings.csv,533444411
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv.gz,ratings.csv.gz,132656084
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv,tags.csv,16603996
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv.gz,tags.csv.gz,4787917


In [0]:
%fs
head /databricks-datasets/cs110x/ml-20m/data-001/movies.csv

In [0]:
%fs
head /databricks-datasets/cs110x/ml-20m/data-001/ratings.csv

In [0]:
from pyspark.sql.types import *
 
movies_df_schema = StructType([
   StructField('movieId', IntegerType()),
   StructField('title', StringType()),
   StructField('genres', StringType())
])
 
ratings_df_schema = StructType([
   StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', FloatType())
])

In [0]:
# Loading the data
# File location and type
file_location = "/databricks-datasets/cs110x/ml-20m/data-001/movies.csv"
file_type = 'csv'

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
 
# The applied options are for CSV files. For other file types, these will be ignored.
df_movies = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(movies_df_schema) \
  .load(file_location)

display(df_movies)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
# File location and type
file_location = "/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv"
file_type = 'csv'

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
 
# The applied options are for CSV files. For other file types, these will be ignored.
df_ratings = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(ratings_df_schema) \
  .load(file_location)


display(df_ratings)

userId,movieId,rating
1,2,3.5
1,29,3.5
1,32,3.5
1,47,3.5
1,50,3.5
1,112,3.5
1,151,4.0
1,223,4.0
1,253,4.0
1,260,4.0


In [0]:
print(df_movies.count())
print(df_ratings.count())

In [0]:
# Let's check how many ratings there are for a particular movie.
# I will check one of my favorite movies, 'Clueless (1995)', which has movieId 39

df_ratings.filter(df_ratings['movieId']==39).count()

In [0]:
# Randomly split the dataset into training (60%), validation (20%), and test (20%) sets

(s60, s20a, s20b) = df_ratings.randomSplit([0.6, 0.2, 0.2])
df_training = s60.cache()
df_validation = s20a.cache()
df_test = s20b.cache()

In [0]:
df_training.show(100)

#### 2. Test with various values of: rank, regularization parameter, number of iterations. Compare the models and find the best model based on the error value (i.e., RMSE).
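
For reference, RMSE is the square root of the mean squared difference between the actual and the predicted ratings. The snippet below is a minimal pure-Python sketch of that calculation, not the Spark implementation; the helper name `rmse` is hypothetical. It also illustrates why NaN predictions are dropped before evaluation: a single NaN would otherwise make the whole metric NaN.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs.

    Pairs whose prediction is NaN are skipped, mirroring the NaN filter
    applied before calling the evaluator in the notebook.
    """
    squared_errors = [(actual - predicted) ** 2
                      for actual, predicted in pairs
                      if not math.isnan(predicted)]
    if not squared_errors:
        raise ValueError("no valid predictions to evaluate")
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Two predictions that are each off by one rating point, plus one NaN prediction
print(rmse([(4.0, 3.0), (2.0, 3.0), (5.0, float('nan'))]))
```

A lower RMSE means the predicted ratings are, on average, closer to the ratings users actually gave.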

In [0]:
# Now that all the data is prepared,
# let's start with the collaborative filtering approach

from pyspark.ml.recommendation import ALS
 
als = ALS()

# setting number of iterations and regularization parameter
als.setMaxIter(5) \
  .setRegParam(0.1) \
  .setUserCol('userId') \
  .setItemCol('movieId') \
  .setRatingCol('rating')

# Create RMSE evaluator to evaluate performance of prediction model later below
from pyspark.ml.evaluation import RegressionEvaluator
reg_eval = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName='rmse')

# find the best-performing model by comparing models trained with different ranks
ranks = [3, 6, 9, 12]
errors = [0, 0, 0, 0]
models = [0, 0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1

for rank in ranks:
  als.setRank(rank)
  
  # generate a model using ALS from training data
  model = als.fit(df_training)
  
  # run the model
  df_predicted_ratings = model.transform(df_validation)
  
  # drop NaN predictions (by default ALS returns NaN for users or movies not seen in training)
  df_predicted_ratings = df_predicted_ratings.filter(df_predicted_ratings.prediction != float('nan'))
  
  # Run RMSE evaluator created above
  error = reg_eval.evaluate(df_predicted_ratings)
  errors[err] = error
  models[err] = model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_ratings_model = models[best_rank]  #the best model will be my ratings model
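
The loop above varies only the rank. One way to also cover the regularization parameter and the number of iterations is a small grid search. The sketch below is illustrative only: `grid_search` and `als_rmse` are hypothetical helper names, the candidate values in the usage comment are arbitrary assumptions, and `coldStartStrategy='drop'` (available in Spark 2.2+) is used in place of the manual NaN filter.

```python
from itertools import product

def grid_search(ranks, reg_params, max_iters, fit_and_score):
    """Try every (rank, regParam, maxIter) combination.

    fit_and_score is any callable taking (rank, reg_param, max_iter) and
    returning a validation RMSE. Returns (best_params, best_rmse, results).
    """
    results = []
    for rank, reg_param, max_iter in product(ranks, reg_params, max_iters):
        error = fit_and_score(rank, reg_param, max_iter)
        results.append(((rank, reg_param, max_iter), error))
    best_params, best_rmse = min(results, key=lambda r: r[1])
    return best_params, best_rmse, results

def als_rmse(rank, reg_param, max_iter, train_df, valid_df):
    """Fit ALS on train_df and return the RMSE on valid_df."""
    # Imported here so grid_search itself can be used without Spark
    from pyspark.ml.recommendation import ALS
    from pyspark.ml.evaluation import RegressionEvaluator

    als = ALS(rank=rank, regParam=reg_param, maxIter=max_iter,
              userCol='userId', itemCol='movieId', ratingCol='rating',
              coldStartStrategy='drop')  # drop rows that would get NaN predictions
    model = als.fit(train_df)
    evaluator = RegressionEvaluator(predictionCol='prediction',
                                    labelCol='rating', metricName='rmse')
    return evaluator.evaluate(model.transform(valid_df))

# Usage in the notebook (candidate values are assumptions, adjust as needed):
# best_params, best_rmse, results = grid_search(
#     [3, 6, 9, 12], [0.05, 0.1, 0.2], [5, 10],
#     lambda r, l, i: als_rmse(r, l, i, df_training, df_validation))
# print(best_params, best_rmse)
```

Each combination trains a full model, so on the 20 million ratings dataset the grid should stay small. Once the best combination is chosen on the validation set, the held-out `df_test` split can be scored the same way to estimate how the model performs on unseen data.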

#### 3. Prepare a table with 10 of your own ratings for the movies that you select and run the code that generates recommendations (predicted ratings) of movies based on your individual set of 10 rated movies

Prepare a table with 10 of my own ratings for the movies I select

In [0]:
my_user_id = 0
 
my_rated_movies = [
    (my_user_id, 1, 4), # Toy Story (1995)
    (my_user_id, 39, 5), # Clueless (1995)
    (my_user_id, 586, 5), # Home Alone (1990)
    (my_user_id, 17, 4), # Sense and Sensibility (1995)
    (my_user_id, 919, 3), # Wizard of Oz, The (1939)
    (my_user_id, 928, 4), # Rebecca (1940)
    (my_user_id, 1013, 5), # Parent Trap, The (1961)
    (my_user_id, 949, 2), # East of Eden (1955)
    (my_user_id, 215, 5), # Before Sunrise (1995)
    (my_user_id, 837, 5), # Matilda (1996)
]
 
df_my_ratings = spark.createDataFrame(my_rated_movies, ['userId', 'movieId', 'rating'])
 
display(df_my_ratings)

userId,movieId,rating
0,1,4
0,39,5
0,586,5
0,17,4
0,919,3
0,928,4
0,1013,5
0,949,2
0,215,5
0,837,5


In [0]:
# Add my movie ratings to the training dataset
print(df_training.count()) # size of the original training set
df_training_and_my = df_training.unionAll(df_my_ratings)
print(df_training_and_my.count()) # size after adding my ratings (10 more rows)


In [0]:
# Train my prediction model

als = ALS()
als.setMaxIter(5) \
  .setRegParam(0.1) \
  .setUserCol('userId') \
  .setItemCol('movieId') \
  .setRatingCol('rating') \
  .setPredictionCol('prediction') \
  .setRank(ranks[best_rank]) # best rank from question 2 (rank = 12 in this run, the smallest RMSE)

my_ratings_model = als.fit(df_training_and_my)

In [0]:
# create a list of the IDs of the movies I rated
my_rated_movie_ids = [x[1] for x in my_rated_movies]
print(my_rated_movie_ids)

# remove the 10 movies I rated from the original df_movies dataset before predicting
df_not_rated_movies = df_movies.filter(~df_movies['movieId'].isin(my_rated_movie_ids))
print(df_not_rated_movies.count()) # 10 fewer than the total number of movies


In [0]:
from pyspark.sql.functions import lit
df_my_unrated_movies = df_not_rated_movies.withColumn('userId', lit(my_user_id))
display(df_my_unrated_movies) # the table now excludes the 10 movies that I already rated

movieId,title,genres,userId
2,Jumanji (1995),Adventure|Children|Fantasy,0
3,Grumpier Old Men (1995),Comedy|Romance,0
4,Waiting to Exhale (1995),Comedy|Drama|Romance,0
5,Father of the Bride Part II (1995),Comedy,0
6,Heat (1995),Action|Crime|Thriller,0
7,Sabrina (1995),Comedy|Romance,0
8,Tom and Huck (1995),Adventure|Children,0
9,Sudden Death (1995),Action,0
10,GoldenEye (1995),Action|Adventure|Thriller,0
11,"American President, The (1995)",Comedy|Drama|Romance,0


In [0]:
# Run my prediction model and show predicted ratings as a result

spark.conf.set("spark.sql.crossJoin.enabled", "true")
 
df_raw_predicted_ratings = my_ratings_model.transform(df_my_unrated_movies)
 
display(df_raw_predicted_ratings) # the full result may still contain NaN predictions

movieId,title,genres,userId,prediction
148,"Awfully Big Adventure, An (1995)",Drama,0,2.1780214
463,Guilty as Sin (1993),Crime|Drama|Thriller,0,3.6027539
471,"Hudsucker Proxy, The (1994)",Comedy,0,3.506189
496,What Happened Was... (1994),Comedy|Drama|Romance|Thriller,0,2.0728354
833,High School High (1996),Comedy,0,3.4840844
1088,Dirty Dancing (1987),Drama|Musical|Romance,0,4.513625
1238,Local Hero (1983),Comedy,0,3.4989343
1342,Candyman (1992),Horror|Thriller,0,2.7770896
1580,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,0,4.029814
1591,Spawn (1997),Action|Adventure|Sci-Fi|Thriller,0,2.8748188


In [0]:
# so let's remove those NaN values
df_predicted_ratings = df_raw_predicted_ratings.filter(df_raw_predicted_ratings['prediction'] != float('nan'))
display(df_predicted_ratings)

movieId,title,genres,userId,prediction
2,Jumanji (1995),Adventure|Children|Fantasy,0,4.0101132
3,Grumpier Old Men (1995),Comedy|Romance,0,3.8566751
4,Waiting to Exhale (1995),Comedy|Drama|Romance,0,3.595026
5,Father of the Bride Part II (1995),Comedy,0,4.120101
6,Heat (1995),Action|Crime|Thriller,0,3.58305
7,Sabrina (1995),Comedy|Romance,0,4.0633836
8,Tom and Huck (1995),Adventure|Children,0,4.192077
9,Sudden Death (1995),Action,0,3.2609265
10,GoldenEye (1995),Action|Adventure|Thriller,0,3.6015778
11,"American President, The (1995)",Comedy|Drama|Romance,0,4.196409


#### 4. From the obtained results show 50 movies with highest ratings.

In [0]:
print('50 movies with the highest predicted ratings for me:')
display(df_predicted_ratings.sort("prediction", ascending=False).head(50))

movieId,title,genres,userId,prediction
107743,"Class of 92, The (2013)",Documentary,0,7.058151245117188
106160,Bekas (2012),Drama,0,6.300572872161865
77736,Crazy Stone (Fengkuang de shitou) (2006),Comedy|Crime,0,6.228982925415039
107780,Cats (1998),Musical,0,6.096003532409668
32954,"Chant of Jimmy Blacksmith, The (1978)",Drama,0,6.052312850952148
53839,East Side Story (1997),Documentary|Musical,0,6.042161464691162
41889,Lili (1953),Drama|Musical|Romance,0,5.998879432678223
123109,P.U.N.K.S (1999),Children|Comedy|Sci-Fi,0,5.998711585998535
88466,Broken Sky (El cielo dividido) (2006),Drama,0,5.910849571228027
49115,"Boynton Beach Bereavement Club, The (2005)",Comedy|Drama,0,5.870808124542236


#### 5. For each of the top 50 recommended movies next to the rating proposed by ALS show the average rating for all users from the dataset.

In [0]:
from pyspark.sql import functions as f
avg_ratings = df_ratings.groupBy('movieId').agg(f.avg(df_ratings.rating).alias("average"))
display(avg_ratings)

movieId,average
3997,2.0703468490473864
1580,3.55831928049466
3918,2.918940609951846
2366,3.54926814546552
3175,3.600717102904267
4519,3.2463842975206614
1591,2.6201712654614653
471,3.664181753638623
36525,3.482891360136869
44022,3.334077079107505


In [0]:
# Rename movieId to Id so that a later join does not produce duplicate column names
avg_ratings = avg_ratings.withColumnRenamed("movieId","Id")
display(avg_ratings)

Id,average
3997,2.0703468490473864
1580,3.55831928049466
3918,2.918940609951846
2366,3.54926814546552
3175,3.600717102904267
4519,3.2463842975206614
1591,2.6201712654614653
471,3.664181753638623
36525,3.482891360136869
44022,3.334077079107505


In [0]:
# Rename movieId in avg_ratings before joining; otherwise the result would have duplicate columns.
# (withColumnRenamed is a no-op if the column was already renamed in the previous cell.)
avg_ratings = avg_ratings.withColumnRenamed("movieId","Id") 

# join the predicted ratings with the average ratings
with_avg = df_predicted_ratings.join(avg_ratings,df_predicted_ratings["movieId"]==avg_ratings["Id"])
df_predicted_ratings_with_avg = with_avg.drop("Id")
display(df_predicted_ratings_with_avg)

movieId,title,genres,userId,prediction,average
148,"Awfully Big Adventure, An (1995)",Drama,0,2.1780214,2.8893557422969187
463,Guilty as Sin (1993),Crime|Drama|Thriller,0,3.6027539,2.8
471,"Hudsucker Proxy, The (1994)",Comedy,0,3.506189,3.664181753638623
496,What Happened Was... (1994),Comedy|Drama|Romance|Thriller,0,2.0728354,3.2893462469733654
833,High School High (1996),Comedy,0,3.4840844,2.725998598458304
1088,Dirty Dancing (1987),Drama|Musical|Romance,0,4.513625,3.209207300463089
1238,Local Hero (1983),Comedy,0,3.4989343,3.96665623043206
1342,Candyman (1992),Horror|Thriller,0,2.7770896,2.949072666463971
1580,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,0,4.029814,3.55831928049466
1591,Spawn (1997),Action|Adventure|Sci-Fi|Thriller,0,2.8748188,2.6201712654614653
