### Movie Recommender System

This notebook summarizes top 10 movies recommendation and top 10 movies recommended for each genre (which we do not consider users previous history and keyword, etc.)
There are other popular recommender techniques such as ALS for collabotive filtering or content filtering based on keyword which are not used in this notebook

Mount an Azure Blob Storage Account

In [0]:
spark.conf.set(
  "fs.azure.account.key.ivyprojectmovie.blob.core.windows.net",
  "+z/sopCfJqbq0DURX9seAUXcbf7LB1/1SqyWohUF2Wyi9eQXu5csVrEhCnyRx7McF2p9oBxob2L/Y7m68iVLeA==")

In [0]:
dbutils.fs.ls("wasbs://container-movie@ivyprojectmovie.blob.core.windows.net/")

Read Files

In [0]:
files = {}
for file in dbutils.fs.ls("wasbs://container-movie@ivyprojectmovie.blob.core.windows.net/"):
  extension = file[1].rsplit('.')[1]
  if extension == 'csv':
    files[file[1].rsplit('.')[0]] = file[0]
print(files)

data = {i: spark.read.csv(files[i], inferSchema = True, header = True) for i in files}

In [0]:
for i in data:
  print(i)
  data[i].show(5)
  print('\n')

Import Libraries

In [0]:
import datetime
import pyspark.sql.functions as f
import pyspark.sql.types
import pandas as pd

from pyspark.sql.functions import (year, month, dayofmonth, unix_timestamp, from_unixtime, rank, min,col,date_format)
from pyspark.sql import Window

Clean Data

In [0]:
data['ratings'] = data['ratings'].withColumn('rating_date', f.from_unixtime("timestamp"))
data['ratings'] = data['ratings'].withColumn('rating_date', date_format('rating_date', "yyyy-MM-dd"))
display(data['ratings'])

userId,movieId,rating,timestamp,rating_date
1,296,5.0,1147880044,2006-05-17
1,306,3.5,1147868817,2006-05-17
1,307,5.0,1147868828,2006-05-17
1,665,5.0,1147878820,2006-05-17
1,899,3.5,1147868510,2006-05-17
1,1088,4.0,1147868495,2006-05-17
1,1175,3.5,1147868826,2006-05-17
1,1217,3.5,1147878326,2006-05-17
1,1237,5.0,1147868839,2006-05-17
1,1250,4.0,1147868414,2006-05-17


In [0]:
data['tags'] = data['tags'].withColumn('tag_date', f.from_unixtime("timestamp"))
data['tags'] = data['tags'].withColumn('tag_date', date_format('tag_date', "yyyy-MM-dd"))
display(data['tags'])

userId,movieId,tag,timestamp,tag_date
3,260,classic,1439472355,2015-08-13
3,260,sci-fi,1439472256,2015-08-13
4,1732,dark comedy,1573943598,2019-11-16
4,1732,great dialogue,1573943604,2019-11-16
4,7569,so bad it's good,1573943455,2019-11-16
4,44665,unreliable narrators,1573943619,2019-11-16
4,115569,tense,1573943077,2019-11-16
4,115713,artificial intelligence,1573942979,2019-11-16
4,115713,philosophical,1573943033,2019-11-16
4,115713,tense,1573943042,2019-11-16


Explore Data & Clean Data

In [0]:
# shape of each dataset
def spark_df_shape(self):
    return (self.count(),len(self.columns)) 
pyspark.sql.dataframe.DataFrame.shape = spark_df_shape

for i in data:
  print(i)
  print(f'shape of {i} dataset is')
  print(data[i].shape())

In [0]:
number_of_ratings = data['ratings'].count()
ratings = data['ratings'].rdd
number_of_users = ratings.map(lambda r: r[0]).distinct().count() # unique user ID 
number_of_movies = ratings.map(lambda r: r[1]).distinct().count() # unique movieID
print(f"the dataset contains {number_of_ratings} ratings from {number_of_users} users on {number_of_movies} movies")

In [0]:
# count by rating date
data['ratings'].groupby('rating_date').count().sort(col('count').desc()).show()

In [0]:
# average rating for each movieId
movie_avg_rating = data['ratings'].groupby('movieId').mean('rating').withColumnRenamed('avg(rating)','avg_rating')
display(movie_avg_rating)

movieId,avg_rating
1088,3.2610254571531017
1580,3.5779684502238327
3175,3.60844250363901
44022,3.2222222222222223
175197,2.600591715976331
1645,3.552476780185758
471,3.651442307692308
3794,3.235449735449736
8638,3.9696180555555554
33722,3.6538461538461537


In [0]:
# we want to see for each movie, how many ratings they have
movie_with_rating_count = data['ratings'].groupby('movieId').count()
display(movie_with_rating_count)

movieId,count
36525,462
1580,9382
3175,3435
1959,1150
1645,3230
1591,1250
471,2496
833,338
1088,2789
1238,706


In [0]:
data['movies'].select('movieId').distinct().rdd.map(lambda r: r[0]).count()

In [0]:
# average rating score with each movie join with the movies dataset
df = movie_avg_rating.join(data['movies'], 'movieId','inner')
df = df.join(movie_with_rating_count,'movieId','inner')
display(df)

movieId,avg_rating,title,genres,count
1088,3.2610254571531017,Dirty Dancing (1987),Drama|Musical|Romance,2789
1580,3.5779684502238327,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,9382
3175,3.60844250363901,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi,3435
44022,3.2222222222222223,Ice Age 2: The Meltdown (2006),Adventure|Animation|Children|Comedy,1170
175197,2.600591715976331,The Dark Tower (2017),Fantasy|Horror|Sci-Fi|Western,169
1645,3.552476780185758,The Devil's Advocate (1997),Drama|Mystery|Thriller,3230
471,3.651442307692308,"Hudsucker Proxy, The (1994)",Comedy,2496
3794,3.235449735449736,Chuck & Buck (2000),Comedy|Drama,189
8638,3.9696180555555554,Before Sunset (2004),Drama|Romance,1152
33722,3.6538461538461537,Ladies in Lavender (2004),Comedy|Drama|Romance,39


In [0]:
df = df.withColumn("year", f.regexp_extract(df.title, r"(\d{4})", 0))          
display(df)

movieId,avg_rating,title,genres,count,year
36525,3.477272727272727,Just Like Heaven (2005),Comedy|Fantasy|Romance,462,2005
1580,3.5779684502238327,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,9382,1997
3175,3.60844250363901,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi,3435,1999
1959,3.6030434782608696,Out of Africa (1985),Drama|Romance,1150,1985
1645,3.552476780185758,The Devil's Advocate (1997),Drama|Mystery|Thriller,3230,1997
1591,2.6108,Spawn (1997),Action|Adventure|Sci-Fi|Thriller,1250,1997
471,3.651442307692308,"Hudsucker Proxy, The (1994)",Comedy,2496,1994
833,2.6568047337278107,High School High (1996),Comedy,338,1996
1088,3.2610254571531017,Dirty Dancing (1987),Drama|Musical|Romance,2789,1987
1238,4.009915014164306,Local Hero (1983),Comedy,706,1983


### Simple Recommender 
##### Top 10 Movies 
       - ignore our users previous review history/genre and so on

In [0]:
# filter out those with 10 or less reviews
import re
overall_mean = df.select(f.mean('avg_rating')).collect()[0][0]
print(overall_mean)
minimum_vote = df.approxQuantile('count', [0.8], 0)[0]
print(minimum_vote)
qualified_movies = df.filter(df['count'] >= minimum_vote).toPandas()
qualified_movies.shape

def score(movie, m =minimum_vote, C = overall_mean ):
  v = movie['count']
  R = movie['avg_rating']
  return (v/(v+m) * R) + (m/(m+v) * C)

# def extract_year(movie):
#   title = movie['title']
#   year = re.findall(r'(\d+)', title) 
#   return year

qualified_movies['score'] = qualified_movies.apply(score, axis = 1)

In [0]:
qualified_movies.shape

In [0]:
qualified_movies[['title','year','score']].sort_values('score', ascending = False).head(10)

Unnamed: 0,title,year,score
3561,"Shawshank Redemption, The (1994)",1994,4.418552
5010,Planet Earth (2006),2006,4.349163
50,"Godfather, The (1972)",1972,4.322
1909,Planet Earth II (2016),2016,4.304887
5108,"Usual Suspects, The (1995)",1995,4.285329
5813,"Godfather: Part II, The (1974)",1974,4.272983
2798,Band of Brothers (2001),2001,4.251262
5159,Schindler's List (1993),1993,4.24489
3337,Seven Samurai (Shichinin no samurai) (1954),1954,4.234509
3096,Rear Window (1954),1954,4.230119


##### Genre Recommender
   - Top 10 movies recommeded for each genre

In [0]:
genres = list(set('|'.join(list(qualified_movies["genres"].unique())).split('|')))
print(genres)
print("\n")
print(f"there are {len(genres)} of unique genres")

In [0]:
qualified_movies_genre = qualified_movies.copy()
genres.remove('(no genres listed)')
for genre in genres:
    qualified_movies_genre[genre] = qualified_movies_genre['genres'].map(lambda val: 1 if genre in val else 0)

In [0]:
qualified_movies_genre.head()

Unnamed: 0,movieId,avg_rating,title,genres,count,year,score,Sci-Fi,Adventure,Romance,Drama,Animation,Western,Documentary,Thriller,Fantasy,War,Film-Noir,Horror,Action,Musical,Children,Crime,Comedy,Mystery,IMAX
0,1088,3.261025,Dirty Dancing (1987),Drama|Musical|Romance,2789,1987,3.257919,0,0,1,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0
1,1580,3.577968,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi,9382,1997,3.575522,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0
2,3175,3.608443,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi,3435,1999,3.60142,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
3,44022,3.222222,Ice Age 2: The Meltdown (2006),Adventure|Animation|Children|Comedy,1170,2006,3.216413,0,1,0,0,1,0,0,0,0,0,0,0,0,0,1,0,1,0,0
4,175197,2.600592,The Dark Tower (2017),Fantasy|Horror|Sci-Fi|Western,169,2017,2.698326,1,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,0,0,0


In [0]:
genre_recommended = {}
for genre in genres:
  genre_movies = qualified_movies_genre[qualified_movies_genre[genre]==1]
  top10 = genre_movies.sort_values(by = ['score'], ascending = False)[:11]
  movie_detail = top10[['title','score']]
  genre_recommended[genre] = movie_detail 
  
x = [item[0] for item in genre_recommended.items()]
top_10_genre = pd.DataFrame({'genre': x[0], 'title': genre_recommended[x[0]]['title'], 'score': genre_recommended[x[0]]['score']})
for row in x[1:]:
    top_10_genre_1 = pd.DataFrame({'genre': row, 'title': genre_recommended[row]['title'], 'score':genre_recommended[row]['score']})
    top_10_genre = pd.concat([top_10_genre, top_10_genre_1])
top_10_genre .reset_index(drop = True, inplace = True)
print("the shape of recommended top 10 movies for each genre dataframe is : ")   
print(top_10_genre.shape)    
top_10_genre.head(20)

Unnamed: 0,genre,title,score
0,Sci-Fi,Inception (2010),4.157753
1,Sci-Fi,"Matrix, The (1999)",4.142185
2,Sci-Fi,Star Wars: Episode V - The Empire Strikes Back...,4.141135
3,Sci-Fi,Star Wars: Episode IV - A New Hope (1977),4.11153
4,Sci-Fi,Blade Runner (1982),4.10434
5,Sci-Fi,Interstellar (2014),4.091171
6,Sci-Fi,"Prestige, The (2006)",4.086606
7,Sci-Fi,Nausicaä of the Valley of the Wind (Kaze no ta...,4.065412
8,Sci-Fi,Eternal Sunshine of the Spotless Mind (2004),4.055605
9,Sci-Fi,Spider-Man: Into the Spider-Verse (2018),4.04902


In [0]:
# Tag Recommender
# tags
# df_tags = data['movies'].join(data['tags'], 'movieId','inner').drop('timestamp')
# display(df_tags)
# print(df_tags.shape())

In [0]:
# from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer,Normalizer
# from pyspark.ml import Pipeline
# tokenizer = Tokenizer(inputCol="tag", outputCol="token_text")
# stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
# count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
# idf = IDF(inputCol="c_vec", outputCol="tf_idf")
# normalizer = Normalizer(inputCol="tf_idf", outputCol="norm")


# data_prep_pipe = Pipeline(stages=[tokenizer,stopremove,count_vec,idf,normalizer])
# cleaner= data_prep_pipe.fit(df_tags)
# clean_df_tags = cleaner.transform(df_tags)
# display(clean_df_tags)

# # dot product, calculate cosine similarity, decide to change spark DF to pandas DF as it's easier to calculate the similarity