Data ETL and Exploration

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline
import os
os.environ['PYSPARK_PYTHON'] = 'python3'

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('movie analysis').config('spark.some.config.option', 'some-value').getOrCreate()
movies_df = spark.read.load('/FileStore/tables/movies.csv', format = 'csv', header = True)
ratings_df = spark.read.load('/FileStore/tables/ratings.csv', format = 'csv', header = True)
links_df = spark.read.load('/FileStore/tables/links.csv', format = 'csv', header = True)
tags_df = spark.read.load('/FileStore/tables/tags.csv', format = 'csv', header = True)

In [0]:
movies_df.show(5)

In [0]:
ratings_df.show(5)

In [0]:
links_df.show(5)

In [0]:
tags_df.show(5)

In [0]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [0]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

In [0]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")

In [0]:
query = spark.sql('SELECT COUNT(DISTINCT userId) AS num_users FROM ratings')
display(query)

num_users
610


In [0]:
query = spark.sql('SELECT COUNT(movieId) AS num_movies FROM movies')
display(query)

num_movies
9742


In [0]:
#number of movies that have been rated before
query = spark.sql('SELECT COUNT(movieId) AS num_rated FROM movies WHERE movieId IN (SELECT movieId FROM ratings)')
display(query)

num_rated
9724


In [0]:
#which movies have not received any ratings
query = spark.sql('SELECT movieId, title FROM movies WHERE movieId NOT IN (SELECT movieId FROM ratings)')
display(query)

movieId,title
1076,"Innocents, The (1961)"
2939,Niagara (1953)
3338,For All Mankind (1989)
3456,"Color of Paradise, The (Rang-e khoda) (1999)"
4194,I Know Where I'm Going! (1945)
5721,"Chosen, The (1981)"
6668,"Road Home, The (Wo de fu qin mu qin) (1999)"
6849,Scrooge (1970)
7020,Proof (1991)
7792,"Parallax View, The (1974)"


In [0]:
#find all the different movie genres
query = spark.sql("SELECT DISTINCT EXPLODE(SPLIT(genres,'[|]')) AS genres FROM movies ORDER BY genres")
display(query)

genres
(no genres listed)
Action
Adventure
Animation
Children
Comedy
Crime
Documentary
Drama
Fantasy


In [0]:
#number of movies in each genre
query = spark.sql("SELECT genres, COUNT(movieId) AS num_movies FROM (SELECT EXPLODE(SPLIT(genres, '[|]')) AS genres, movieId FROM movies) GROUP BY genres ORDER BY num_movies DESC")
display(query)

genres,num_movies
Drama,4361
Comedy,3756
Thriller,1894
Action,1828
Romance,1596
Adventure,1263
Crime,1199
Sci-Fi,980
Horror,978
Fantasy,779


ALS based model training

In [0]:
ratings = ratings_df.drop('timestamp')

In [0]:
from pyspark.sql.types import IntegerType, FloatType
ratings = ratings.withColumn('userId', ratings['userId'].cast(IntegerType()))
ratings = ratings.withColumn('movieId', ratings['movieId'].cast(IntegerType()))
ratings = ratings.withColumn('rating', ratings['rating'].cast(FloatType()))

In [0]:
display(ratings)

userId,movieId,rating
1,1,4.0
1,3,4.0
1,6,4.0
1,47,5.0
1,50,5.0
1,70,3.0
1,101,5.0
1,110,4.0
1,151,5.0
1,157,5.0


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
(train, test) = ratings.randomSplit([.8,.2])

In [0]:
als = ALS(maxIter = 5, rank = 10, regParam = 0.01, userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating', coldStartStrategy = 'drop')
paramGrid = (ParamGridBuilder().addGrid(als.regParam, [.05, .1, .3, .5]).addGrid(als.rank, [5, 10, 15]).addGrid(als.maxIter, [1, 5, 10]).build())

In [0]:
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')
cv = CrossValidator(estimator = als, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds = 5)
model = cv.fit(train)
best = model.bestModel

In [0]:
preds = best.transform(test)
rmse = evaluator.evaluate(preds)

In [0]:
print('RMSE = ' + str(rmse))
print('Best Model Parameters:')
print('Rank: ', str(best._java_obj.parent().getRank()))
print('Max Iter: ', str(best._java_obj.parent().getMaxIter()))
print('Reg Param: ', str(best._java_obj.parent().getRegParam()))

In [0]:
display(preds)

userId,movieId,rating,prediction
602,471,4.0,3.1901875
57,471,3.0,3.4311104
171,471,3.0,4.3806076
32,471,3.0,4.2933917
608,471,1.5,3.0534174
307,833,1.0,0.33906984
606,1088,3.0,3.3986058
132,1088,4.0,2.8598394
169,1088,4.5,4.3873987
387,1088,1.5,2.5315177


In [0]:
alldata = best.transform(ratings)
rmse = evaluator.evaluate(alldata)
print('RMSE = ' + str(rmse))

In [0]:
alldata.registerTempTable('alldata')
query = spark.sql('SELECT * FROM movies JOIN alldata ON movies.movieId = alldata.movieId')
display(query)

movieId,title,genres,userId,movieId.1,rating,prediction
471,"Hudsucker Proxy, The (1994)",Comedy,133,471,4.0,3.046037
471,"Hudsucker Proxy, The (1994)",Comedy,597,471,2.0,3.9713385
471,"Hudsucker Proxy, The (1994)",Comedy,385,471,4.0,3.0281596
471,"Hudsucker Proxy, The (1994)",Comedy,436,471,3.0,3.3384461
471,"Hudsucker Proxy, The (1994)",Comedy,602,471,4.0,3.1901875
471,"Hudsucker Proxy, The (1994)",Comedy,91,471,1.0,2.7729642
471,"Hudsucker Proxy, The (1994)",Comedy,409,471,3.0,3.7746398
471,"Hudsucker Proxy, The (1994)",Comedy,372,471,3.0,3.4055138
471,"Hudsucker Proxy, The (1994)",Comedy,599,471,2.5,2.95581
471,"Hudsucker Proxy, The (1994)",Comedy,603,471,4.0,4.042655


Using model to provide movie recommendations for some users

In [0]:
import databricks.koalas as ks

In [0]:
userRecs = best.recommendForAllUsers(10)
userRecs_ks = userRecs.to_koalas()
movie_ks = movies_df.to_koalas()

In [0]:
def recommend(Id):
  recs = []
  for rec in userRecs_ks.loc[str(Id), 'recommendations']:
    recs.append(str(rec[0]))
  return (movie_ks[movie_ks['movieId'].isin(recs)])

In [0]:
print("Recommended movies for user ID 69 are:")
recommend(69)

Unnamed: 0,movieId,title,genres
181,213,Burnt by the Sun (Utomlyonnye solntsem) (1994),Drama
897,1194,Cheech and Chong's Up in Smoke (1978),Comedy
2419,3214,American Flyers (1985),Drama
2423,3223,"Zed & Two Noughts, A (1985)",Drama
2665,3567,Bossa Nova (2000),Comedy|Drama|Romance
5467,26171,Play Time (a.k.a. Playtime) (1967),Comedy
7114,70946,Troll 2 (1990),Fantasy|Horror
8473,112804,I Origins (2014),Drama|Sci-Fi
9247,155509,Mr. Right (2016),Action|Comedy|Romance
9524,171867,Rough Night (2017),Comedy|Drama


In [0]:
print("Recommended movies for user ID 420 are:")
recommend(420)

Unnamed: 0,movieId,title,genres
2453,3266,Man Bites Dog (C'est arrivé près de chez vous)...,Comedy|Crime|Drama|Thriller
2569,3436,Dying Young (1991),Drama|Romance
4474,6611,Umberto D. (1952),Drama
4507,6669,Ikiru (1952),Drama
5074,7982,"Tale of Two Sisters, A (Janghwa, Hongryeon) (2...",Drama|Horror|Mystery|Thriller
5414,25825,Fury (1936),Drama|Film-Noir
5454,26116,"Hush... Hush, Sweet Charlotte (1964)",Horror|Thriller
7678,89118,"Skin I Live In, The (La piel que habito) (2011)",Drama
7704,89904,The Artist (2011),Comedy|Drama|Romance
9367,162414,Moonlight,Drama


Use model to find movies similar to other movies

In [0]:
factors = best.itemFactors.to_koalas()

In [0]:
def similarMovies(Id, matrix = 'cosine_similarity'):
  try:
    feature = factors.loc[factors.id == str(Id), 'features'].to_numpy()[0]
  except:
    return 'No movie with id ' + str(Id)
  if matrix == 'cosine_similarity':
    sim = pd.DataFrame(columns = ('movieId','cosine_similarity'))
    for i, f in factors.to_numpy():
      cossim = np.dot(feature, f)/(np.linalg.norm(feature)*np.linalg.norm(feature))
      sim = sim.append({'movieId':str(i), 'cosine_similarity':cossim}, ignore_index = True)
    sim_cossim = sim.sort_values(by = ['cosine_similarity'], ascending = False)[1:11]
    joint = sim_cossim.merge(movie_ks.to_pandas(), left_on = 'movieId', right_on = 'movieId', how = 'inner')
  if matrix == 'euclidean_distance':
    sim = pd.DataFrame(columns = ('movieId','euclidean_distance'))
    for i, f in factors.to_numpy():
      eudis = np.linalg.norm(np.array(feature) - np.array(f))
      sim = sim.append({'movieId':str(i), 'euclidean_distance':eudis}, ignore_index = True)
    sim_eudis = sim.sort_values(by = ['euclidean_distance'])[1:11]
    joint = sim_eudis.merge(movie_ks.to_pandas(), left_on = 'movieId', right_on = 'movieId', how = 'inner')
  return joint[['movieId','title','genres']]

In [0]:
similarMovies(177013)

In [0]:
print('Similar movies to Toy Story by cosine similarity are:')
similarMovies(1)

Unnamed: 0,movieId,title,genres
0,33649,Saving Face (2004),Comedy|Drama|Romance
1,6818,Come and See (Idi i smotri) (1985),Drama|War
2,7748,Pierrot le fou (1965),Crime|Drama
3,187,Party Girl (1995),Comedy
4,8477,"Jetée, La (1962)",Romance|Sci-Fi
5,3266,Man Bites Dog (C'est arrivé près de chez vous)...,Comedy|Crime|Drama|Thriller
6,4495,Crossing Delancey (1988),Comedy|Romance
7,6201,Lady Jane (1986),Drama|Romance
8,102217,Bill Hicks: Revelations (1993),Comedy
9,92494,Dylan Moran: Monster (2004),Comedy|Documentary


In [0]:
print('Similar movies to Toy Story by euclidean distance are:')
similarMovies(1, 'euclidean_distance')

Unnamed: 0,movieId,title,genres
0,1033,"Fox and the Hound, The (1981)",Animation|Children|Drama
1,2918,Ferris Bueller's Day Off (1986),Comedy
2,2797,Big (1988),Comedy|Drama|Fantasy|Romance
3,149330,A Cosmic Christmas (1977),(no genres listed)
4,184931,Death Wish (2018),Action|Crime|Drama|Thriller
5,1270,Back to the Future (1985),Adventure|Comedy|Sci-Fi
6,34534,Four Brothers (2005),Action|Crime|Drama
7,122882,Mad Max: Fury Road (2015),Action|Adventure|Sci-Fi|Thriller
8,26375,Silver Streak (1976),Action|Comedy|Crime
9,1197,"Princess Bride, The (1987)",Action|Adventure|Comedy|Fantasy|Romance
