In [0]:
# Install the "koalas" PyPI package through the Databricks library utility.
dbutils.library.installPyPI("koalas")

In [0]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
import databricks.koalas as ks
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType
from mpl_toolkits.mplot3d import Axes3D

import warnings
# Install a global "ignore" filter: no warning is shown from this point on.
warnings.filterwarnings("ignore")

# Set the PYSPARK_PYTHON environment variable to "python3".
os.environ["PYSPARK_PYTHON"] = "python3"

In [0]:
# NOTE(review): this cell repeats the `import os` and the PYSPARK_PYTHON
# assignment that the imports cell already performs.
import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [0]:
## PART1: Data Exploration and Data ETL

In [0]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("moive analysis")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [0]:
# Reader settings shared by all four files: CSV input with a header row.
csv_options = {"format": 'csv', "header": True}

movies_df = spark.read.load("/FileStore/tables/movies-1.csv", **csv_options)
ratings_df = spark.read.load("/FileStore/tables/ratings-2.csv", **csv_options)
links_df = spark.read.load("/FileStore/tables/links-3.csv", **csv_options)
tags_df = spark.read.load("/FileStore/tables/tags-1.csv", **csv_options)

In [0]:
# Preview the first 5 rows of the movies table.
movies_df.show(5)

In [0]:
# Preview the first 5 rows of the ratings table.
ratings_df.show(5)

In [0]:
# Preview the first 5 rows of the links table.
links_df.show(5)

In [0]:
# Preview the first 5 rows of the tags table.
tags_df.show(5)

In [0]:
# Count ratings per user and per movie, then take the smallest count of each.
ratings_per_user = ratings_df.groupBy("userID").count()
ratings_per_movie = ratings_df.groupBy("movieId").count()
min_ratings_per_user = ratings_per_user.select('count').rdd.min()[0]
min_ratings_per_movie = ratings_per_movie.select('count').rdd.min()[0]

print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(min_ratings_per_user))
print('Minimum number of ratings per movie is {}'.format(min_ratings_per_movie))
# Output recorded from an earlier run:
# For the users that rated movies and the movies that were rated:
# Minimum number of ratings per user is 20
# Minimum number of ratings per movie is 1

In [0]:
# How many of the rated movies received exactly one rating?
ratings_per_movie = ratings_df.groupBy("movieId").count()
single_rating_movies = ratings_per_movie.filter('count = 1').count()
rated_movies = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(single_rating_movies, rated_movies))
# Output recorded from an earlier run:
# 3446 out of 9724 movies are rated by only one user

In [0]:
# Expose each DataFrame to Spark SQL under a short name so the %sql cells
# can query it. createOrReplaceTempView is the supported replacement for the
# deprecated registerTempTable.
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

In [0]:
%sql
-- Number of distinct users that appear in the ratings table.
SELECT COUNT(DISTINCT userID) AS Number_of_users
FROM ratings

In [0]:
%sql
-- Number of distinct movies listed in the movies table.
-- Alias spelling fixed ("Numer" -> "Number") to match the sibling queries.
SELECT COUNT(DISTINCT movieId) AS Number_of_movies
FROM movies

In [0]:
%sql
-- Number of distinct movies that have at least one row in the ratings table.
SELECT COUNT(DISTINCT b.movieId) AS Number_movies_are_rated_by_users
FROM ratings AS b

In [0]:
# Check every table for rows that contain at least one null value.
# A table has missing data exactly when dropping null-containing rows makes it
# smaller, so the printed answer is True only when the two counts differ.
print("If there is missing data in each table?")
tables_to_check = [
    ('movies_df', movies_df),
    ('ratings_df', ratings_df),
    ('links_df', links_df),
    ('tags_df', tags_df),
]
for table_name, table_df in tables_to_check:
    has_missing = table_df.count() != table_df.na.drop().count()
    print('{}: {}'.format(table_name, has_missing))

In [0]:
%sql
-- Every distinct raw value of the genres column.
SELECT DISTINCT genres
FROM movies

In [0]:
%sql
-- Split each genres string on '|' and list the distinct pieces alphabetically.
SELECT DISTINCT explode(split(genres, '[|]')) AS Category
FROM movies
ORDER BY Category

In [0]:
%sql
-- Movie count per category, largest category first.
SELECT Category, COUNT(movieId) AS number
FROM movies
LATERAL VIEW explode(split(genres, '[|]')) AS Category
GROUP BY Category
ORDER BY number DESC

In [0]:
%sql
-- One row per category with its movie titles joined into a comma-separated string.
SELECT t.Category, concat_ws(',', collect_set(t.title)) AS list_of_movies
FROM (
  -- one row per (category, title) pair
  SELECT Category, title
  FROM movies
  LATERAL VIEW explode(split(genres, '[|]')) AS Category
  GROUP BY Category, title
) AS t
GROUP BY t.Category

In [0]:
# Working copy of the ratings without the 'timestamp' column.
movie_ratings=ratings_df.drop('timestamp')

In [0]:
# Preview the first 5 rows of the trimmed ratings table.
movie_ratings.show(5)

In [0]:
# Cast the id columns to integers and the rating column to float.
from pyspark.sql.types import IntegerType, FloatType

column_types = [
    ("userId", IntegerType()),
    ("movieId", IntegerType()),
    ("rating", FloatType()),
]
for column_name, spark_type in column_types:
    movie_ratings = movie_ratings.withColumn(column_name, movie_ratings[column_name].cast(spark_type))

In [0]:
# Display the ratings after the type casts (show() with its default row limit).
movie_ratings.show()

In [0]:
# ALS model selection and evaluation

# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
# Random 80% / 20% split of the ratings into train and test, with a fixed seed.
split_weights = [0.8, 0.2]
training, test = movie_ratings.randomSplit(split_weights, seed=2020)

In [0]:
#Create ALS model
# maxIter / rank / regParam here are starting values; the parameter grid built
# for cross-validation varies all three.
# The seed is pinned explicitly (same value as the train/test split) so the
# fit does not depend on PySpark's default seed.
als = ALS(
    maxIter=5,
    rank=10,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=2020,
)

In [0]:
# Hyper-parameter grid for tuning: 3 regParam x 3 maxIter x 3 rank values.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.regParam, [0.1, 0.01, 0.001])
    .addGrid(als.maxIter, [3, 5, 10])
    .addGrid(als.rank, [5, 10, 15])
    .build()
)

In [0]:
# RMSE between the "rating" label column and the "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [0]:
# Build Cross validation
# 5-fold cross-validation of the ALS estimator over the parameter grid, scored
# with the RMSE evaluator. The seed is pinned (same value as the train/test
# split) so the fold assignment does not depend on PySpark's default seed.
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=2020,
)

In [0]:
#Fit ALS model to training data
# NOTE(review): `model` is not referenced by any later cell visible here; the
# cross-validated fit in the following cell is what the downstream cells use.
model = als.fit(training)

In [0]:
#Extract best model from the tuning exercise using ParamGridBuilder
# Fit the cross-validator on the training split, then score it on that same
# training split, so the RMSE printed here is an in-sample (training) error.
cvModel = crossval.fit(training)
predictions = cvModel.transform(training)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In [0]:
#Generate predictions and evaluate using RMSE
# Take the best model found by cross-validation and score it on the test split.
# `predictions` and `rmse` from the previous cell are overwritten here.
best_model = cvModel.bestModel
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [0]:
# Report the latest RMSE together with the hyper-parameters of the best model.
# The parameters are read from the estimator that produced the best model.
best_estimator = best_model._java_obj.parent()

print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:" + str(best_estimator.getRank()))
print(" MaxIter:" + str(best_estimator.getMaxIter()))
print(" RegParam:" + str(best_estimator.getRegParam()))

In [0]:
# Display the current `predictions` DataFrame (show() with its default row limit).
predictions.show()

In [0]:
# Score the best model on the complete ratings set (the data that was split
# into training and test) and print the resulting RMSE; `rmse` is overwritten.
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [0]:
# Expose the scored ratings to Spark SQL as "alldata".
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable.
alldata.createOrReplaceTempView("alldata")

In [0]:
%sql
-- Every row and column of the alldata table.
SELECT *
FROM alldata

In [0]:
%sql
-- Attach the movie columns to each row of alldata by matching on movieId.
SELECT *
FROM movies
JOIN alldata
  ON movies.movieId = alldata.movieId

In [0]:
# recommend movie to users id: 232, 575
# Ask the best model for 10 recommendations per user, then display only the
# row belonging to user 575.
userRecs = best_model.recommendForAllUsers(10)
display(userRecs.filter(userRecs.userId == 575))

In [0]:
# Convert the Spark recommendations DataFrame into a Koalas DataFrame.
user_recommendation = userRecs.to_koalas()

In [0]:
# Preview the first rows of the Koalas recommendations frame.
user_recommendation.head()

In [0]:
# Koalas version of the movies table, used to look up the recommended movies.
movies_koalas = movies_df.to_koalas()
def movie_recommendation(user_recommendation, userId, movies_koalas):
  """Return the rows of ``movies_koalas`` recommended for one user.

  Args:
    user_recommendation: DataFrame with a ``userId`` column and a
      ``recommendations`` column. Each recommendations cell is a sequence of
      records whose first field is the movie id (assumed to be the
      ``recommendForAllUsers`` output converted to Koalas; confirm).
    userId: Id of the user whose recommendations are wanted.
    movies_koalas: DataFrame with a ``movieId`` column.

  Returns:
    The subset of ``movies_koalas`` whose ``movieId`` is among the ids
    recommended to ``userId``; empty when the user has no recommendation row.
  """
  # Filter with a real column comparison. The earlier `'userId' == userId`
  # compared a string literal with the id, which is always False, so the
  # lookup never depended on the requested user.
  user_rows = user_recommendation.loc[user_recommendation['userId'] == userId, 'recommendations']
  # Materialise the selected cell(s) and keep the first field of each record.
  rec_movieId = [item[0] for recs in user_rows.to_numpy() for item in recs]
  # NOTE(review): movies_koalas.movieId may be string-typed while the
  # recommended ids are integers; confirm isin matches them as intended.
  return movies_koalas.loc[movies_koalas.movieId.isin(rec_movieId)]

In [0]:
# Look up the recommended movies for user 575.
movie_recommendation(user_recommendation, 575, movies_koalas)

In [0]:
# Look up the recommended movies for user 232.
movie_recommendation(user_recommendation, 232, movies_koalas)