# Movie Lens Recommender 🎥 - PySpark ⭐
*Credits* : Hadi Jamal Ahmad - Julie Ngan - M2 BDML-APP - EFREI Paris

*Goal* : recommend movies to a watcher based on the clustered movies and clustered users with collaborative filtering ➡ recommend a watcher based on what others might like whore are near to him in preferences.

⏰ Save the datasets into a data folder on your google Drive

In [1]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
os.environ["SPARK_VERSION"] = "3.0.1"                        

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.0.1-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [6]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.ml.feature import MinMaxScaler, StandardScaler,VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id,desc, stddev, mean, min, max, avg, col,isnan, when, count, from_json, year, month, dayofmonth, explode
from scipy.stats import pearsonr
from pyspark.mllib.stat import Statistics
import ast
from pyspark.sql.types import *
from pyspark.ml.regression import LinearRegression
from sklearn.model_selection import train_test_split  
from pyspark import SparkContext

import datetime
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
def missing_values(sparkdf):
  sparkdf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sparkdf.columns if c != 'Date']).show()

def read(filename):
  df = spark.read.format("csv").options(header="true").load(filename)
  missing_values(df)
  return df

# TO DO drop duplicates
def preprocess():
  print('******************** RATINGS *******************')
  ratings = read(url + "ratings_small.csv")
  ratings = ratings.withColumn("userId", ratings.userId.cast('int'))
  ratings = ratings.withColumn("movieId", ratings.movieId.cast('int'))
  ratings = ratings.withColumn("rating", ratings.rating.cast('double'))
  ratings.show(20, False) 
  ratings.summary().show() 
  print('There are ', ratings.count(), ' rows ')

  print('******************** KEYWORDS ******************')
  keywords = read(url + "keywords.csv")
  keywords = keywords.na.drop(thresh=2,how="all")
  keywords_scheme=ArrayType(StructType([
    StructField('name',StringType(),nullable=True),
    StructField('id',IntegerType(),nullable=True)
  ]))
  keywords= keywords.withColumn('keywords_name',from_json(col('keywords'),keywords_scheme).getField("name").alias('keywords_name'))
  keywords = keywords.withColumn('keywords_id',from_json(col('keywords'),keywords_scheme).getField("id").alias('keywords_id'))
  keywords = keywords.drop('keywords')
  keywords = keywords.na.drop(thresh=2,how="all")
  keywords = keywords.select("id",explode(keywords.keywords_name))
  keywords.show(20, False) 
  keywords.summary().show() 
  print('There are ', keywords.count(), ' rows \n\n')

  print('******************* LINKS *******************')
  links = read(url + "links_small.csv")
  links = links.filter(~links.tmdbId.isNull())
  links = links.na.drop(thresh=3,how="all")
  #links.show(20, False) 
  #links.summary().show() 
  print('There are ', links.count(), ' rows \n\n')
  
  print('******************* MOVIES ******************')
  movies_metadata = read(url + "movies_metadata.csv")

  companies_scheme=ArrayType(StructType([
    StructField('name',StringType(),nullable=True),
    StructField('id',IntegerType(),nullable=True)
  ]))
  movies_metadata= movies_metadata.withColumn('production_companies_name',from_json(col('production_companies'),companies_scheme).getField("name").alias('production_companies_name'))
  movies_metadata = movies_metadata.withColumn('production_companies_id',from_json(col('production_companies'),companies_scheme).getField("id").alias('production_companies_id'))
  
  countries_scheme=ArrayType(StructType([
    StructField('iso_3166_1',StringType(),nullable=True),
    StructField('name',StringType(),nullable=True)
  ]))
  movies_metadata= movies_metadata.withColumn('production_countries_name',from_json(col('production_countries'),countries_scheme).getField("name").alias('production_companies_name'))
  movies_metadata = movies_metadata.withColumn('production_iso_id',from_json(col('production_countries'),countries_scheme).getField("iso_3166_1").alias('production_companies_iso'))

  language_scheme=ArrayType(StructType([
  StructField('iso_639_1',StringType(),nullable=True),
  StructField('name',StringType(),nullable=True)
  ]))
  movies_metadata= movies_metadata.withColumn('spoken_languages_name',from_json(col('spoken_languages'),language_scheme).getField("name").alias('spoken_languages_name'))
  movies_metadata = movies_metadata.withColumn('spoken_languages_iso',from_json(col('spoken_languages'),language_scheme).getField("iso_639_1").alias('spoken_languages_iso'))

  collection_scheme=ArrayType(StructType([
  StructField('id',IntegerType(),nullable=True),
  StructField('name',StringType(),nullable=True)
  ]))
  movies_metadata= movies_metadata.withColumn('belongs_to_collection_name',from_json(col('belongs_to_collection'),collection_scheme).getField("name").alias('belongs_to_collection_name'))
  movies_metadata = movies_metadata.withColumn('belongs_to_collection_id',from_json(col('belongs_to_collection'),collection_scheme).getField("id").alias('belongs_to_collection_id'))

  genres_scheme=ArrayType(StructType([
  StructField('id',IntegerType(),nullable=True),
  StructField('name',StringType(),nullable=True)
  ]))
  movies_metadata= movies_metadata.withColumn('genres_name',from_json(col('genres'),genres_scheme).getField("name").alias('genres_name'))
  movies_metadata = movies_metadata.withColumn('genres_id',from_json(col('genres'),genres_scheme).getField("id").alias('genres_id'))

  drop_list = ['production_companies', 'production_countries', 'spoken_languages', 'belongs_to_collection', 'genres', 'adult', 'video', 'imdb_id', 'status', 'runtime', 'homepage', 'poster_path', 'original_title','overview'
                , 'production_companies_id','belongs_to_collection_id','genres_id', 'spoken_languages_iso', 'original_language', 'tagline', 'timestamp']
  movies_metadata = movies_metadata.select([column for column in movies_metadata.columns if column not in drop_list])

  # fill null values in 'revenue' column with mean_revenue
  # replace  null values in revenue column with the mean of revenue values instead, to correct errors of having not enough revenue values to do a linear regression
  # calculate mean of the 'revenue' column
  mean_revenue = movies_metadata.agg({"revenue": "mean"}).collect()[0][0]
  movies_metadata = movies_metadata.na.fill({'revenue': mean_revenue})

  mean_budget = movies_metadata.agg({"budget": "mean"}).collect()[0][0]
  movies_metadata = movies_metadata.na.fill({'budget': mean_budget})

  mean_vote_avg = movies_metadata.agg({"vote_average": "mean"}).collect()[0][0]
  movies_metadata = movies_metadata.na.fill({'vote_average': mean_vote_avg})

  mean_vote_count = movies_metadata.agg({"vote_count": "mean"}).collect()[0][0]
  movies_metadata = movies_metadata.na.fill({'vote_count': mean_vote_count})

  movies_metadata = movies_metadata.na.drop(thresh=14 , how='all')
  movies_metadata = movies_metadata.withColumn("budget", movies_metadata.budget.cast('integer'))
  movies_metadata = movies_metadata.withColumn("id", movies_metadata.id.cast('integer'))
  movies_metadata = movies_metadata.withColumn("revenue", movies_metadata.revenue.cast('integer'))
  movies_metadata = movies_metadata.withColumn("vote_average", movies_metadata.vote_average.cast('double'))
  movies_metadata = movies_metadata.withColumn("vote_count", movies_metadata.vote_count.cast('integer'))
  movies_metadata = movies_metadata.withColumnRenamed("id","movieId")
  movies_metadata = movies_metadata.withColumn('popularity', movies_metadata.popularity.cast('double'))
  #movies_metadata.show(20, False)
  #movies_metadata.summary().show() 
  print('There are ', movies_metadata.count(), ' rows \n\n')

  print('***************** CREDITS ***************')
  credits = read(url + 'credits.csv')
  credits = credits.na.drop(thresh=3, how='all')

  cast_scheme=ArrayType(StructType([
  StructField('cast_id',IntegerType(),nullable=True),
  StructField('character',StringType(),nullable=True),
  StructField('credit_id',StringType(),nullable=True),
  StructField('gender',IntegerType(),nullable=True),
  StructField('id',IntegerType(),nullable=True),
  StructField('name',StringType(),nullable=True),
  StructField('order',IntegerType(),nullable=True),
  StructField('profile_path',StringType(),nullable=True)
  ]))
  credits= credits.withColumn('cast_id',from_json(col('cast'),cast_scheme).getField("cast_id").alias('cast_id'))
  credits = credits.withColumn('cast_character',from_json(col('cast'),cast_scheme).getField("character").alias('cast_character'))
  credits= credits.withColumn('credit_id',from_json(col('cast'),cast_scheme).getField("credit_id").alias('credit_id'))
  credits = credits.withColumn('cast_gender',from_json(col('cast'),cast_scheme).getField("gender").alias('cast_gender'))  
  credits= credits.withColumn('cast_id_id',from_json(col('cast'),cast_scheme).getField("id").alias('cast_id'))
  credits = credits.withColumn('cast_name',from_json(col('cast'),cast_scheme).getField("name").alias('cast_name'))  
  credits= credits.withColumn('cast_order',from_json(col('cast'),cast_scheme).getField("order").alias('cast_order'))
  credits = credits.withColumn('cast_profile_path',from_json(col('cast'),cast_scheme).getField("profile_path").alias('cast_profile_path'))

  crew_scheme=ArrayType(StructType([
  StructField('credit_id',StringType(),nullable=True),
  StructField('department',StringType(),nullable=True),
  StructField('genre',IntegerType(),nullable=True),
  StructField('id',IntegerType(),nullable=True),
  StructField('job',StringType(),nullable=True),
  StructField('name',StringType(),nullable=True),
  StructField('profile_path',StringType(),nullable=True)
  ]))
  credits= credits.withColumn('crew_credit_id',from_json(col('crew'),crew_scheme).getField("credit_id").alias('crew_credit_id'))
  credits = credits.withColumn('crew_department',from_json(col('crew'),crew_scheme).getField("department").alias('crew_department'))
  credits= credits.withColumn('crew_genre',from_json(col('crew'),crew_scheme).getField("genre").alias('crew_genre'))
  credits = credits.withColumn('crew_id',from_json(col('crew'),crew_scheme).getField("id").alias('crew_id'))  
  credits= credits.withColumn('crew_job',from_json(col('crew'),crew_scheme).getField("job").alias('crew_job'))
  credits = credits.withColumn('crew_name',from_json(col('crew'),crew_scheme).getField("name").alias('crew_name'))  
  credits= credits.withColumn('crew_profile_path',from_json(col('crew'),crew_scheme).getField("profile_path").alias('crew_profile_path'))
  credits = credits.select([column for column in credits.columns if column not in ['cast', 'crew']])
  credits = credits.na.drop(thresh=16, how='all')
  #credits.show(20, False) 
  #credits.summary().show() 
  print('There are ', credits.count(), ' rows ')
  ratings.toPandas().to_csv(url + "processed/ratings.csv")
  movies_metadata.toPandas().to_csv(url+"processed/movies.csv")
  links.toPandas().to_csv(url+"processed/links.csv")
  keywords.toPandas().to_csv(url+"processed/keywords.csv")
  credits.toPandas().to_csv(url+"processed/credits.csv")
  return ratings, keywords, links, movies_metadata, credits 


In [100]:
def load_datasets():
  keywords = read(url+"processed/keywords.csv")
  ratings = read(url+"processed/ratings.csv")
  links = read(url+"processed/links.csv")
  movies = read(url+"processed/movies.csv")
  credits = read(url+"processed/credits.csv")
  return ratings, keywords, links, movies_metadata, credits 

In [8]:
url = 'drive/MyDrive/data/'
ratings, keywords, links, movies_metadata, credits = preprocess() #load_datasets()

******************** RATINGS *******************
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
|1     |1263   |2.0   |1260759151|
|1     |1287   |2.0   |1260759187|
|1     |1293   |2.0   |1260759148|
|1     |1339   |3.5   |1260759125|
|1     |1343   |2.0   |1260759131|
|1     |1371   |2.5   |1260759135|
|1     |1405   |1.0   |1260759203|
|1     |1953   |4.0   |1260759191|
|1     |2105   |4.0   |1260759139|
|1     |2150   |3.0   |1260759194|
|1     |2193   |2.0   |1260759198|
|1     |2294   |2.0   |1260759108|
|1     |2455   |2.5   |1260759113|
|1     |2968   |1.0   |1260759200|
|1     |36

## 1. Exploratory Data Analysis

In [9]:
movie_ratings = ratings.join(movies_metadata, ['movieId'], 'inner')

In [10]:
def insights(movies):
  print('great budget')
  movies.filter(movies.budget > 27000) \
      .show(truncate=False) 

  print('great box office revenue')
  movies.filter(movies.revenue > 9771658) \
      .show(truncate=False) 

  print('best rated movies')
  movies.filter(movies.vote_average > 7) \
      .show(truncate=False) 

  print('most hated movies')
  movies.filter(movies.vote_average < 5) \
      .show(truncate=False) 

  print('group movies by collection')
  movies.groupBy("belongs_to_collection_name").count().show(200,False)

  print('group movies by genres')
  movies.groupBy("genres_name").count().show(200,False)

  print('show by release years')
  movies.groupBy(year("release_date")).count().show(200,False)
  movies.select('release_date', 'title', 'vote_average')
 
  print('Avg Rates by movie')
  movies.groupBy(col('title')).agg(mean('vote_average')).show(200,False)

  print("best movie by year")
  movies.groupBy(year("release_date"), 'title').max('vote_average').show(200,False)

  print('**************Analyze users preferences ************')
  print("Average rate by movie")
  ratings.groupBy("movieId").agg(avg("rating"), count("rating")).show(200,False)
  
  print("Number of rates by user")
  ratings.groupBy('userId').count().show(200,False)

  print("Genres associated to collections")
  collections = movies_metadata.select("belongs_to_collection_name", "genres_name")
  collections = collections.dropna(subset=["belongs_to_collection_name", "genres_name"])
  collections_rdd = collections.rdd
  items = collections_rdd.flatMap(lambda row: [(row["belongs_to_collection_name"], row["genres_name"])])
  items.take(10)

  print("affichage temporel")

In [11]:
import pyspark.sql.functions as func
from pyspark.sql.functions import explode

def insights_credits(credits):
  credits = credits.withColumnRenamed("id","movieId")
  movie_credits = credits.join(movies_metadata, ['movieId'], 'outer')

  print('Vote average grouped by movie')
  movie_credits.groupBy('movieId', 'title').agg(mean('vote_average')).na.drop(thresh=3,how="all").show(200,False)

  print("All movies and credits")
  movie_credits.na.drop(thresh=17,how="all").show(200, False)

  print('Movies grouped by production company')
  df2 = movie_credits.select(explode(movie_credits.production_companies_name), movie_credits.title, movie_credits.production_countries_name)
  df2 = df2.withColumnRenamed("col", "production_companies_name")
  df2.show()
  df2.groupBy('production_companies_name').count()\
    .na.drop(how="all").show(200,False)

  print('Movies ordered by popularity')
  movie_credits = movie_credits.withColumn('popularityRound', func.round(movie_credits['popularity']).cast('integer'))
  movie_credits.orderBy(col("popularityRound").desc()).select('popularity','title','movieId','vote_average', 'vote_count').na.drop(thresh=5,how="all").show()
insights_credits(credits)

Vote average grouped by movie
+-------+-------------------------------------------------------------------+-----------------+
|movieId|title                                                              |avg(vote_average)|
+-------+-------------------------------------------------------------------+-----------------+
|580    |Jaws: The Revenge                                                  |3.5              |
|513    |Fire                                                               |6.0              |
|50291  |Alien Nation: Millennium                                           |5.5              |
|410988 |Stake Land II: The Stakelander                                     |5.4              |
|10495  |The Karate Kid, Part III                                           |5.5              |
|21348  |Kirikou and the Sorceress                                          |7.2              |
|10014  |A Nightmare on Elm Street Part 2: Freddy's Revenge                 |5.7              |
|127329 |N

In [12]:
def insights_keywords(keywords):
  keywords = keywords.withColumnRenamed("id","movieId")
  movie_keywords = keywords.join(movies_metadata, ['movieId'], 'inner')
  movie_keywords.show(20,False)

insights_keywords(keywords)

+-------+----------------------------+--------+----------+------------+---------+---------------------------+------------+----------+--------------------------------------------+--------------------------+-----------------+---------------------+--------------------------------+---------------------------+
|movieId|col                         |budget  |popularity|release_date|revenue  |title                      |vote_average|vote_count|production_companies_name                   |production_countries_name |production_iso_id|spoken_languages_name|belongs_to_collection_name      |genres_name                |
+-------+----------------------------+--------+----------+------------+---------+---------------------------+------------+----------+--------------------------------------------+--------------------------+-----------------+---------------------+--------------------------------+---------------------------+
|862    |toy comes to life           |30000000|21.946943 |1995-10-30  |37355403

In [13]:
insights(movie_ratings)

great budget
+-------+------+------+----------+---------+----------+------------+---------+-----------------------------------------------+------------+----------+----------------------------------------------------------------------------------------------------------------+------------------------------------------+-----------------+---------------------------+----------------------------------+-----------------------------------------+
|movieId|userId|rating|timestamp |budget   |popularity|release_date|revenue  |title                                          |vote_average|vote_count|production_companies_name                                                                                       |production_countries_name                 |production_iso_id|spoken_languages_name      |belongs_to_collection_name        |genres_name                              |
+-------+------+------+----------+---------+----------+------------+---------+-----------------------------------------------+-

### Distribution des valeurs des notes

Ratings vary from 0.5 which means the watcher hated the movie, to 5

There are 100 004 movies and watchers

# 2. Predict Movie Revenues and Vote averages

In [91]:
movies_metadata.printSchema()

root
 |-- budget: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: integer (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- production_companies_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- production_countries_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- production_iso_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- spoken_languages_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- belongs_to_collection_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres_name: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [14]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
import pickle
def predict(colPred, movies_metadata):
  vectorAssembler = VectorAssembler(inputCols = ['budget', 'popularity','revenue', 'vote_average', 'vote_count'], outputCol = 'movies_features')
  #uncomment the line below the first time you compile this cell
  movies_metadata = vectorAssembler.setHandleInvalid("skip").transform(movies_metadata)

  # Split the data into training and test sets (30% held out for testing)
  (trainingData, testData) = movies_metadata.randomSplit([0.7, 0.3])

  # Define Linear Regression algorithm
  lr = LinearRegression(labelCol=colPred, featuresCol='movies_features')

  # Define evaluator as R2, which calculates the coefficient of determination
  evaluator = RegressionEvaluator(
      metricName="r2", 
      labelCol=colPred)

  # Define a grid of parameters to search
  paramGrid = ParamGridBuilder() \
      .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
      .addGrid(lr.fitIntercept, [False, True]) \
      .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
      .build()

  # Set up 3-fold cross validation
  crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

  # Run cross-validation, and choose the best set of parameters.
  cvModel = crossval.fit(trainingData)
  cvmodel = cvModel.bestModel
  #pickle.dumps(url+"linearReg.sav")

  # Generate predictions on testData
  #cvModel = CrossValidatorModel.load(url+"linearReg.sav")
  predictions = cvModel.transform(testData)
  eval = evaluator.evaluate(predictions)  
  print("r2 :",eval)
  predictions = predictions.select("prediction", colPred, "movies_features")
  predictions.show(10, False)

predict('revenue',movies_metadata)
predict('vote_average', movies_metadata)

r2 : 1.0
+----------------------+--------+------------------------------------+
|prediction            |revenue |movies_features                     |
+----------------------+--------+------------------------------------+
|0.0033154313307866817 |0       |[0.0,6.5898,0.0,7.3,218.0]          |
|0.003973805654659905  |0       |[0.0,7.832755,0.0,7.8,246.0]        |
|0.004773469247162652  |0       |[0.0,12.105748,0.0,6.4,225.0]       |
|0.0015223408784318328 |0       |[0.0,4.964833,0.0,6.3,134.0]        |
|2.8673905866995143E-4 |0       |[0.0,11.97502,0.0,7.2,39.0]         |
|1.3276643542158798E-4 |0       |[0.0,6.321126,0.0,7.0,73.0]         |
|0.0033891253631339316 |0       |[0.0,6.796858,0.0,7.5,223.0]        |
|-0.0013145063439745676|0       |[0.0,2.617272,0.0,7.1,39.0]         |
|1.0878107001066966E7  |10878107|[0.0,7.851263,1.0878107E7,6.9,246.0]|
|-4.792268897412578E-4 |0       |[0.0,3.809138,0.0,6.2,52.0]         |
+----------------------+--------+-----------------------------------

# 3. Recommender :


In [15]:
#Extract genres outside of the array
movies = movies_metadata.select("movieId","budget", "revenue", "vote_average", "vote_count", "popularity", "title", explode(movies_metadata.genres_name))
movies = movies.withColumnRenamed("col", "genres")
movies.show(20, False)


+-------+--------+---------+------------+----------+----------+------------------------------+---------+
|movieId|budget  |revenue  |vote_average|vote_count|popularity|title                         |genres   |
+-------+--------+---------+------------+----------+----------+------------------------------+---------+
|862    |30000000|373554033|7.7         |5415      |21.946943 |Toy Story                     |Animation|
|862    |30000000|373554033|7.7         |5415      |21.946943 |Toy Story                     |Comedy   |
|862    |30000000|373554033|7.7         |5415      |21.946943 |Toy Story                     |Family   |
|15602  |0       |0        |6.5         |92        |11.7129   |Grumpier Old Men              |Romance  |
|15602  |0       |0        |6.5         |92        |11.7129   |Grumpier Old Men              |Comedy   |
|11862  |0       |76578911 |5.7         |173       |8.387519  |Father of the Bride Part II   |Comedy   |
|710    |58000000|352194034|6.6         |1194      |14.

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
als = ALS(
         userCol="userId", 
         itemCol="movieId",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [17]:
def get_mat_sparsity(ratings):
    count_nonzero = ratings.select("rating").count()
    total_elements = ratings.select("userId").distinct().count() * ratings.select("movieId").distinct().count()
    sparsity = (1.0 - (count_nonzero *1.0)/total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    
get_mat_sparsity(ratings)

The ratings dataframe is  98.36% sparse.


In [18]:
# add movie data into ratings
movie_ratings = ratings.join(movies_metadata, ['movieId'], 'inner')
movie_ratings.show(20,False)
# TO DO : train movie_ratings
(train, test) = movie_ratings.randomSplit([0.8, 0.2], seed = 2020)

+-------+------+------+----------+---------+----------+------------+---------+-----------------------------------------------+------------+----------+----------------------------------------------------------------------------------------------------------------+------------------------------------------+-----------------+---------------------------+----------------------------------+-----------------------------------------+
|movieId|userId|rating|timestamp |budget   |popularity|release_date|revenue  |title                                          |vote_average|vote_count|production_companies_name                                                                                       |production_countries_name                 |production_iso_id|spoken_languages_name      |belongs_to_collection_name        |genres_name                              |
+-------+------+------+----------+---------+----------+------------+---------+-----------------------------------------------+------------+-

In [19]:
ratings.show(20, False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
|1     |1263   |2.0   |1260759151|
|1     |1287   |2.0   |1260759187|
|1     |1293   |2.0   |1260759148|
|1     |1339   |3.5   |1260759125|
|1     |1343   |2.0   |1260759131|
|1     |1371   |2.5   |1260759135|
|1     |1405   |1.0   |1260759203|
|1     |1953   |4.0   |1260759191|
|1     |2105   |4.0   |1260759139|
|1     |2150   |3.0   |1260759194|
|1     |2193   |2.0   |1260759198|
|1     |2294   |2.0   |1260759108|
|1     |2455   |2.5   |1260759113|
|1     |2968   |1.0   |1260759200|
|1     |3671   |3.0   |1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [20]:
url = 'drive/MyDrive/data/'

def train_and_save_model():
  param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

  evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
  print ("Num models to be tested: ", len(param_grid))
  cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
  model = cv.fit(train)
  best_model = model.bestModel
  test_predictions = best_model.transform(test)
  RMSE = evaluator.evaluate(test_predictions)
  print(RMSE)
  print("**Best Model**")
  print("  Rank:", best_model._java_obj.parent().getRank())
  print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
  print("  RegParam:", best_model._java_obj.parent().getRegParam())
  print('saving model ...')
  best_model.save('als_model2.sav')
  print('model saved')

#le faire une fois (sinon le stocker sur le github)
#train_and_save_model()

#   1. Suggest top N movies similar to a given movie title.

Choses à faire dans l'analyse :
- Regarder la distribution des valeurs de notes
- Le prendre en compte pour l'échantillon de recommendations
Par exemple, 3/5 sera la médiane.

In [21]:
from pyspark.sql.functions import explode
from pyspark.ml.recommendation import ALS, ALSModel

def recommend(title, nb_rec,movies):
  als = ALSModel.load(url + 'als_model2.sav')
  #result = als.score(X_test, Y_test)
  subset = movies_metadata.filter(movies_metadata.title == title)
  print('The movie information : ', title)
  subset.show()

  ratings.join(movies_metadata, on='movieId').filter(movies_metadata.title ==title).sort('rating', ascending=False).show()
  recommendations = als.recommendForItemSubset(subset, nb_rec) #best_model.recommendForUserSubset(subset,5)
  recommendations.show(nb_rec,False)
  nrecommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations")) \
    .select('movieId', col("rec_exp.userId"), col("rec_exp.rating"))
  print("We recommend you to see movies liked by")
  movies_associated = nrecommendations.join(movies_metadata)
  nrecommendations.limit(nb_rec).show()
  nrecommendations.join(movies_metadata, on='movieId').show()
  return nrecommendations

recommend('GoldenEye', 5,movies_metadata)

The movie information :  GoldenEye
+--------+-------+----------+------------+---------+---------+------------+----------+-------------------------+-------------------------+-----------------+---------------------+--------------------------+--------------------+
|  budget|movieId|popularity|release_date|  revenue|    title|vote_average|vote_count|production_companies_name|production_countries_name|production_iso_id|spoken_languages_name|belongs_to_collection_name|         genres_name|
+--------+-------+----------+------------+---------+---------+------------+----------+-------------------------+-------------------------+-----------------+---------------------+--------------------------+--------------------+
|58000000|    710| 14.686036|  1995-11-16|352194034|GoldenEye|         6.6|      1194|     [United Artists, ...|     [United Kingdom, ...|         [GB, US]| [English, Pусский...|      [James Bond Colle...|[Adventure, Actio...|
+--------+-------+----------+------------+---------+-----

DataFrame[movieId: int, userId: int, rating: float]

In [22]:
movies_metadata.show()
movie_ratings = ratings.join(movies_metadata, on='movieId').filter(movies_metadata.title =='GoldenEye').sort('rating', ascending=False)
movie_ratings.groupBy('userId').count().show(20,False)

+---------+-------+----------+------------+---------+--------------------+------------+----------+-------------------------+-------------------------+-----------------+---------------------+--------------------------+--------------------+
|   budget|movieId|popularity|release_date|  revenue|               title|vote_average|vote_count|production_companies_name|production_countries_name|production_iso_id|spoken_languages_name|belongs_to_collection_name|         genres_name|
+---------+-------+----------+------------+---------+--------------------+------------+----------+-------------------------+-------------------------+-----------------+---------------------+--------------------------+--------------------+
| 30000000|    862| 21.946943|  1995-10-30|373554033|           Toy Story|         7.7|      5415|     [Pixar Animation ...|     [United States of...|             [US]|            [English]|      [Toy Story Collec...|[Animation, Comed...|
|        0|  15602|   11.7129|  1995-12-22| 

Cluster movies

In [23]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col,when


def normalize(df):
  unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())
  # Scale dataset
  for i in df.columns:
      # VectorAssembler Transformation - Converting column to vector type
      #explode les array 
      assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
      scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
      pipeline = Pipeline(stages=[assembler, scaler])

      # Fitting pipeline on dataframe
      try:
        df = df.withColumn(i,col(i).cast("int"))
        df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
        print(i, " ok")
        if i == "col":
          continue
          #one hot encoding
      except:
        print('error for ',i)
  return df

movies_metadata = normalize(movies_metadata)

budget  ok
movieId  ok
popularity  ok
error for  release_date
error for  revenue
error for  title
vote_average  ok
vote_count  ok
error for  production_companies_name
error for  production_countries_name
error for  production_iso_id
error for  spoken_languages_name
error for  belongs_to_collection_name
error for  genres_name


In [24]:
movies_metadata.show(20,False) #2908 rows

+---------+-------+----------+------------+---------+-----+------------+----------+-------------------------------------------------------------------------------------------------------+------------------------------------------+-----------------+---------------------------+----------------------------------+------------------------------------------+-------------+--------------+-----------------+-------------------+-----------------+
|budget   |movieId|popularity|release_date|revenue  |title|vote_average|vote_count|production_companies_name                                                                              |production_countries_name                 |production_iso_id|spoken_languages_name      |belongs_to_collection_name        |genres_name                               |budget_Scaled|movieId_Scaled|popularity_Scaled|vote_average_Scaled|vote_count_Scaled|
+---------+-------+----------+------------+---------+-----+------------+----------+-------------------------------------

In [25]:
movies.show()

+-------+--------+---------+------------+----------+----------+--------------------+---------+
|movieId|  budget|  revenue|vote_average|vote_count|popularity|               title|   genres|
+-------+--------+---------+------------+----------+----------+--------------------+---------+
|    862|30000000|373554033|         7.7|      5415| 21.946943|           Toy Story|Animation|
|    862|30000000|373554033|         7.7|      5415| 21.946943|           Toy Story|   Comedy|
|    862|30000000|373554033|         7.7|      5415| 21.946943|           Toy Story|   Family|
|  15602|       0|        0|         6.5|        92|   11.7129|    Grumpier Old Men|  Romance|
|  15602|       0|        0|         6.5|        92|   11.7129|    Grumpier Old Men|   Comedy|
|  11862|       0| 76578911|         5.7|       173|  8.387519|Father of the Bri...|   Comedy|
|    710|58000000|352194034|         6.6|      1194| 14.686036|           GoldenEye|Adventure|
|    710|58000000|352194034|         6.6|      119

In [110]:
#import required libraries
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
genre_indexer = StringIndexer(inputCol="genres", outputCol="genreIndex")
df1 = genre_indexer.fit(movies).transform(movies)
ohe_genre_vector = OneHotEncoder(inputCol="genreIndex", outputCol="genre_vec")
movies = ohe_genre_vector.fit(df1).transform(df1)
movies.show()

+-------+--------+---------+------------+----------+----------+--------------------+---------+----------+---------------+
|movieId|  budget|  revenue|vote_average|vote_count|popularity|               title|   genres|genreIndex|      genre_vec|
+-------+--------+---------+------------+----------+----------+--------------------+---------+----------+---------------+
|    862|30000000|373554033|           7|      5415|        21|           Toy Story|Animation|       8.0| (19,[8],[1.0])|
|    862|30000000|373554033|           7|      5415|        21|           Toy Story|   Comedy|       1.0| (19,[1],[1.0])|
|    862|30000000|373554033|           7|      5415|        21|           Toy Story|   Family|       6.0| (19,[6],[1.0])|
|  15602|       0|        0|           6|        92|        11|    Grumpier Old Men|  Romance|      12.0|(19,[12],[1.0])|
|  15602|       0|        0|           6|        92|        11|    Grumpier Old Men|   Comedy|       1.0| (19,[1],[1.0])|
|  11862|       0| 76578

In [49]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import numpy as np


# Get clusters of movies and users
def Kmeans(data, predCol, searchMode, nb_cluster):
  assembler2 = VectorAssembler(inputCols=data.columns, outputCol = 'features')
  output = assembler2.transform(data)
  scale=StandardScaler(inputCol='features',outputCol='standardized')
  data_scale=scale.fit(output)
  movies_scaled=data_scale.transform(output)
  movies_scaled.show(10, False)

  movies2 = output.select("features",predCol)
  movies2 = movies2.na.drop(how="all")
  evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
  if searchMode:
    print(' \n Let us determine the best number of clusters : ')
    # Silhouette Score Method
    fitted_kmeans = {}
    labels_kmeans = {}
    silhouette_score=[]

    for i in range(2,11):
        KMeans_algo=KMeans(featuresCol='standardized', k=i)
        KMeans_fit=KMeans_algo.fit(movies_scaled)
        output=KMeans_fit.transform(movies_scaled)
        score=evaluator.evaluate(output)
        silhouette_score.append(score)
        print("Silhouette Score:",score)
        # Evaluation
        # Either sum of squared errors
        summary = KMeans_fit.summary
        tmp_scores = summary.trainingCost
        print("training cost : ",tmp_scores)

    # Plot silhouette score evolution
    fig, ax = plt.subplots(1,1, figsize =(10,8))
    ax.plot(range(2,11),silhouette_score[0:9])
    ax.set_xlabel('Number of Clusters')
    ax.set_ylabel('Silhouette Score')
    return KMeans(featuresCol='standardized', k=5).fit(movies_scaled)
  else:
    kmeans = KMeans(featuresCol='features').setK(nb_cluster).setSeed(2)
    kmeans.setWeightCol(predCol)
    kmeans.setMaxIter(10)
    model = kmeans.fit(movies2)
    predictions = model.transform(movies2)
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    centers = model.clusterCenters()
    print("Cluster Centers of Movies: ")
    for center in centers:
        print(center)

    return model
    

data = movies_metadata.select('movieId', 'budget', 'vote_average', 'vote_count').na.drop()
# uncomment this line to see the silhouette score method
#Kmeans(data, "movieId", True,5)

# we must choose nb_cluster = 7 (the lowest silhouette score)
movieClusters = Kmeans(data,"movieId", False, 7)

data2 = ratings.select("userId","movieId","rating").na.drop()
#search
#Kmeans(data2, "rating",True, 5)

#nb_cluster choice
userClusters = Kmeans(data2, "rating",False, 7)


+-------+--------+------------+----------+--------------------------+------------------------------------------------------------------------------+
|movieId|budget  |vote_average|vote_count|features                  |standardized                                                                  |
+-------+--------+------------+----------+--------------------------+------------------------------------------------------------------------------+
|862    |30000000|7           |5415      |[862.0,3.0E7,7.0,5415.0]  |[0.009168292126957927,0.715604854099034,5.482199049875691,4.195534904691783]  |
|15602  |0       |6           |92        |[15602.0,0.0,6.0,92.0]    |[0.16594396028398792,0.0,4.699027757036307,0.07128147945182715]               |
|11862  |0       |5           |173       |[11862.0,0.0,5.0,173.0]   |[0.1261650594083236,0.0,3.9158564641969216,0.13404017331702278]               |
|710    |58000000|6           |1194      |[710.0,5.8E7,6.0,1194.0]  |[0.0075516095245245106,1.383502717924

Exception ignored in: <function JavaWrapper.__del__ at 0x7fda784a93a0>
Traceback (most recent call last):
  File "/content/spark-3.0.1-bin-hadoop3.2/python/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'KMeans' object has no attribute '_java_obj'


[3.45974560e+02 5.29534334e+04 3.81600455e+00]
[3.55351465e+02 1.10411333e+05 3.78378273e+00]
[3.51443375e+02 7.30721221e+04 3.83223731e+00]
[3.54492546e+02 3.32545698e+04 3.80104639e+00]
[3.50547634e+02 9.19251756e+04 3.76073219e+00]


In [62]:
def search_recommandations():  
  assembler2 = VectorAssembler(inputCols=data.columns, outputCol = 'features')
  output = assembler2.transform(data)
  scale=StandardScaler(inputCol='features',outputCol='standardized')
  data_scale=scale.fit(output)
  movies_scaled=data_scale.transform(output)
  movies2 = output.select("features","movieId")
  movies2 = movies2.na.drop(how="all")

  assemblerUser = VectorAssembler(inputCols=["rating","userId","movieId"], outputCol = 'features')
  output2 = assemblerUser.transform(ratings.drop('timestamp'))
  scale2=StandardScaler(inputCol='features',outputCol='standardized')
  data_scale2=scale2.fit(output2)
  ratings_scaled=data_scale2.transform(output)
  ratings2 = output2.select("features","rating")
  ratings2 = movies2.na.drop(how="all")
  movieClusters.clusterCenters()[1] == userClusters.clusterCenters()[1]
  movieClusters.transform(movies2).take(1) == userClusters.transform(movies2).take(1)
search_recommandations()

  movieClusters.clusterCenters()[1] == userClusters.clusterCenters()[1]


Py4JJavaError: ignored

In [64]:
assembler2 = VectorAssembler(inputCols=data.columns, outputCol = 'features')
output = assembler2.transform(data)
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(output)
movies_scaled=data_scale.transform(output)
movies2 = output.select("features","movieId")
movies2 = movies2.na.drop(how="all")

assemblerUser = VectorAssembler(inputCols=["rating","userId","movieId"], outputCol = 'features')
output2 = assemblerUser.transform(ratings.drop('timestamp'))
scale2=StandardScaler(inputCol='features',outputCol='standardized')
data_scale2=scale2.fit(output2)
ratings_scaled=data_scale2.transform(output)
ratings2 = output2.select("features","rating")
ratings2 = movies2.na.drop(how="all")

movieClusters.transform(movies2).take(1)
userClusters.transform(ratings2).take(1)

Py4JJavaError: ignored

  2. Predict user rating for the movies they have not rated for. You may use a test set to test your prediction accuracy, in which the test ratings can be regarded as not rated during training .[texte du lien](https://)

> Let's see the missing ratings from users and predict the ratings



In [None]:
print(movies_metadata.distinct().count(), 'distinct movies,', ratings.distinct().count(), "ratings and", movie_ratings.distinct().count(), "movies rated by users. Here is the details : \n")
movie_ratings.groupBy("userId").count().show()

7445 distinct movies, 100004 ratings and  25599 movies rated by users. Here is the details : 

+------+-----+
|userId|count|
+------+-----+
|   463|   82|
|   496|   73|
|   148|   31|
|   471|   28|
|   623|   28|
|   243|   61|
|   392|    7|
|   516|   62|
|    31|   15|
|   580|  188|
|   251|    7|
|    85|   57|
|   137|   16|
|   451|    9|
|   458|   26|
|    65|    5|
|   588|   35|
|    53|   18|
|   481|   46|
|   255|   19|
+------+-----+
only showing top 20 rows



In [128]:
from pyspark.sql import Row


def predict_movie_ratings():
  (training, test) = ratings.randomSplit([0.8, 0.2])
  als = ALS(
      maxIter=5, 
      regParam=0.01, 
      userCol="userId", 
      itemCol="movieId", 
      ratingCol="rating",
      coldStartStrategy="drop") # avoid null evaluation metrics

  model = als.fit(training)
  predictions = model.transform(test)
  predictions.show()
  evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
  rmse = evaluator.evaluate(predictions)
  print("Root-mean-square error = " + str(rmse))

predict_movie_ratings()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   452|    463|   2.0| 976424451| 2.1425714|
|   380|    463|   3.0| 968949106|  3.396055|
|   292|    471|   3.5|1140049920|  4.337501|
|    19|    471|   3.0| 855192558| 3.5838814|
|   299|    471|   4.5|1344186741|  4.514789|
|   309|    471|   4.0|1114565458| 4.0581236|
|    23|    471|   3.5|1148730134|  4.484413|
|   514|    471|   4.0| 853893788| 4.2827435|
|   574|    471|   3.5|1232817270| 4.0677524|
|   509|    496|   3.0| 940013481| 2.0602877|
|   463|   1088|   3.0|1050499697| 2.3124497|
|   133|   1088|   1.5|1416166508| 1.3684623|
|    57|   1088|   4.0| 907764935|  4.454381|
|    15|   1088|   2.0|1122576683|0.84613925|
|   262|   1088|   2.0|1433938031| 2.0613623|
|   564|   1088|   2.0| 974844186| 3.3288193|
|   505|   1088|   4.0|1340407488| 3.4572065|
|   514|   1088|   3.0| 853896732| 1.9094237|
|   509|   1088|   2.0|1093295913|