# set up packages

In [1]:
# load packages
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
location = "D:/py_movie_recommendation_system/data/"
movies_df = spark.read.load(location+"movies.csv", format='csv', header = True)
ratings_df = spark.read.load(location+"ratings.csv", format='csv', header = True)
links_df = spark.read.load(location+"links.csv", format='csv', header = True)
tags_df = spark.read.load(location+"tags.csv", format='csv', header = True)

# data preprocessing

In [4]:
# drop useless column
movie_ratings=ratings_df.drop('timestamp')

In [5]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [6]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



# train test split

In [7]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [8]:
# Create test and train set
(training,test) = movie_ratings.randomSplit([0.8,0.2])

# tune model

In [9]:
# Create ALS model
model_als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed=202112)

In [10]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [12]:
# Tune model using ParamGridBuilder
params = ParamGridBuilder().addGrid(model_als.maxIter, [3, 5, 10]).addGrid(model_als.regParam, [0.1, 0.01, 0.001]).addGrid(model_als.rank, [5, 10, 15]).addGrid(model_als.alpha, [0.1, 0.01, 0.001]).build()

# Build Cross validation
cv_creator=CrossValidator(estimator=model_als,estimatorParamMaps=params,evaluator=evaluator,numFolds=4,seed=202112)

#Fit ALS model to training data
cv_model=cv_creator.fit(training)

In [13]:
best_model=cv_model.bestModel
best_params=cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]
print('Best ALS model parameters by CV:')
for i,j in best_params.items():
  print('-> '+i.name+': '+str(j))

Best ALS model parameters by CV:
-> maxIter: 10
-> regParam: 0.1
-> rank: 15
-> alpha: 0.1


# model tesing

In [14]:
#Generate predictions and evaluate using RMSE
prediction_test = best_model.transform(test)
rmse_test = evaluator.evaluate(prediction_test)
print("Root-mean-square error for testing data is " + str(rmse_test))

Root-mean-square error for testing data is 0.8842470570103346


# qualitative check

In [15]:
# define a function to package the recommendation
def topKRecommend(k,id,model):
  '''
  k: the number of movies to recommend
  id: the id of the user to give recommendations
  model: the trained model for recommendation
  '''
  # the table for all top10 recommendations
  all_recommd=model.recommendForAllUsers(k)
  user_recommd=all_recommd.where(all_recommd.userId==id).toPandas()
  if user_recommd.shape[0]==0:
    print('No user with id '+str(id)+' is found in the data.')
    return None
  user_recommd=user_recommd.iloc[0,1]
  user_recommd=pd.DataFrame(user_recommd,columns=['movieId','predicted_ratings'])
  temp=None
  for i in user_recommd['movieId']:
    if not temp:
      temp=movies_df.where(movies_df.movieId==str(i))
    else:
      temp=temp.union(movies_df.where(movies_df.movieId==str(i)))
  out=pd.concat([temp.toPandas(),user_recommd['predicted_ratings']],axis=1)
  out.index=range(1,k+1)
  return out

RMSE = 0.6108675896948604


In [17]:
# access the movie factor matrix
movie_factors=best_model.itemFactors
movie_factors.printSchema()
comd=["movie_factors.selectExpr('id as movieId',"]
for i in range(best_model.rank):
  if i<best_model.rank-1:
    comd.append("'features["+str(i)+"] as feature"+str(i)+"',")
  else:
    comd.append("'features["+str(i)+"] as feature"+str(i)+"'")
comd.append(')')
movie_factors=eval(''.join(comd))
movie_factors.createOrReplaceTempView('movie_factors')
movie_factors.show()

root
 |-- id: integer (nullable = false)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = false)

+-------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----------+-----------+-----------+----------+------------+-------------+----------+
|movieId|   feature0|   feature1|   feature2|   feature3|    feature4|    feature5|    feature6|    feature7|   feature8|   feature9|  feature10| feature11|   feature12|    feature13| feature14|
+-------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----------+-----------+-----------+----------+------------+-------------+----------+
|     10| 0.41162598|  0.8771659|  0.5593326| -0.5951011|   0.1903169| -0.39384925| -0.35029674| 0.013529366|  0.5653978| -0.9044589|-0.31923106|0.63338375|  -0.7000344|  -0.19393574| 0.4127357|
|     20| 0.52996415|   0.994044|-0.14460136|-0.06842906|   0.6973475| -0.07843322| 0.05988



In [16]:
# cosine similarity
# the larger the cosine value, the smaller the two feature vectors' angle, the similar the movies
# this similarity considers the direction only,
# e.g. movie 1 with factor [1,2,3] and movie 2 with factor [2,4,6] are considered the same
def cos_similar(k,mid):
  '''
  k: number of similar movies to find
  mid: id of the movie to find similarities
  '''
  movie_info=spark.sql('select * from movie_factors where movieId='+str(mid)).toPandas()
  if movie_info.shape[0]<=0:
    print('No movie with id '+str(mid)+' is found in the data.')
    return None, None
  norm_m=sum(movie_info.iloc[0,1:].values**2)**0.5
  temp=['select movieId,']
  norm_str=['sqrt(']
  for i in range(best_model.rank):
    comd='feature'+str(i)+'*'+str(movie_info.iloc[0,i+1])
    temp.append(comd+' as inner'+str(i)+',')
    if i<best_model.rank-1:
      norm_str.append('feature'+str(i)+'*feature'+str(i)+'+')
    else:
      norm_str.append('feature'+str(i)+'*feature'+str(i))
  norm_str.append(') as norm')
  temp.append(''.join(norm_str))
  temp.append(' from movie_factors where movieId!='+str(mid))
  inner=spark.sql(' '.join(temp))
  inner=inner.selectExpr('movieId',\
                         '(inner0+inner1+inner2+inner3+inner4)/norm/'+str(norm_m)+' as innerP').\
                         orderBy('innerP',ascending=False).limit(k).toPandas()
  out=None
  for i in inner['movieId']:
    if not out:
      out=movies_df.where(movies_df.movieId==str(i))
    else:
      out=out.union(movies_df.where(movies_df.movieId==str(i)))
  out=out.toPandas()
  out.index=range(1,k+1)
  return out, inner