**Item-Based Collaborative Filtering on the MovieLens Dataset**

In [None]:
import os

# Download Java JDK Version 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and Install Apache Spark (with Findspark)
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Tell the Java and Spark tooling where the JDK and the unpacked Spark distribution live
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

In [None]:
# Initialise findspark so that pyspark can be imported from the Spark distribution that
# SPARK_HOME points at (spark-3.0.1-bin-hadoop2.7)
import findspark
findspark.init()

# Start Apache Spark Session & Context.
# getOrCreate reuses the running context when this cell is executed again; constructing a
# second SparkContext in the same kernel raises an error.
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate(SparkConf().setAppName('item-based-movie_recommendation-Model'))

In [None]:
# Mount Google Drive at /content/drive (Google Colab only).
# NOTE(review): the dataset path used by later cells is relative ("drive/My Drive/..."), so it
# presumably relies on the working directory being /content — confirm.
from google.colab import drive
drive.mount('/content/drive');

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Folder that holds the MovieLens data sets (relative path; see the Drive mount above)
datasets_path = "drive/My Drive/Private/movie-dataset/"

# Path of the ratings file of the small movie ratings data set
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

# Read the file as an RDD of text lines and remember the first line (the CSV header) so that
# it can be filtered out when the data is parsed
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [None]:
# Drop the header line, split every remaining line on commas and keep the first three of the
# four columns (userId, movieId, rating). The timestamp column is not used in the modeling.
small_ratings_data = (
    small_ratings_raw_data
    .filter(lambda row: row != small_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)

In [None]:
# Peek at the first three parsed rows. The columns are userId, movieId, rating; the values are
# still strings at this point because the parsing step does no type conversion.
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [None]:
# Load the movie titles file of the small movie data set as (movieId, title) string pairs
import csv

small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

# Titles that contain a comma are quoted in the file, so a plain split(",") cuts them short
# (it produced fragments such as '"Last Seduction'). csv.reader honours the quoting and
# returns the complete title.
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (tokens[0],tokens[1])).cache()

small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [None]:
# Re-key the (movieId, title) pairs with an integer movieId so that they can later be joined
# with the model's product features
small_movies_titles = small_movies_data.map(lambda x: (int(x[0]),x[1]))
small_movies_titles.take(3)

[(1, 'Toy Story (1995)'),
 (2, 'Jumanji (1995)'),
 (3, 'Grumpier Old Men (1995)')]

In [None]:
# Randomly split the ratings into training, validation and test data sets using the weights
# 6:2:2 and a fixed seed
small_training_RDD, small_validation_RDD, small_test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
# For prediction only the (userId, movieId) part of each row is needed; the rating is dropped
small_validation_for_predict_RDD = small_validation_RDD.map(lambda x: (x[0], x[1]))
small_test_for_predict_RDD = small_test_RDD.map(lambda x: (x[0], x[1]))

In [None]:
# Train one ALS model per candidate rank on the training data set and measure its RMSE on the
# validation data set. The rank with the lowest validation RMSE (best_rank) is the one used
# later to train on the complete dataset.
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12, 25, 30, 40, 50]
errors = []  # validation RMSE per rank, in the same order as `ranks`

# Actual validation ratings keyed by (userId, movieId). They do not depend on the rank, so
# the RDD is defined once instead of once per loop iteration.
small_validation_ratings = small_validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

min_error = float('inf')
best_rank = -1
best_model = None
for rank in ranks:
    candidate_model = ALS.train(small_training_RDD, rank, seed=seed, iterations=iterations,
                                lambda_=regularization_parameter)
    # ((userId, movieId), predicted rating)
    predictions = candidate_model.predictAll(small_validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # ((userId, movieId), (actual rating, predicted rating))
    rates_and_preds = small_validation_ratings.join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank
        best_model = candidate_model
print('The best model was trained with rank %s' % best_rank)

# Later cells read `model`. Bind it to the model with the lowest validation RMSE rather than
# to whichever rank happened to be trained last.
model = best_model

For rank 4 the RMSE is 0.9121002114021121
For rank 8 the RMSE is 0.9184327213070025
For rank 12 the RMSE is 0.9160151537868968
For rank 25 the RMSE is 0.9132077259174307
For rank 30 the RMSE is 0.9137297663767054
For rank 40 the RMSE is 0.9112483859775192
For rank 50 the RMSE is 0.910364842826824
The best model was trained with rank 50


In [None]:
# The ALS trained model has userFeatures and productFeatures matrix.
# Count the entries of the userFeatures RDD.
(model.userFeatures().count())

610

In [None]:
# The productFeatures matrix will be used to create an item-item collaborative filtering recommendation model.
# Count the entries of the productFeatures RDD.
model.productFeatures().count()

8108

In [None]:
# Factor vector stored for movie ID 1084 (lookup returns a list of matching values; take the first)
model.productFeatures().lookup(1084)[0]

array('d', [0.39810314774513245, -0.29046911001205444, -0.1648561954498291, 0.3116704225540161, -0.7353532314300537, -0.6675311923027039, 0.6000886559486389, 0.19133085012435913, 0.022775545716285706, -0.5071583390235901, 0.20149832963943481, 0.3613540232181549, -0.08961668610572815, -0.3068521320819855, 0.4140118658542633, 0.218542218208313, 0.13478650152683258, -0.24588732421398163, 0.012199376709759235, -0.279768168926239, 0.263459712266922, 0.10598113387823105, 0.5126112699508667, -0.7114983201026917, -0.2845548689365387, 0.09647301584482193, 0.2586582899093628, -0.24576963484287262, 0.19419355690479279, 0.322068989276886, -0.18895022571086884, 0.08893996477127075, 0.2329738736152649, -0.1257471889257431, 0.06765387207269669, -0.1970863789319992, -0.09525509178638458, 0.11352469772100449, 0.06367164850234985, -0.12774166464805603, -0.08439932018518448, 0.2912535071372986, 0.6823577880859375, -0.26788848638534546, -0.11575732380151749, -0.172343909740448, 0.33088189363479614, 0.2795

In [None]:
import numpy as np
from numpy import linalg as LA

In [None]:
def cosineSimilarity(vec1, vec2):
    """Return the cosine similarity of two vectors.

    vec1, vec2: one-dimensional numeric sequences or numpy arrays of equal length.

    Returns dot(vec1, vec2) / (norm(vec1) * norm(vec2)). When either vector has zero norm
    the ratio is undefined; 0.0 is returned instead of NaN so that results stay sortable.
    """
    a = np.asarray(vec1, dtype=float)
    b = np.asarray(vec2, dtype=float)
    denominator = LA.norm(a) * LA.norm(b)
    if denominator == 0:
        return 0.0
    return a.dot(b) / denominator

In [None]:
# Choose a movie ID to find similar movies for
itemId = 1084
# lookup returns a list of the values stored under the key; wrap it in an array and take the
# first row as the factor vector of the chosen movie
itemFactor = np.asarray(model.productFeatures().lookup(itemId))[0]
itemFactor

array([ 0.39810315, -0.29046911, -0.1648562 ,  0.31167042, -0.73535323,
       -0.66753119,  0.60008866,  0.19133085,  0.02277555, -0.50715834,
        0.20149833,  0.36135402, -0.08961669, -0.30685213,  0.41401187,
        0.21854222,  0.1347865 , -0.24588732,  0.01219938, -0.27976817,
        0.26345971,  0.10598113,  0.51261127, -0.71149832, -0.28455487,
        0.09647302,  0.25865829, -0.24576963,  0.19419356,  0.32206899,
       -0.18895023,  0.08893996,  0.23297387, -0.12574719,  0.06765387,
       -0.19708638, -0.09525509,  0.1135247 ,  0.06367165, -0.12774166,
       -0.08439932,  0.29125351,  0.68235779, -0.26788849, -0.11575732,
       -0.17234391,  0.33088189,  0.27958229,  0.45832768, -0.14493284])

In [None]:
# Sanity check: a vector compared with itself should give a similarity of (approximately) 1
cosineSimilarity(itemFactor,itemFactor)

1.0000000000000002

In [None]:
# Cosine similarity between the chosen movie's factor vector and the factor vector of every
# movie in the model, joined with the movie titles and reshaped to (title, similarity, movieId)
sims = (
    model.productFeatures()
    .map(lambda id_and_factor: (id_and_factor[0],
                                cosineSimilarity(np.asarray(id_and_factor[1]), itemFactor)))
    .join(small_movies_titles)
    .map(lambda joined: (joined[1][1], joined[1][0], joined[0]))
)
sims.take(3)

[('Waiting to Exhale (1995)', 0.6360031591374433, 4),
 ('Tom and Huck (1995)', 0.45465474118549326, 8),
 ('Dracula: Dead and Loving It (1995)', 0.47638501529621, 12)]

In [None]:
# Take the 20 rows with the highest similarity; the negated key makes takeOrdered return them
# in decreasing order of similarity. The chosen movie itself is part of the result.
sortedSims = sims.takeOrdered(20, key=lambda x: -x[1])
sortedSims

[('Bonnie and Clyde (1967)', 1.0000000000000002, 1084),
 ('3:10 to Yuma (1957)', 0.9485227892121055, 5300),
 ('Lenny (1974)', 0.9485227892121055, 5341),
 ('"4 Months', 0.9485227892121055, 55069),
 ('Thief (1981)', 0.9485227892121055, 5867),
 ('"Story of Women (Affaire de femmes', 0.9485227880204576, 8804),
 ('Control (2007)', 0.9485227880204576, 55444),
 ('"Last Seduction', 0.9485227880204576, 5893),
 ('"Passenger', 0.9485227880204576, 26350),
 ('"Marriage of Maria Braun', 0.9485227880204576, 31522),
 ('"Lust', 0.9485227873491127, 55253),
 ('Little Children (2006)', 0.9477962784256139, 48696),
 ("Babette's Feast (Babettes gæstebud) (1987)", 0.9447845882000453, 6791),
 ('Clockwatchers (1997)', 0.9419216814569622, 1875),
 ('"Palm Beach Story', 0.9366419346082678, 2937),
 ('Secrets & Lies (1996)', 0.9336650957969266, 1041),
 ('Pat Garrett and Billy the Kid (1973)', 0.9335813218582445, 7889),
 ('In the Heat of the Night (1967)', 0.9329165534542325, 1950),
 ('"Face in the Crowd', 0.93042002

In [None]:
# Sample of the predictions left over from the last iteration of the rank loop:
# ((userId, movieId), predicted rating)
predictions.take(3)

[((372, 1084), 3.522072819467323),
 ((4, 1084), 3.540132540956333),
 ((402, 1084), 3.319105112869569)]

In [None]:
# Sample of the joined validation rows: ((userId, movieId), (actual rating, predicted rating))
rates_and_preds.take(3)

[((1, 457), (5.0, 4.521268898300244)),
 ((1, 1025), (5.0, 4.517873821845743)),
 ((1, 1089), (5.0, 4.7671637254739085))]

In [None]:
# Read the complete ratings file as text lines and remember its header line
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Turn every non-header line into a typed (userId, movieId, rating) tuple
complete_ratings_data = (
    complete_ratings_raw_data
    .filter(lambda row: row != complete_ratings_raw_data_header)
    .map(lambda row: row.split(","))
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])))
    .cache()
)

print('There are %s recommendations in the complete dataset' % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [None]:
# Randomly split the complete ratings into training and test data with weights 7:3 (fixed seed)
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

# Train the ALS model on the complete training data with the rank selected on the small
# dataset and the same seed, iteration count and regularization parameter as in that search
complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [None]:
# (userId, movieId) pairs of the held-out test ratings
test_for_predict_RDD = test_RDD.map(lambda rating: (rating[0], rating[1]))

# Predicted ratings keyed by (userId, movieId)
predictions = (
    complete_model.predictAll(test_for_predict_RDD)
    .map(lambda pred: ((pred[0], pred[1]), pred[2]))
)
# Actual ratings under the same key, joined with the predictions:
# ((userId, movieId), (actual rating, predicted rating))
rates_and_preds = (
    test_RDD
    .map(lambda rating: ((int(rating[0]), int(rating[1])), float(rating[2])))
    .join(predictions)
)
# Root of the mean squared difference between actual and predicted rating
error = math.sqrt(rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1])**2).mean())

print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8189249933498421


In [None]:
# Load the complete movie titles file
import csv

complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse each line with csv.reader rather than split(","): titles that contain a comma are
# quoted in the file, and a plain split truncates the title (e.g. '"Pool') and shifts the
# remainder of the title into the third column.
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

# (movieId, title) pairs used for joining with the model's product features
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print('There are %s movies in the complete dataset' % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [None]:
def get_counts_and_averages(ID_and_ratings_tuple):
    """Summarise one group of ratings as ``(key, (count, mean))``.

    ID_and_ratings_tuple: pair whose first item is the key and whose second item is a sized
    iterable of numeric ratings.
    """
    key = ID_and_ratings_tuple[0]
    ratings = ID_and_ratings_tuple[1]
    nratings = len(ratings)
    total = 0
    for value in ratings:
        total += value
    return key, (nratings, float(total) / nratings)

# Group all ratings of the complete dataset by movieId ...
movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
# ... reduce each group to (movieId, (number of ratings, average rating)) ...
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
# ... and keep a (movieId, number of ratings) view of the same data
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
# Number of entries in the productFeatures RDD of the model trained on the complete data
complete_model.productFeatures().count()

50094

In [None]:
# Number of distinct movies that have at least one rating in the complete dataset.
# NOTE(review): this is larger than the model's productFeatures count, presumably because the
# model was trained on the training split only — confirm.
movie_ID_with_avg_ratings_RDD.count()

53889

In [None]:
# Chosen MovieID for calculating similar movies
itemId = 25800

# Factor vector of the chosen movie (lookup returns a list of matching values; take the first)
complete_itemFactor = np.asarray(complete_model.productFeatures().lookup(itemId))[0]

# Similarity coefficient of every movie against the chosen movie ID, joined with the movie
# title and with the (rating count, average rating) pair. movie_ID_with_avg_ratings_RDD
# already carries the rating count, so no separate join with movie_rating_counts_RDD is
# needed; leaving it out saves one shuffle and yields the same rows.
complete_sims = complete_model.productFeatures().map(lambda products:(products[0],
                                        cosineSimilarity(np.asarray(products[1]), complete_itemFactor)))\
                                .join(complete_movies_titles)\
                                .join(movie_ID_with_avg_ratings_RDD)
# Flatten (movieId, ((similarity, title), (count, average))) to
# (title, similarity, movieId, rating count, average rating)
complete_sims = complete_sims.map(lambda r: (r[1][0][1], r[1][0][0], r[0], r[1][1][0], r[1][1][1]))

In [None]:
# Keep only the movies that have at least five user ratings (r[3] is the rating count).
# Then take the 20 entries with the highest similarity coefficient (x[1]); negating the key
# makes takeOrdered return them in decreasing order of similarity.
complete_sortedSims = complete_sims.filter(lambda r: r[3]>=5).takeOrdered(20, key=lambda x: -x[1])
complete_sortedSims

[('Lady for a Day (1933)', 0.9999999999999999, 25800, 23, 3.5),
 ('Fanny (1961)', 0.9787880117586683, 63629, 14, 3.3214285714285716),
 ('Take Out (2004)', 0.9648250427600895, 90895, 10, 3.5),
 ('The Possibilities Are Endless (2014)',
  0.9627766247028035,
  135228,
  7,
  4.071428571428571),
 ('The Mobfathers (2016)', 0.9621657346771444, 161976, 6, 2.5833333333333335),
 ('Teenage Kicks (2016)', 0.9609674082641296, 169190, 7, 3.2857142857142856),
 ('Vertical Features Remake (1978)', 0.960817656462365, 109598, 8, 4.0625),
 ('Sin hijos (2015)', 0.9607987565712782, 153830, 5, 3.0),
 ('"Pool', 0.9605592842410388, 62420, 11, 3.3636363636363638),
 ('Temptation of a Monk (You Seng) (1993)', 0.9593227749239196, 7754, 5, 2.2),
 ("Manuscripts Don't Burn (2013)", 0.9579743909949162, 149616, 5, 3.9),
 ('In the Shadow of Women (2015)',
  0.9560931551192208,
  152258,
  6,
  3.6666666666666665),
 ('Social Genocide (Memoria del saqueo) (2004)',
  0.9560019842385644,
  109251,
  6,
  3.75),
 ('#chicago

In [None]:
# Data to be pickled: the productFeatures matrix joined with the movie titles and with the
# (rating count, average rating) pairs. Rows have the shape
# (movieId, ((factor vector, title), (rating count, average rating))).
test2 = complete_model.productFeatures().join(complete_movies_titles)\
                                .join(movie_ID_with_avg_ratings_RDD)

In [None]:
# Pickle the joined productFeatures data into the dataset folder. os.path.join is used instead
# of string concatenation so the target path stays correct even when datasets_path is given
# without a trailing slash.
test2.saveAsPickleFile(os.path.join(datasets_path, 'item_based_features'))