Let's set up Spark in your Colab environment. Run the cell below!


In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!pip install keras
!pip install scikit-surprise
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!pip install findspark
!pip install wget



In [2]:
import os.path
from os import path
from zipfile import ZipFile

import findspark
import keras
import pandas as pd
import pyspark
import wget
from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Register the MySQL connector package before the Spark context is created
findspark.add_packages('mysql:mysql-connector-java:8.0.11')


# **1. Download and extract the MovieLens dataset**

In [3]:
# Download the data from https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
dataset_path = "dataset/"

if not path.exists(dataset_path):
    !mkdir dataset
    # For the full 25M dataset, use https://files.grouplens.org/datasets/movielens/ml-25m.zip instead
    data_url = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
    wget.download(data_url, dataset_path)
else:
    print("Dataset already exists. No need to download.")


Dataset already exists. No need to download.


In [4]:
# Only extract the data the first time the script is run.
movielens_dir = dataset_path + "ml-latest-small"
movielens_zipped_file = dataset_path + "ml-latest-small.zip"

if not path.exists(movielens_dir):
    with ZipFile(movielens_zipped_file, "r") as z:
        # Extract files
        print("Extracting all the files now...")
        z.extractall(path=dataset_path)
        print("Done!")
else:
    print("Dataset already exists. No need to extract.")


Dataset already exists. No need to extract.


# **2. Set up the big data environment with PySpark**

In [5]:
# create the configuration
conf = SparkConf().set("spark.ui.port", "4050")
# create the context
sc = SparkContext(conf=conf)

# create the session
spark = SparkSession \
    .builder \
    .appName("Movie recommendation") \
    .getOrCreate()


KeyboardInterrupt: 

In [None]:
spark

# **3. Data exploration and cleaning**


**a. Reading the downloaded movie dataset into PySpark DataFrames**



In [6]:
ratings_file = movielens_dir + "/ratings.csv"
movies_file = movielens_dir + "/movies.csv"


In [None]:
# Define the dataset schema 

from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('title', StringType()),
   StructField('genres', StringType())]
)

# Create the PySpark DataFrames and cache them in memory

ratings_df = spark.read\
                  .options(header=True, inferSchema=False)\
                  .schema(ratings_df_schema)\
                  .csv(ratings_file)
movies_df = spark.read\
                .options(header=True, inferSchema=False)\
                .schema(movies_df_schema)\
                .csv(movies_file)

ratings_df.cache()
movies_df.cache()

In [None]:
ratings_df.show(10, truncate=False)
movies_df.show(10, truncate=False)

In [None]:
ratings_df


In [None]:
movies_df

# **4. Building the model-based collaborative filtering recommender**



> For this project, we will use the SurPRISE (Simple Python RecommendatIon System Engine) library.


> This is because it is fast and includes built-in matrix factorization algorithms such as SVD.


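To make the matrix factorization idea concrete, here is a minimal sketch of the prediction rule used by biased SVD-style models: the estimate is the global mean, plus a user bias, plus an item bias, plus the dot product of the user and item latent vectors. Every number below is invented for illustration and is not taken from the model trained in this notebook; `predict_rating` is a hypothetical helper, not part of Surprise.

```python
# Sketch of a biased matrix-factorization prediction:
#   r_hat(u, i) = mu + b_u + b_i + q_i . p_u
# All values are illustrative assumptions, not learned parameters.

def predict_rating(global_mean, user_bias, item_bias, user_factors, item_factors):
    """Combine the baseline terms with the latent-factor interaction."""
    interaction = sum(p * q for p, q in zip(user_factors, item_factors))
    return global_mean + user_bias + item_bias + interaction

mu = 3.5             # hypothetical global mean rating
b_u = 0.2            # hypothetical user bias (this user rates slightly high)
b_i = -0.1           # hypothetical movie bias (this movie is rated slightly low)
p_u = [0.5, -0.25]   # hypothetical user latent factors (only 2 factors here)
q_i = [0.5, 0.5]     # hypothetical movie latent factors

estimate = predict_rating(mu, b_u, b_i, p_u, q_i)
print(round(estimate, 3))
```

In the notebook, `n_factors=50` plays the role of the vector length, which is 2 in this toy example.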



In [7]:
# Import libraries from Surprise package

from surprise import Reader, Dataset, SVD, SVDpp
from surprise import accuracy
from surprise.model_selection import cross_validate

In [8]:
# Surprise loads its data from pandas DataFrames, so we read the ratings with pandas.
# Alternatively, convert the PySpark DataFrame:
#ratings_df_pd = ratings_df.toPandas()

In [9]:
ratings_pd = pd.read_csv(ratings_file)


In [10]:
ratings_pd = ratings_pd[['userId', 'movieId', 'rating']]
movies_pd = pd.read_csv(movies_file)

In [11]:
movies_pd

| | movieId | title | genres |
|---|---|---|---|
| 0 | 1 | Toy Story (1995) | Adventure\|Animation\|Children\|Comedy\|Fantasy |
| 1 | 2 | Jumanji (1995) | Adventure\|Children\|Fantasy |
| 2 | 3 | Grumpier Old Men (1995) | Comedy\|Romance |
| 3 | 4 | Waiting to Exhale (1995) | Comedy\|Drama\|Romance |
| 4 | 5 | Father of the Bride Part II (1995) | Comedy |
| ... | ... | ... | ... |
| 9737 | 193581 | Black Butler: Book of the Atlantic (2017) | Action\|Animation\|Comedy\|Fantasy |
| 9738 | 193583 | No Game No Life: Zero (2017) | Animation\|Comedy\|Fantasy |
| 9739 | 193585 | Flint (2017) | Drama |
| 9740 | 193587 | Bungo Stray Dogs: Dead Apple (2018) | Action\|Animation |


In [12]:
ratings_pd

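| | userId | movieId | rating |
|---|---|---|---|
| 0 | 1 | 1 | 4.0 |
| 1 | 1 | 3 | 4.0 |
| 2 | 1 | 6 | 4.0 |
| 3 | 1 | 47 | 5.0 |
| 4 | 1 | 50 | 5.0 |
| ... | ... | ... | ... |
| 100831 | 610 | 166534 | 4.0 |
| 100832 | 610 | 168248 | 5.0 |
| 100833 | 610 | 168250 | 5.0 |
| 100834 | 610 | 168252 | 5.0 |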


In [13]:
ratings_pd.dtypes

userId       int64
movieId      int64
rating     float64
dtype: object

In [23]:
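# Define the rating scale for the Reader.
# Note: MovieLens ratings range from 0.5 to 5.0; with (1, 5), predictions below 1 are clipped to 1.
reader = Reader(rating_scale=(1, 5))

# Load the ratings DataFrame into a Surprise Dataset
data = Dataset.load_from_df(ratings_pd, reader)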


svd = SVD(n_factors=50)
#svd_plusplus = SVDpp(n_factors=50)


# Run 5-fold cross-validation and print results.

cross_validate(svd, data, measures=['RMSE', 'MAE'], cv=5, verbose=True)

Evaluating RMSE, MAE of algorithm SVD on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.8691  0.8740  0.8724  0.8767  0.8649  0.8714  0.0041  
MAE (testset)     0.6667  0.6714  0.6743  0.6718  0.6630  0.6694  0.0041  
Fit time          3.20    2.83    2.71    2.69    2.70    2.83    0.19    
Test time         0.10    0.10    0.15    0.10    0.10    0.11    0.02    


{'test_rmse': array([0.86905931, 0.87403258, 0.87240831, 0.87674671, 0.86490019]),
 'test_mae': array([0.66665083, 0.67140472, 0.6742839 , 0.67176314, 0.66295676]),
 'fit_time': (3.195455312728882,
  2.833423376083374,
  2.713743209838867,
  2.685817003250122,
  2.7037711143493652),
 'test_time': (0.10372352600097656,
  0.10372233390808105,
  0.14561080932617188,
  0.10272598266601562,
  0.09973263740539551)}
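For readers unfamiliar with k-fold cross-validation, the sketch below shows how a dataset can be split so that each sample is used for testing exactly once. It is a simplified illustration, not Surprise's implementation: `k_fold_indices` is a hypothetical helper, and it uses contiguous folds without shuffling.

```python
def k_fold_indices(n_samples, k):
    """Yield (train_indices, test_indices) for each of k contiguous folds."""
    # Spread the remainder over the first folds so sizes differ by at most one
    fold_sizes = [n_samples // k + (1 if f < n_samples % k else 0) for f in range(k)]
    start = 0
    for size in fold_sizes:
        test = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_samples))
        yield train, test
        start += size

folds = list(k_fold_indices(10, 5))
for fold_number, (train, test) in enumerate(folds, start=1):
    print(f"Fold {fold_number}: train on {len(train)} samples, test on {test}")
```

The model is fitted once per fold on the training indices and scored on the held-out test indices, which is why the output above reports one RMSE and one MAE per fold.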

We get a mean root mean square error (RMSE) of 0.8714 across the five folds, which is good enough for our case. Let us now train on the full dataset and generate predictions.
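For reference, the two error measures reported by the cross-validation can be computed by hand as follows. This is a minimal sketch with invented ratings; the `rmse` and `mae` helpers are written here for illustration and are not the `surprise.accuracy` functions.

```python
import math

def rmse(actual, predicted):
    """Root mean square error: penalizes large errors more heavily."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

def mae(actual, predicted):
    """Mean absolute error: the average size of the error."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

actual = [4.0, 3.0, 5.0, 2.0]      # hypothetical true ratings
predicted = [3.5, 3.0, 4.0, 3.0]   # hypothetical model estimates

print("RMSE:", rmse(actual, predicted))
print("MAE:", mae(actual, predicted))
```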

In [15]:
# Build the training set from the full dataset and fit the model

trainset = data.build_full_trainset()

svd.fit(trainset)  # older Surprise versions used svd.train

<surprise.prediction_algorithms.matrix_factorization.SVD at 0x20acf902490>

In [16]:
id_2_names = dict()

for idx, names in zip(movies_pd['movieId'], movies_pd['title']):
    id_2_names[idx] = names

In [17]:
def create_test_set(user_id):
    
    fill = trainset.global_mean
    anti_testset = list()
    u = trainset.to_inner_uid(user_id)
    
    # ur == users ratings
    user_items = set([item_inner_id for (item_inner_id, rating) in trainset.ur[u]])
    
    anti_testset += [(trainset.to_raw_uid(u), trainset.to_raw_iid(i), fill) for
                            i in trainset.all_items() if i not in user_items]
    
    return anti_testset
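The idea behind `create_test_set` can be illustrated without Surprise: collect every movie the user has not rated yet and pair it with a placeholder rating. The sketch below uses a small made-up dictionary; `unrated_items` and `toy_ratings` are hypothetical names introduced only for this example.

```python
def unrated_items(ratings, user_id, fill):
    """Return (user, item, fill) triples for items the user has not rated."""
    seen = set(ratings.get(user_id, {}))
    all_items = {item for user_ratings in ratings.values() for item in user_ratings}
    return [(user_id, item, fill) for item in sorted(all_items - seen)]

# Toy data: user -> {movie: rating}
toy_ratings = {
    "alice": {"m1": 4.0, "m2": 5.0},
    "bob": {"m2": 3.0, "m3": 2.0},
}

# Use the mean of all known ratings as the placeholder value
all_values = [r for user_ratings in toy_ratings.values() for r in user_ratings.values()]
global_mean = sum(all_values) / len(all_values)

candidates = unrated_items(toy_ratings, "alice", global_mean)
print(candidates)
```

The placeholder is never a real rating; it only fills the slot that the prediction tuple expects.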

Now let us create a function that returns the top 10 movies recommended for a user



In [18]:
def top_recommendations(user_id, num_recommender=10):
    """Return the unseen movies with the highest predicted ratings for a user."""
    test_set = create_test_set(user_id)
    predictions = svd.test(test_set)  # we can change to SVD++ later

    # Collect (movie ID, estimated rating) pairs and sort them, best first
    recommendation = [(int(movie_id), estimated_rating)
                      for _, movie_id, _, estimated_rating, _ in predictions]
    recommendation.sort(key=lambda x: x[1], reverse=True)

    # Keep only the requested number of movies
    top = recommendation[:num_recommender]

    # Map movie IDs to titles and return the result as a DataFrame
    return pd.DataFrame({'movie_names': [id_2_names[movie_id] for movie_id, _ in top],
                         'rating': [rating for _, rating in top]})
    
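When only the best few predictions are needed, sorting the whole list is not strictly necessary. As an alternative sketch (not what the function above does), the standard library's `heapq.nlargest` selects the top entries directly; `top_n` and the scores below are made up for illustration.

```python
import heapq

def top_n(scored_items, n):
    """Return the n (item, score) pairs with the highest scores, best first."""
    return heapq.nlargest(n, scored_items, key=lambda pair: pair[1])

# Hypothetical (movie ID, estimated rating) pairs
scores = [("m1", 3.2), ("m2", 4.7), ("m3", 4.1), ("m4", 2.9)]

best_two = top_n(scores, 2)
print(best_two)
```

For the roughly ten thousand movies in this dataset the difference is negligible, so a full sort remains a reasonable choice.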

In [19]:
# Now let us generate the top 10 recommended movies for user 600

top_recommendations(600, num_recommender=10)

| | movie_names | rating |
|---|---|---|
| 0 | One Flew Over the Cuckoo's Nest (1975) | 4.394664 |
| 1 | Avengers, The (2012) | 4.094646 |
| 2 | Jaws (1975) | 4.084902 |
| 3 | Cool Hand Luke (1967) | 4.084296 |
| 4 | Departed, The (2006) | 4.070869 |
| 5 | Grand Day Out with Wallace and Gromit, A (1989) | 4.067885 |
| 6 | In Bruges (2008) | 4.049651 |
| 7 | Wallace & Gromit: The Wrong Trousers (1993) | 4.046354 |
| 8 | Godfather: Part II, The (1974) | 4.044107 |
| 9 | Army of Darkness (1993) | 4.036432 |


# Model evaluation

In [20]:
# Then predict ratings for all pairs (u, i) that are NOT in the training set.
testset = trainset.build_anti_testset()

predictions_svd = svd.test(testset)

In [21]:
print('SVD - RMSE:', accuracy.rmse(predictions_svd, verbose=False))
print('SVD - MAE:', accuracy.mae(predictions_svd, verbose=False))

SVD - RMSE: 0.48725238724656844
SVD - MAE: 0.37719565898840046


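The RMSE of 0.4872 above should be read with care. The anti-testset contains only (user, movie) pairs that have no known rating, and Surprise fills in the global mean as the "actual" rating for each of them. This figure therefore measures how far the predictions are from the global mean, not how accurate they are. The cross-validated RMSE of 0.8714 reported earlier is the more reliable estimate of the model's accuracy.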
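As an illustration with invented numbers, the sketch below compares an RMSE computed against a constant placeholder, which is what an anti-testset stores in place of real ratings, with an RMSE computed against real ratings. The `fill`, `predicted`, and `hidden_truth` values are assumptions made up for this example.

```python
import math

def rmse(actual, predicted):
    """Root mean square error between two equally long lists."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

fill = 3.5                             # placeholder "actual" rating (assumed global mean)
predicted = [3.0, 4.0, 3.5, 4.5]       # hypothetical model estimates
hidden_truth = [1.0, 5.0, 2.0, 4.5]    # hypothetical real ratings, unknown in practice

rmse_vs_fill = rmse([fill] * len(predicted), predicted)
rmse_vs_truth = rmse(hidden_truth, predicted)

print("RMSE against the placeholder:", rmse_vs_fill)
print("RMSE against real ratings:", rmse_vs_truth)
```

Predictions that stay close to the mean look good against the placeholder even when they are far from what users would actually rate, which is why a held-out test set or cross-validation is the usual way to measure accuracy.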

## Testing the model on a user
Now let's use SVD to predict the rating that User with ID 500 will give to a random movie (let's say with Movie ID 100).

In [22]:
svd.predict(500, 100)

Prediction(uid=500, iid=100, r_ui=None, est=2.6362786014980713, details={'was_impossible': False})

For the movie with ID 100, I get an estimated rating of about 2.636. The recommender system works purely on the basis of the assigned user and movie IDs and predicts ratings based on how other users have rated the movie.
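Two details shape an estimate like this one: a model of this kind can only use the terms it has learned for a given user and movie, and the final value is kept within the rating scale. The sketch below illustrates that general idea under simplifying assumptions. It is not Surprise's code, `clipped_estimate` is a hypothetical helper, and all numbers are invented.

```python
def clipped_estimate(global_mean, user_bias=None, item_bias=None,
                     interaction=None, scale=(1.0, 5.0)):
    """Build an estimate from whatever terms are known, then clip it to the scale."""
    est = global_mean
    if user_bias is not None:        # user was seen during training
        est += user_bias
    if item_bias is not None:        # movie was seen during training
        est += item_bias
    if user_bias is not None and item_bias is not None and interaction is not None:
        est += interaction           # latent-factor term needs both to be known
    low, high = scale
    return min(high, max(low, est))

cold_start = clipped_estimate(3.5)                 # unknown user and unknown movie
too_high = clipped_estimate(3.5, 0.9, 0.8, 0.4)    # 5.6 before clipping

print(cold_start, too_high)
```

With nothing known about the user or the movie, the estimate falls back to the global mean; with strong positive terms, it is capped at the top of the scale.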

# Conclusion

In this notebook, I built a model-based collaborative filtering movie recommendation system using latent features learned by a matrix factorization method called SVD. Because it captures the underlying features driving the raw data, this approach typically scales better to massive datasets than memory-based (neighborhood) methods and can make recommendations that reflect each user's tastes.