## Import Libraries

In [1]:
from pyspark import SparkConf, SparkContext 

## Set up SparkContext configuration and start the Spark Context Interface

In [2]:
# Run Spark locally ("local[*]" master) under the application name "My App".
conf = SparkConf().setMaster("local[*]").setAppName("My App")
# getOrCreate reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
# Note: if a context already exists, it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)
print("finished")

finished


## Constants

In [3]:
# Movie whose fans we are looking for. Kept as a string because it is compared
# against the un-cast movieId field of the parsed CSV rows.
givenMovieId = "31"
# User ID constant, also a string; not referenced in the cells shown here.
givenUserId = "1"
# A rating strictly greater than this value counts as "liked".
avgRating = 3.0

## Create initial RDDs for Ratings and Movies

In [23]:
# Point an RDD of raw text lines at the ratings file.
ratingsInput = sc.textFile(name="dataset/ratings.csv")
print("RDD for ratings created")

# Same for the movies file.
moviesInput = sc.textFile(name="dataset/movies.csv")
print("RDD for movies created")

RDD for ratings created
RDD for movies created


## Find all the users who have also liked the same movie based on the ratingsRDD


In [47]:
# Break every raw line into its comma-separated fields; all fields remain strings.
ratingsRDD = ratingsInput.map(lambda raw_line: raw_line.split(","))

In [48]:
# Peek at the first two parsed rows; the first one is the CSV header row.
ratingsRDD.take(2)

[['userId', 'movieId', 'rating', 'timestamp'],
 ['1', '31', '2.5', '1260759144']]

In [49]:
# The first parsed record is the CSV header row; keep it so it can be filtered out of the data.
ratings_header = ratingsRDD.first()

## Filter out the header

In [50]:
def _is_data_row(row):
    """Return True for every parsed row except the header row."""
    return row != ratings_header


# Keep only the data rows.
ratingsRDD_no_header = ratingsRDD.filter(_is_data_row)

In [51]:
# Sanity check: the first two rows should now be data rows rather than the header.
ratingsRDD_no_header.take(2)

[['1', '31', '2.5', '1260759144'], ['1', '1029', '3.0', '1260759179']]

## Keep only the ratings for the given movie that are above the `avgRating` threshold

In [52]:
def _liked_given_movie(row):
    """Return True when the row rates the given movie strictly above avgRating.

    row[1] is the movieId field and row[2] the rating field. The rating is only
    parsed when the movie matches, because `and` short-circuits.
    """
    return row[1] == givenMovieId and float(row[2]) > avgRating


ratingsRDD_users_who_also_liked = ratingsRDD_no_header.filter(_liked_given_movie)

In [57]:
# Bring every matching rating row back to the driver for inspection.
ratingsRDD_users_who_also_liked.collect()

[['31', '31', '4.0', '1273541953'],
 ['32', '31', '4.0', '834828440'],
 ['73', '31', '3.5', '1255591860'],
 ['110', '31', '4.0', '840100695'],
 ['111', '31', '3.5', '1097429230'],
 ['165', '31', '3.5', '1111981801'],
 ['242', '31', '4.0', '956686556'],
 ['325', '31', '4.5', '1356316520'],
 ['341', '31', '4.5', '1240055463'],
 ['485', '31', '4.0', '1337748425'],
 ['487', '31', '5.0', '832836558'],
 ['496', '31', '4.0', '834060152'],
 ['511', '31', '4.0', '829352433'],
 ['516', '31', '4.0', '844687388'],
 ['525', '31', '4.0', '1024928582'],
 ['607', '31', '3.5', '1118248028'],
 ['641', '31', '4.0', '841551303']]

## Extract just these users 

In [61]:
# Project each matching rating row down to its first field, the userId.
mapped_usersRDD = ratingsRDD_users_who_also_liked.map(lambda row: row[0])

In [64]:
# Show the user IDs (as strings) of everyone who liked the given movie.
mapped_usersRDD.collect()

['31',
 '32',
 '73',
 '110',
 '111',
 '165',
 '242',
 '325',
 '341',
 '485',
 '487',
 '496',
 '511',
 '516',
 '525',
 '607',
 '641']

## Grab all the reviews that these users have given

In [70]:
# Collect the user IDs into a plain Python list so they can be referenced inside a filter function.
mapped_users_list = mapped_usersRDD.collect()

In [72]:
# A membership test against a list scans the whole list for every rating row;
# a set gives the same result with O(1) average-time lookups.
mapped_users_set = set(mapped_users_list)
# Keep every rating row whose userId (first field) belongs to one of those users.
ratingsRDD_no_header.filter(lambda row: row[0] in mapped_users_set).collect()

[['31', '31', '4.0', '1273541953'],
 ['31', '32', '4.5', '1273720546'],
 ['31', '50', '3.5', '1273633559'],
 ['31', '111', '4.5', '1273714533'],
 ['31', '260', '4.0', '1273720416'],
 ['31', '296', '4.5', '1273714417'],
 ['31', '318', '4.0', '1273542992'],
 ['31', '372', '3.5', '1273541973'],
 ['31', '379', '4.0', '1273542073'],
 ['31', '527', '5.0', '1273543009'],
 ['31', '541', '4.0', '1273720299'],
 ['31', '778', '3.5', '1273720368'],
 ['31', '858', '4.5', '1273720163'],
 ['31', '904', '4.0', '1273714421'],
 ['31', '1089', '4.5', '1273714480'],
 ['31', '1091', '3.0', '1273542081'],
 ['31', '1186', '2.0', '1273542019'],
 ['31', '1197', '4.5', '1273542870'],
 ['31', '1206', '4.5', '1273720435'],
 ['31', '1208', '4.5', '1273542817'],
 ['31', '1221', '4.5', '1273714380'],
 ['31', '1234', '4.5', '1273714490'],
 ['31', '1299', '4.0', '1273542012'],
 ['31', '1378', '4.0', '1273542028'],
 ['31', '1416', '3.0', '1273542066'],
 ['31', '1957', '4.5', '1273542033'],
 ['31', '1967', '4.5', '12735