In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

from pyspark.sql.types import StringType

sqlContext = SQLContext(sc)
conf = SparkConf().setAppName("My App").setMaster("local[*]")
sc.stop()
sc = SparkContext(conf = conf)


## Read both the csv files

In [2]:
movies = sc.textFile('/Users/jitu/Documents/Fall/Big Data/Assignments/ml-latest-small/movies.csv')
ratings = sc.textFile('/Users/jitu/Documents/Fall/Big Data/Assignments/ml-latest-small/ratings.csv')

In [3]:
##### ------------- user input ----------------
userID = '1'
movieID = '31'
avgrating = '3.0'

## split input using , as a delimiter and exclude columns like genre and timestamp as I am not using those for recommendation

In [4]:
movie_rdd = movies.map(lambda line: line.split(',')[:-1])       ## Excluding Genre column
ratings_rdd = ratings.map(lambda line: line.split(',')[:-1])     ## Excluding Timestamp column
movie_rdd.collect()


[['movieId', 'title'],
 ['1', 'Toy Story (1995)'],
 ['2', 'Jumanji (1995)'],
 ['3', 'Grumpier Old Men (1995)'],
 ['4', 'Waiting to Exhale (1995)'],
 ['5', 'Father of the Bride Part II (1995)'],
 ['6', 'Heat (1995)'],
 ['7', 'Sabrina (1995)'],
 ['8', 'Tom and Huck (1995)'],
 ['9', 'Sudden Death (1995)'],
 ['10', 'GoldenEye (1995)'],
 ['11', '"American President', ' The (1995)"'],
 ['12', 'Dracula: Dead and Loving It (1995)'],
 ['13', 'Balto (1995)'],
 ['14', 'Nixon (1995)'],
 ['15', 'Cutthroat Island (1995)'],
 ['16', 'Casino (1995)'],
 ['17', 'Sense and Sensibility (1995)'],
 ['18', 'Four Rooms (1995)'],
 ['19', 'Ace Ventura: When Nature Calls (1995)'],
 ['20', 'Money Train (1995)'],
 ['21', 'Get Shorty (1995)'],
 ['22', 'Copycat (1995)'],
 ['23', 'Assassins (1995)'],
 ['24', 'Powder (1995)'],
 ['25', 'Leaving Las Vegas (1995)'],
 ['26', 'Othello (1995)'],
 ['27', 'Now and Then (1995)'],
 ['28', 'Persuasion (1995)'],
 ['29',
  '"City of Lost Children',
  ' The (Cité des enfants perdus'

## Filtering out header from movie RDD and selecting movie ID and movie name

In [5]:
movi = movie_rdd.filter(lambda x: x[0]!='movieId')
#all_users = users.filter(lambda x: x[0:-2])
all_movi = movi.map(lambda x: (x[0],x[1]))
all_movi.collect()

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)'),
 ('11', '"American President'),
 ('12', 'Dracula: Dead and Loving It (1995)'),
 ('13', 'Balto (1995)'),
 ('14', 'Nixon (1995)'),
 ('15', 'Cutthroat Island (1995)'),
 ('16', 'Casino (1995)'),
 ('17', 'Sense and Sensibility (1995)'),
 ('18', 'Four Rooms (1995)'),
 ('19', 'Ace Ventura: When Nature Calls (1995)'),
 ('20', 'Money Train (1995)'),
 ('21', 'Get Shorty (1995)'),
 ('22', 'Copycat (1995)'),
 ('23', 'Assassins (1995)'),
 ('24', 'Powder (1995)'),
 ('25', 'Leaving Las Vegas (1995)'),
 ('26', 'Othello (1995)'),
 ('27', 'Now and Then (1995)'),
 ('28', 'Persuasion (1995)'),
 ('29', '"City of Lost Children'),
 ('30', 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)'),
 ('31', 

## filtering out users using condition that movie ID should be equal to given movie ID and user ID should not be given user ID because we dont want those movies which user has already seen

In [6]:
users = ratings_rdd.filter(lambda x: x[1]==movieID and x[0]!=userID)
#all_users = users.filter(lambda x: x[0:-2])
all_users = users.map(lambda x: (x[0],1))
all_users.collect()

[('7', 1),
 ('31', 1),
 ('32', 1),
 ('36', 1),
 ('39', 1),
 ('73', 1),
 ('88', 1),
 ('96', 1),
 ('110', 1),
 ('111', 1),
 ('150', 1),
 ('161', 1),
 ('165', 1),
 ('186', 1),
 ('242', 1),
 ('254', 1),
 ('288', 1),
 ('310', 1),
 ('311', 1),
 ('325', 1),
 ('338', 1),
 ('341', 1),
 ('386', 1),
 ('394', 1),
 ('452', 1),
 ('457', 1),
 ('461', 1),
 ('468', 1),
 ('485', 1),
 ('487', 1),
 ('496', 1),
 ('511', 1),
 ('516', 1),
 ('525', 1),
 ('590', 1),
 ('596', 1),
 ('601', 1),
 ('607', 1),
 ('619', 1),
 ('639', 1),
 ('641', 1)]

## Here I have filtered out all movies which user has seen by filtering user ID from from RDD and selected only two columns user ID and movie ID

In [7]:
users1 = ratings_rdd.filter(lambda x: x[0]!=userID)
#all_users1 = users.filter(lambda x: x[0:-2])
all_users1 = users1.map(lambda x: (x[0],x[1]))
all_users1.collect()

[('userId', 'movieId'),
 ('2', '10'),
 ('2', '17'),
 ('2', '39'),
 ('2', '47'),
 ('2', '50'),
 ('2', '52'),
 ('2', '62'),
 ('2', '110'),
 ('2', '144'),
 ('2', '150'),
 ('2', '153'),
 ('2', '161'),
 ('2', '165'),
 ('2', '168'),
 ('2', '185'),
 ('2', '186'),
 ('2', '208'),
 ('2', '222'),
 ('2', '223'),
 ('2', '225'),
 ('2', '235'),
 ('2', '248'),
 ('2', '253'),
 ('2', '261'),
 ('2', '265'),
 ('2', '266'),
 ('2', '272'),
 ('2', '273'),
 ('2', '292'),
 ('2', '296'),
 ('2', '300'),
 ('2', '314'),
 ('2', '317'),
 ('2', '319'),
 ('2', '339'),
 ('2', '349'),
 ('2', '350'),
 ('2', '356'),
 ('2', '357'),
 ('2', '364'),
 ('2', '367'),
 ('2', '370'),
 ('2', '371'),
 ('2', '372'),
 ('2', '377'),
 ('2', '382'),
 ('2', '405'),
 ('2', '410'),
 ('2', '454'),
 ('2', '457'),
 ('2', '468'),
 ('2', '474'),
 ('2', '480'),
 ('2', '485'),
 ('2', '497'),
 ('2', '500'),
 ('2', '508'),
 ('2', '509'),
 ('2', '515'),
 ('2', '527'),
 ('2', '537'),
 ('2', '539'),
 ('2', '550'),
 ('2', '551'),
 ('2', '552'),
 ('2', '

## joining all users who have seen given movie with RDD with all users and movies list
## and selected only movie column and made a tuple (x,1) and then reduced it by key to get a count of users who have seen that particular movie. Finally we have a list of movies and users count for that movie

In [8]:
movies_list = all_users.join(all_users1).map(lambda x: (x[1][1],1)).reduceByKey(lambda amt1,amt2 : amt1+amt2)
movies_list.collect()

[('141', 13),
 ('150', 24),
 ('153', 21),
 ('160', 15),
 ('185', 24),
 ('256', 15),
 ('344', 24),
 ('345', 7),
 ('356', 27),
 ('435', 7),
 ('508', 19),
 ('520', 13),
 ('552', 10),
 ('553', 13),
 ('586', 18),
 ('588', 25),
 ('596', 8),
 ('608', 18),
 ('648', 22),
 ('733', 17),
 ('736', 19),
 ('778', 11),
 ('1136', 9),
 ('1196', 15),
 ('1197', 12),
 ('1282', 6),
 ('1380', 7),
 ('1517', 12),
 ('1641', 4),
 ('1704', 13),
 ('1777', 10),
 ('1909', 6),
 ('2268', 7),
 ('2321', 10),
 ('2355', 10),
 ('2640', 7),
 ('2692', 10),
 ('2762', 15),
 ('2797', 13),
 ('2959', 14),
 ('2962', 1),
 ('3081', 6),
 ('3160', 8),
 ('3408', 9),
 ('3578', 14),
 ('3793', 12),
 ('3897', 10),
 ('4022', 11),
 ('4034', 8),
 ('4993', 12),
 ('5218', 6),
 ('5378', 10),
 ('5952', 12),
 ('8368', 9),
 ('8636', 8),
 ('35836', 4),
 ('36517', 2),
 ('46976', 5),
 ('55269', 3),
 ('65261', 1),
 ('68954', 3),
 ('70293', 1),
 ('10', 20),
 ('16', 14),
 ('179', 2),
 ('207', 10),
 ('237', 10),
 ('248', 4),
 ('249', 5),
 ('252', 12),
 ('

# Calculate average rating of a movie

## first filtered out given user and header from RDD and then selected only movie ID and rating

In [9]:
ratings = ratings_rdd.filter(lambda x: (x[0]!=userID and x[0]!='userId'))
#all_users1 = users.filter(lambda x: x[0:-2])
all_ratings = ratings.map(lambda x: (x[1],x[2]))
all_ratings.collect()

[('10', '4.0'),
 ('17', '5.0'),
 ('39', '5.0'),
 ('47', '4.0'),
 ('50', '4.0'),
 ('52', '3.0'),
 ('62', '3.0'),
 ('110', '4.0'),
 ('144', '3.0'),
 ('150', '5.0'),
 ('153', '4.0'),
 ('161', '3.0'),
 ('165', '3.0'),
 ('168', '3.0'),
 ('185', '3.0'),
 ('186', '3.0'),
 ('208', '3.0'),
 ('222', '5.0'),
 ('223', '1.0'),
 ('225', '3.0'),
 ('235', '3.0'),
 ('248', '3.0'),
 ('253', '4.0'),
 ('261', '4.0'),
 ('265', '5.0'),
 ('266', '5.0'),
 ('272', '3.0'),
 ('273', '4.0'),
 ('292', '3.0'),
 ('296', '4.0'),
 ('300', '3.0'),
 ('314', '4.0'),
 ('317', '2.0'),
 ('319', '1.0'),
 ('339', '3.0'),
 ('349', '4.0'),
 ('350', '4.0'),
 ('356', '3.0'),
 ('357', '3.0'),
 ('364', '3.0'),
 ('367', '3.0'),
 ('370', '2.0'),
 ('371', '3.0'),
 ('372', '3.0'),
 ('377', '3.0'),
 ('382', '3.0'),
 ('405', '2.0'),
 ('410', '3.0'),
 ('454', '4.0'),
 ('457', '3.0'),
 ('468', '4.0'),
 ('474', '2.0'),
 ('480', '4.0'),
 ('485', '3.0'),
 ('497', '3.0'),
 ('500', '4.0'),
 ('508', '4.0'),
 ('509', '4.0'),
 ('515', '4.0'),
 ('5

## Now, join movie list with rating RDD above 
## and selected only movie ID and reduced it by key to get count (This count will be used to get average rating)

In [10]:
count = movies_list.join(all_ratings).map(lambda x: (x[0],1)).reduceByKey(lambda amt1,amt2 : amt1+amt2)
count.collect()

[('160', 63),
 ('256', 42),
 ('588', 215),
 ('648', 168),
 ('778', 124),
 ('1136', 145),
 ('1380', 90),
 ('2692', 80),
 ('2797', 107),
 ('3160', 57),
 ('5218', 63),
 ('8368', 84),
 ('65261', 14),
 ('16', 88),
 ('207', 28),
 ('315', 44),
 ('445', 9),
 ('762', 39),
 ('839', 11),
 ('920', 68),
 ('1089', 132),
 ('1220', 94),
 ('1617', 125),
 ('1648', 9),
 ('2014', 15),
 ('3060', 21),
 ('3210', 52),
 ('3791', 24),
 ('4447', 47),
 ('4545', 14),
 ('4734', 31),
 ('4823', 19),
 ('6480', 4),
 ('6638', 4),
 ('8366', 15),
 ('44', 38),
 ('70', 48),
 ('1373', 25),
 ('1405', 45),
 ('1918', 42),
 ('2012', 94),
 ('2338', 13),
 ('2701', 47),
 ('2707', 36),
 ('4020', 11),
 ('4223', 37),
 ('4718', 39),
 ('4951', 7),
 ('6264', 3),
 ('6383', 12),
 ('6944', 11),
 ('8528', 37),
 ('24', 34),
 ('74', 19),
 ('111', 118),
 ('113', 5),
 ('119', 1),
 ('230', 25),
 ('278', 5),
 ('348', 40),
 ('429', 9),
 ('492', 21),
 ('581', 17),
 ('915', 23),
 ('933', 29),
 ('1054', 2),
 ('1204', 51),
 ('1235', 44),
 ('1244', 54),

## again join those two rdds and get total count of ratings

In [11]:
avg_ratings = movies_list.join(all_ratings).map(lambda x: (x[0],x[1][1])).reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))
avg_ratings.collect()

[('160', 147.5),
 ('256', 113.5),
 ('588', 790.0),
 ('648', 593.5),
 ('778', 513.5),
 ('1136', 612.5),
 ('1380', 313.0),
 ('2692', 336.0),
 ('2797', 410.0),
 ('3160', 210.5),
 ('5218', 214.0),
 ('8368', 316.0),
 ('65261', 58.5),
 ('16', 347.5),
 ('207', 94.5),
 ('315', 128.0),
 ('445', 28.0),
 ('762', 96.0),
 ('839', 36.0),
 ('920', 275.0),
 ('1089', 549.5),
 ('1220', 366.0),
 ('1617', 515.0),
 ('1648', 28.5),
 ('2014', 47.5),
 ('3060', 82.5),
 ('3210', 188.0),
 ('3791', 85.0),
 ('4447', 140.5),
 ('4545', 43.0),
 ('4734', 100.5),
 ('4823', 61.0),
 ('6480', 14.5),
 ('6638', 12.5),
 ('8366', 50.0),
 ('44', 102.5),
 ('70', 145.0),
 ('1373', 69.5),
 ('1405', 138.5),
 ('1918', 128.5),
 ('2012', 307.0),
 ('2338', 32.5),
 ('2701', 95.5),
 ('2707', 126.5),
 ('4020', 34.5),
 ('4223', 136.5),
 ('4718', 112.0),
 ('4951', 18.5),
 ('6264', 7.5),
 ('6383', 23.0),
 ('6944', 34.5),
 ('8528', 123.5),
 ('24', 103.5),
 ('74', 57.5),
 ('111', 498.5),
 ('113', 16.0),
 ('119', '3.0'),
 ('230', 92.5),
 ('278

## And join both RDDs with total ratings and total count to get average rating for that movie and sort it on based of ratings

In [12]:
avg_by_key = avg_ratings.join(count).map(lambda x: (x[0],(float(x[1][0])/float(x[1][1]))))
avg_by_key = avg_by_key.sortBy(lambda x: x[1], False)
avg_by_key.collect()

[('67504', 5.0),
 ('93320', 5.0),
 ('83359', 5.0),
 ('5765', 5.0),
 ('32525', 5.0),
 ('83411', 5.0),
 ('5244', 5.0),
 ('83318', 5.0),
 ('3437', 5.0),
 ('9010', 5.0),
 ('6273', 4.833333333333333),
 ('6375', 4.75),
 ('5238', 4.75),
 ('50641', 4.75),
 ('116', 4.75),
 ('31435', 4.75),
 ('178', 4.7),
 ('7116', 4.7),
 ('2330', 4.666666666666667),
 ('3739', 4.666666666666667),
 ('2563', 4.666666666666667),
 ('1939', 4.636363636363637),
 ('7566', 4.625),
 ('5114', 4.6),
 ('2924', 4.583333333333333),
 ('3469', 4.541666666666667),
 ('52668', 4.5),
 ('93855', 4.5),
 ('6883', 4.5),
 ('7074', 4.5),
 ('184', 4.5),
 ('27646', 4.5),
 ('89085', 4.5),
 ('95475', 4.5),
 ('52767', 4.5),
 ('2267', 4.5),
 ('5121', 4.5),
 ('8609', 4.5),
 ('2305', 4.5),
 ('4405', 4.5),
 ('26712', 4.5),
 ('31547', 4.5),
 ('7787', 4.5),
 ('162376', 4.5),
 ('5059', 4.5),
 ('7075', 4.5),
 ('51277', 4.5),
 ('5475', 4.5),
 ('8797', 4.5),
 ('95499', 4.5),
 ('102252', 4.5),
 ('8751', 4.5),
 ('50740', 4.5),
 ('50742', 4.5),
 ('6077', 

## Finally, join above RDD with movie name RDD to get the movie name which are recommended for users and select TOP 5

In [13]:
final_recom = avg_by_key.join(all_movi).map(lambda x: (x[1][0],x[1][1]))
final_recom = final_recom.sortBy(lambda x: x[0], False)
#final_recom.map(lambda x: (x[1]))
final_recom.map
final_recom.top(5)

[(5.0, 'Trailer Park Boys (1999)'),
 (5.0, 'The Earrings of Madame de... (1953)'),
 (5.0, 'Shogun Assassin (1980)'),
 (5.0, "Love Me If You Dare (Jeux d'enfants) (2003)"),
 (5.0,
  'Land of Silence and Darkness (Land des Schweigens und der Dunkelheit) (1971)')]