In [1]:
from pyspark import SparkContext, SparkConf
# Configure the Spark application and obtain a context for it.
# getOrCreate() hands back the already-running context when this cell is
# executed again in the same kernel; constructing SparkContext(...) a second
# time raises "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("Movies")
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
# Load the four raw CSV inputs. Despite the df_ prefix these are produced by
# sc.textFile, i.e. one unparsed text line per record; parsing happens in the
# cells that consume them.
df_users, df_zipcodes, df_rating, df_movies = (
    sc.textFile(csv_path)
    for csv_path in (
        './moviedata/users.csv',
        './moviedata/zipcodes.csv',
        './moviedata/rating.csv',
        './moviedata/movies.csv',
    )
)

In [3]:
def mapreduce_join_util(x):
    """Expand one grouped key into the pairs an inner join would produce.

    x is a (key, tagged_values) pair where tagged_values is an iterable of
    (tag, value) tuples. Values tagged 1 form the left side; values with any
    other tag form the right side.

    Returns a list of (left_value, right_value) tuples covering every
    left/right combination, ordered by left value first. The key itself is
    not included in the result. The list is empty when either side has no
    values.
    """
    left_values, right_values = [], []
    for tag, value in x[1]:
        # Single pass, so x[1] may be any iterable of 2-tuples.
        (left_values if tag == 1 else right_values).append(value)

    pairs = []
    for left in left_values:
        for right in right_values:
            pairs.append((left, right))
    return pairs
    

In [4]:
def mapreduce_join(rdd1, rdd2):
    """Join two (key, value) RDDs on their keys, reduce-side.

    Every record is tagged with its source (1 for rdd1, 2 for rdd2) and
    wrapped in a one-element list so that reduceByKey can gather all records
    sharing a key by list concatenation. Each gathered group is then expanded
    by mapreduce_join_util.

    Returns an RDD of (value_from_rdd1, value_from_rdd2) tuples, one for each
    matching combination. The join key does not appear in the output, and
    keys present in only one input contribute nothing.
    """
    def tag_with(source_id):
        # Build the mapper that marks a record as coming from `source_id`.
        return lambda record: (record[0], [(source_id, record[1])])

    left_tagged = rdd1.map(tag_with(1))
    right_tagged = rdd2.map(tag_with(2))
    grouped_by_key = left_tagged.union(right_tagged).reduceByKey(
        lambda gathered, more: gathered + more
    )
    return grouped_by_key.flatMap(mapreduce_join_util)

In [5]:
def _zip_and_user(line):
    # Plain comma split. Field 4 has its first and last characters removed
    # (presumably surrounding quotes -- the preview below shows bare zip
    # codes); field 0 is kept whole.
    fields = line.split(",")
    return (str(fields[4])[1:-1], str(fields[0]))


# Key each user by zip code: (zipcode, user_id).
zipcode_user = df_users.map(_zip_and_user)

In [6]:
# Preview five (zipcode, user_id) pairs; take() is an action, so this runs a job.
zipcode_user.take(5)

[('94560', '780'),
 ('48825', '781'),
 ('77081', '783'),
 ('91040', '784'),
 ('23322', '785')]

In [7]:
def _zip_and_state(line):
    # Plain comma split. Fields 0 and 3 each have their first and last
    # characters removed (presumably surrounding quotes -- TODO confirm
    # against zipcodes.csv).
    fields = line.split(",")
    return (str(fields[0])[1:-1], str(fields[3])[1:-1])


# Key each zip code record as (zipcode, state).
zipcode_state = df_zipcodes.map(_zip_and_state)

In [8]:
# Preview five (zipcode, state) pairs; take() is an action, so this runs a job.
zipcode_state.take(5)

[('2574', 'MA'),
 ('1886', 'MA'),
 ('1472', 'MA'),
 ('2671', 'MA'),
 ('2672', 'MA')]

In [9]:
# Join users to states on zip code. mapreduce_join drops the join key, so each
# record is (user_id, state).
user_state = mapreduce_join(
    zipcode_user,
    zipcode_state,
)


In [10]:
# Preview five (user_id, state) pairs produced by the zip-code join.
user_state.take(5)

[('95', 'NY'), ('6', 'WA'), ('924', 'NY'), ('403', 'CT'), ('874', 'TN')]

In [11]:
def _rating_by_user(line):
    # Plain comma split: field 0 and field 1 are kept as strings, field 2 is
    # parsed as an integer.
    fields = line.split(",")
    return (str(fields[0]), (str(fields[1]), int(fields[2])))


# Key each rating by user: (user_id, (movie_id, rating)).
user_movieid_rating = df_rating.map(_rating_by_user)

In [12]:
# Preview five (user_id, (movie_id, rating)) records.
user_movieid_rating.take(5)

[('253', ('97', 4)),
 ('284', ('269', 4)),
 ('106', ('526', 4)),
 ('121', ('180', 3)),
 ('62', ('86', 2))]

In [13]:
# Attach each rating to the rater's state by joining on user id. The join key
# (user id) is dropped by mapreduce_join, leaving ((movie_id, rating), state).
movieid_rating_state = mapreduce_join(
    user_movieid_rating,
    user_state,
)

In [14]:
# Preview five ((movie_id, rating), state) records from the user-id join.
movieid_rating_state.take(5)

[(('117', 4), 'NY'),
 (('137', 4), 'NY'),
 (('213', 4), 'NY'),
 (('302', 4), 'NY'),
 (('693', 3), 'NY')]

In [15]:
def _rekey_by_movie(record):
    # record is ((movie_id, rating), state); move movie_id out front so it can
    # serve as the key of the next join.
    movie_and_rating = record[0]
    state = record[1]
    return (movie_and_rating[0], (state, movie_and_rating[1]))


# Re-key as (movie_id, (state, rating)).
movieid_state_rating = movieid_rating_state.map(_rekey_by_movie)

In [16]:
# Preview five (movie_id, (state, rating)) records.
movieid_state_rating.take(5)

[('117', ('NY', 4)),
 ('137', ('NY', 4)),
 ('213', ('NY', 4)),
 ('302', ('NY', 4)),
 ('693', ('NY', 3))]

In [17]:
def clean_df_movies(x):
    """Reduce one raw movies line to its leading field plus trailing columns.

    The line is cut at double-quote characters. From the text before the
    first quote only the first comma-separated field is kept; from the text
    after the second quote the first two comma-separated fields are skipped
    and the remainder kept. The quoted text itself is discarded, so commas
    inside it do not disturb the result.

    Returns a single list: [first_field] + trailing_fields, all strings.
    Raises IndexError when the line has fewer than two double quotes.
    """
    quote_parts = x.split('\"')
    leading_field = quote_parts[0].split(',')[:1]
    trailing_fields = quote_parts[2].split(',')[2:]
    return leading_field + trailing_fields

In [18]:
# Parse every raw movies line into [first_field, trailing columns...] using
# clean_df_movies; the function is passed directly as the mapper.
df_movies_cleaned = df_movies.map(clean_df_movies)

In [19]:
# Genre labels, in the order the per-genre loop walks them (label k is used
# for column k+1 of a cleaned movie row). Each label keeps one leading and one
# trailing space, which is exactly what appears in the printed report.
movie_genre = [
    " unknown ",
    " Action ",
    " Adventure ",
    " Animation ",
    " Children ",
    " Comedy ",
    " Crime ",
    " Documentary ",
    " Drama ",
    " Fantasy ",
    " Film_Noir ",
    " Horror ",
    " Musical ",
    " Mystery ",
    " Romance ",
    " Sci_Fi ",
    " Thriller ",
    " War ",
    " Western ",
]


In [21]:
# Every genre below launches its own job over movieid_state_rating, so keep
# the joined RDD in memory rather than recomputing its upstream joins each time.
movieid_state_rating.cache()


def _best_state_for_genre(genre_col):
    """Return [(state, average_rating)] for the state rating this genre highest.

    genre_col is the index, within each df_movies_cleaned row, of the 0/1 flag
    for the genre. It is a parameter (rather than a loop variable read from
    inside the lambdas) so that each lazily evaluated RDD closure is pinned to
    its own column. The returned list has at most one element and is empty
    when no rated movie carries the genre flag.
    """
    genre_movies = (
        df_movies_cleaned
        .filter(lambda row: int(row[genre_col]) == 1)
        .map(lambda row: (str(row[0]), genre_col))
    )
    # The join yields (genre_col, (state, rating)); re-key by state and attach
    # a count of 1 so sums and counts can be reduced together.
    state_rating = mapreduce_join(genre_movies, movieid_state_rating).map(
        lambda joined: (joined[1][0], (joined[1][1], 1))
    )
    state_avg_rating = state_rating.reduceByKey(
        lambda a, b: (a[0] + b[0], a[1] + b[1])
    ).map(
        # float() keeps the division exact under Python 2 integer semantics.
        lambda kv: (kv[0], float(kv[1][0]) / kv[1][1])
    )
    return state_avg_rating.takeOrdered(1, key=lambda kv: -kv[1])


# Genre flags start at column 1 of a cleaned row, in movie_genre order.
for genre_col, genre_name in enumerate(movie_genre, 1):
    # Single pre-formatted argument: prints identically on Python 2 and 3.
    print("%s %s" % (genre_name, _best_state_for_genre(genre_col)))

 unknown  [('MD', 1.0)]
 Action  [('ME', 4.5)]
 Adventure  [('ME', 4.5)]
 Animation  [('MS', 5.0)]
 Children  [('MS', 4.444444444444445)]
 Comedy  [('WV', 4.666666666666667)]
 Crime  [('WV', 4.833333333333333)]
 Documentary  [('SD', 5.0)]
 Drama  [('WV', 4.767441860465116)]
 Fantasy  [('AL', 5.0)]
 Film_Noir  [('HI', 5.0)]
 Horror  [('ME', 5.0)]
 Musical  [('ME', 4.375)]
 Mystery  [('WV', 4.666666666666667)]
 Romance  [('WV', 4.5)]
 Sci_Fi  [('WV', 4.476190476190476)]
 Thriller  [('WV', 4.5)]
 War  [('WV', 4.684210526315789)]
 Western  [('ME', 4.333333333333333)]
