## DS 5110: Big Data Systems
# Final Project: Netflix Recommender System & Ratings Predictions
### Additional ALS Exploration (Hyperparameters, Personalized Recommendations)
#### Lauren Neal (ln9bv) | Melanie Sattler (ms9py) | Nick Thompson (nat3fa) | Nima Beheshti (nb9pp)

In [1]:
# import modules
import os

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib import recommendation
from pyspark.mllib.recommendation import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

from pyspark.sql import SparkSession

In [2]:
# Describe a local Spark application called "netflix", then reuse the running
# SparkContext if one exists or start a new one with this configuration.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("netflix")
)
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Read the ratings file as an RDD of raw text lines
data = sc.textFile("processed_all.txt")

# The first line is treated as a header: remember it, then drop every line equal to it
header = data.first()
data_noh = data.filter(lambda line: line != header)

# Randomly split the remaining lines into training (75%) and test (25%) sets,
# using a fixed seed so the split is repeatable
seed = 314
weights = [0.75, 0.25]
training, test = data_noh.randomSplit(weights, seed)

# Both splits are reused by later cells, so mark them for caching
training.cache()
test.cache()

# Preview a few raw training lines
training.take(5)

['1488844,3.0,1',
 '822109,5.0,1',
 '885013,4.0,1',
 '30878,4.0,1',
 '823519,3.0,1']

In [4]:
def _parse_rating(line):
    """Turn one raw ratings line into a ``Rating(user, product, rating)``.

    The raw lines previewed above look like ``'1488844,3.0,1'``: the user id
    comes first, the rating second and the movie id third. ``Rating`` expects
    (user, product, rating), so the last two fields have to be swapped;
    passing them in file order would call ``int('3.0')`` and raise ValueError.
    """
    fields = line.split(',')
    return Rating(int(fields[0]), int(fields[2]), float(fields[1]))


# Parse both splits with the same helper so they cannot drift apart
ratings_train = training.map(_parse_rating)

ratings_test = test.map(_parse_rating)

In [5]:
# Preview the first five parsed training ratings
ratings_train.take(5)

[Rating(user=1488844, product=1, rating=3.0),
 Rating(user=822109, product=1, rating=5.0),
 Rating(user=885013, product=1, rating=4.0),
 Rating(user=30878, product=1, rating=4.0),
 Rating(user=823519, product=1, rating=3.0)]

In [6]:
# Preview the first five parsed test ratings
ratings_test.take(5)

[Rating(user=2207774, product=1, rating=5.0),
 Rating(user=1086807, product=1, rating=3.0),
 Rating(user=1181550, product=1, rating=3.0),
 Rating(user=2263586, product=1, rating=4.0),
 Rating(user=2508819, product=1, rating=3.0)]

For loop to test multiple rank values

In [7]:
print("Started training at :")
print(str(datetime.now()))
print()

# Candidate numbers of latent factors to compare
rank = (5, 10, 15, 20)
numIterations = 10
# NOTE: despite its name, this value is used as ALS.train's regularization
# parameter (`lambda_`). In pyspark.mllib, `alpha` belongs to
# ALS.trainImplicit, which is not used here.
alpha = 0.01
results = []

# These RDDs do not depend on the rank, so build them once outside the loop.
## (user, movie) pairs from the test set, with the true rating withheld
test_obscure_rating = ratings_test.map(lambda x: (x[0], x[1]))
## true test ratings keyed by (user, movie) for joining with predictions
actual_by_pair = ratings_test.map(lambda x: ((x[0], x[1]), x[2]))

for i in rank:
    # Keyword arguments make it explicit which ALS parameter each value feeds;
    # the seed makes the factor initialisation repeatable between runs.
    model = ALS.train(ratings_train, i,
                      iterations=numIterations,
                      lambda_=alpha,
                      seed=seed)
    predictions = model.predictAll(test_obscure_rating).map(lambda x: ((x[0], x[1]), x[2]))
    # ((user, movie), (actual, predicted))
    ratesAndPreds = actual_by_pair.join(predictions)
    MSE = ratesAndPreds.map(lambda x: (x[1][0] - x[1][1])**2).mean()
    results.append(MSE)
print("Finished training at :")
print(str(datetime.now()))

# Show the test MSE obtained for each rank, in the same order as `rank`
results

Started training at :
2021-08-02 13:45:17.644582

Finished training at :
2021-08-02 16:35:56.923087
[0.7476981089509888, 0.7276632443801131, 0.7269436888340164, 0.7328898167333998]


In [9]:
# RMSE is the square root of each rank's test MSE
Root_MSE = [mse ** 0.5 for mse in results]

# One row per rank tried: rank, MSE and RMSE side by side
df_test = pd.DataFrame(
    list(zip(rank, results, Root_MSE)),
    columns=['rank', 'MSE', 'RMSE'],
)
df_test

Unnamed: 0,rank,MSE,RMSE
0,5,0.747698,0.864695
1,10,0.727663,0.853032
2,15,0.726944,0.85261
3,20,0.73289,0.85609


Lauren's original code with rank changed to 15

In [7]:
print("Started training at :")
print(str(datetime.now()))
print()

# Build model (ALS)
rank = 15
numIterations = 10
# NOTE: despite its name, this value is used as ALS.train's regularization
# parameter (`lambda_`). In pyspark.mllib, `alpha` belongs to
# ALS.trainImplicit, which is not used here.
alpha = 0.01

# Keyword arguments make it explicit which ALS parameter each value feeds;
# the seed makes the factor initialisation repeatable between runs.
model = ALS.train(ratings_train,
                  rank,
                  iterations=numIterations,
                  lambda_=alpha,
                  seed=seed)

print("Finished training at :")
print(str(datetime.now()))

Started training at :
2021-08-07 22:51:01.891872

Finished training at :
2021-08-07 23:10:44.474155


In [8]:
# Score the fitted model against the held-out test split

## keep only the (user, movie) pair of each test rating; the true rating is withheld
test_obscure_rating = ratings_test.map(lambda r: (r[0], r[1]))

## predicted rating for each pair, keyed by (user, movie)
predictions = (
    model.predictAll(test_obscure_rating)
         .map(lambda p: ((p[0], p[1]), p[2]))
)

## key the true ratings the same way and join: ((user, movie), (actual, predicted))
ratesAndPreds = (
    ratings_test
    .map(lambda r: ((r[0], r[1]), r[2]))
    .join(predictions)
)

## mean of the squared (actual - predicted) differences, and its square root
MSE = ratesAndPreds.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2).mean()
RMSE = MSE ** 0.5


In [9]:
# Summarise the model settings and its test error.
# The value printed as "alpha" is the one handed to ALS.train after the
# iteration count (see the training cell).
print("Model: ALS")
print('')
print(f"alpha: {alpha}")
print(f"Train/Test Split: {weights}")
print('')
print(f"Total Features Selected: {rank}")
print('')
print(f"Mean Squared Error: {MSE}")
print(f"Root Mean Squared Error: {RMSE}")

Model: ALS

alpha: 0.01
Train/Test Split: [0.75, 0.25]

Total Features Selected: 15

Mean Squared Error: 0.7282367290687349
Root Mean Squared Error: 0.8533678744063049


In [10]:
# Preview five joined records: ((user, movie), (actual rating, predicted rating))
ratesAndPreds.take(5)

[((209573, 1), (4.0, 3.9097003814804343)),
 ((1959936, 2), (5.0, 2.990446066638587)),
 ((755319, 3), (3.0, 3.5455251470879565)),
 ((596255, 3), (1.0, 0.9982883758030261)),
 ((499971, 3), (5.0, 4.221142056113273))]

Trying to turn the RDD into a DataFrame

# Split

In [11]:
# Read the movie titles file as an RDD of raw text lines
movie_name = sc.textFile("movie_titles.csv")

In [12]:
# Preview the first ten raw title lines
movie_name.take(10)

['1,2003,Dinosaur Planet',
 '2,2004,Isle of Man TT 2004 Review',
 '3,1997,Character',
 "4,1994,Paula Abdul's Get Up & Dance",
 '5,2004,The Rise and Fall of ECW',
 '6,1997,Sick',
 '7,1992,8 Man',
 '8,2004,What the #$*! Do We Know!?',
 "9,1991,Class of Nuke 'Em High 2",
 '10,2001,Fighter']

In [13]:
def parsemovie_namePair(singlePair):
    """Parse one ``id,year,title`` line of the movie titles file.

    Parameters
    ----------
    singlePair : str
        A raw line such as ``'1,2003,Dinosaur Planet'``.

    Returns
    -------
    list
        ``[(movie_id, title)]`` when the line parses, otherwise ``[]``
        (fewer than three fields, or a non-integer id). Returning a list
        lets the caller use ``flatMap`` to silently skip unparseable lines.
    """
    # Split on the first two commas only: a title may itself contain commas,
    # and splitting on every comma would cause such movies to be dropped.
    splitPair = singlePair.split(',', 2)
    if len(splitPair) != 3:
        return []
    try:
        return [(int(splitPair[0]), splitPair[2])]
    except ValueError:
        # The id field is not an integer
        return []
# Build a {movie id: title} lookup on the driver from every line that parses
movie_name_all = dict(
    movie_name
    .flatMap(parsemovie_namePair)
    .collect()
)

# Preview the first ten titles in the lookup
movie_names = movie_name_all.values()
list(movie_names)[:10]


['Dinosaur Planet',
 'Isle of Man TT 2004 Review',
 'Character',
 "Paula Abdul's Get Up & Dance",
 'The Rise and Fall of ECW',
 'Sick',
 '8 Man',
 'What the #$*! Do We Know!?',
 "Class of Nuke 'Em High 2",
 'Fighter']

In [18]:
# Model evaluation: inspect what one sample user rated in the training data

# User whose history and recommendations are examined
testUserID = 499971

# Broadcast the {movie id: title} lookup so it can be read inside the RDD operations below
movie_all_Broadcast = sc.broadcast(movie_name_all)

# Titles of the movies this user rated in the training split.
# Movie ids missing from the lookup come back as None.
artistsForUser = (
    ratings_train
    .filter(lambda rating: rating.user == testUserID)
    .map(lambda rating: movie_all_Broadcast.value.get(rating.product))
    .collect()
)

In [19]:
# Show the user's rated titles, skipping ids that had no title in the lookup
print([title for title in artistsForUser if title is not None])

['Three Days of the Condor', 'American Beauty', 'Eat Drink Man Woman', 'The Door in the Floor', 'Bend It Like Beckham', 'Alex and Emma', 'Eternal Sunshine of the Spotless Mind', 'Talk to Her', 'Mean Machine', 'Elf', "I'm Not Scared", 'Napoleon Dynamite', 'About a Boy', 'The Last Kiss', 'Whale Rider', 'Garden State', 'Fever Pitch', 'Pelle the Conqueror', 'Me and You and Everyone We Know', 'The Mother', 'Seabiscuit', 'Malena', 'Baran', 'The Story of the Weeping Camel', 'Collateral', 'The Color of Paradise', 'City of God', 'Under the Sand', 'Love Me If You Dare', "I'll Sleep When I'm Dead", 'The Official Story', 'Cold Mountain', 'Pulp Fiction', 'Lord of the Rings: The Two Towers', 'The Crime of Padre Amaro', 'Twisted', 'Lost in Translation', 'Under the Sun', 'Troy', 'A Home at the End of the World', 'Children of Heaven', 'The Piano Teacher', 'Spider-Man', 'King Arthur', 'Dogville', 'The Day After Tomorrow', 'Last Orders', 'Van Helsing', 'Winged Migration', 'American Splendor', 'Wimbledon'

In [20]:
# Number of training ratings found for the test user (counts None entries for ids without a title)
len(artistsForUser)

54

In [23]:
# How many top recommendations to request for the test user
num_recomm = 100

# Use the public recommendProducts API rather than the wrapper's internal
# `call`, and keep the titles in a list. The previous `map` object was a
# one-shot iterator, so it was empty once the print below had consumed it.
recommendationsForUser = [
    movie_name_all.get(observation.product)
    for observation in model.recommendProducts(testUserID, num_recomm)
]

# Show recommended titles, skipping ids that had no title in the lookup
print([x for x in recommendationsForUser if x is not None])

['King Lear', 'Heimat', 'Rumpole of the Bailey: Series 7', 'Shakira: Live and Off the Record', 'The Flowers of St. Francis', 'Paheli', 'Goddess of Mercy', 'Rumpole of the Bailey: Series 5', 'Pride FC: Body Blow', 'Hamish Macbeth: Season 1', 'Babylon 5: Season 4', 'The Quick & Dirty Guide to Salsa: Part 1: Beginners', "Kiki's Delivery Service: Bonus Material", 'Midori Days', 'Buffy the Vampire Slayer: Season 3', 'Main Hoon Na', 'Gadjo Dilo', 'Beyonce: Live at Wembley', 'Fighter Pilot: Operation Red Flag', 'Babylon 5: Season 2', 'Babylon 5: Season 3', 'Life of Rayful Edmond: The Rise & Fall 1', 'Storm Over Asia', 'Catnapped! The Movie', 'Samurai Trilogy 3: Duel at Ganryu Island', 'Hop', 'Rumpole of the Bailey: Series 6', 'Wing Chun', 'Babylon 5: In the Beginning', 'Viruddh', 'Ah! My Goddess', 'Otogi Zoshi', 'My Beautiful Girl Mari', 'Jubei Chan 2: The Counter Attack of Siberia Yagyu', 'Kill!', 'Banner of the Stars II', 'Buffy the Vampire Slayer: Season 2', 'Island of Greed', 'Buffy the V