In this week's assignments you will join and aggregate data from a collection of users and restaurants in Mexico. The dataset comes from: https://archive.ics.uci.edu/ml/datasets/Restaurant+%26+consumer+data

We have placed three files in the folder of this tutorial:
- `userprofile.csv`: describes several attributes of the users
- `geoplaces2.csv`: describes several attributes of the restaurants
- `rating_final.csv`: describes how the users rated the restaurant, the food and the service on a scale from 0 to 2

## Part 1 ##

The first assignment is to estimate whether people with a larger budget are generally more satisfied with the restaurant they choose.

Create an RDD `rating` of the restaurant rating (the first rating) in `rating_final.csv`.

In [32]:
ratings_final = sc.textFile("rating_final.csv")
firstline = ratings_final.first()
ratings_finalNoHeader = ratings_final.filter(lambda x: x != firstline)
ratingsSplit = ratings_finalNoHeader.map(lambda x: x.split(','))
rating = ratingsSplit.map(lambda x: (x[0], x[1], x[2] ) ) 
rating.take(5)

[('U1077', '135085', '2'),
 ('U1077', '135038', '2'),
 ('U1077', '132825', '2'),
 ('U1077', '135060', '1'),
 ('U1068', '135104', '1')]
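
Splitting on `','` works as long as no field contains a comma. As a hedged alternative, the sketch below parses a line with Python's standard `csv` module, which also copes with quoted fields. The helper name `parse_csv_line` and the quoted sample line are hypothetical, and whether any of the three files actually contains quoted commas is not something the output above tells us.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honouring quoted commas."""
    return next(csv.reader([line]))

# Hypothetical use with the RDD from the cell above (needs a SparkContext):
# ratingsSplit = ratings_finalNoHeader.map(parse_csv_line)

# The first three fields of the first row shown above.
fields = parse_csv_line("U1077,135085,2")
print(fields)

# A made-up line with a quoted comma, which a plain split(',') would break apart.
quoted = parse_csv_line('135085,"Main street, corner",2')
print(quoted)
```

Because the function takes a single string and returns a list, it can be dropped into `map` in place of the `lambda x: x.split(',')`.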

Also create an RDD `userbudget` in which you load the `userID` and `budget` from `userprofile.csv`.

In [33]:
user_profile_final = sc.textFile("userprofile.csv")
firstline = user_profile_final.first()
user_profile_final_NoHeader = user_profile_final.filter(lambda x: x != firstline)
user_profile_Split = user_profile_final_NoHeader.map(lambda x: x.split(','))
# Column 0 is the userID, column 17 is the budget
userbudget = user_profile_Split.map(lambda x: (x[0], x[17]))
userbudget.take(5)

[('U1001', 'medium'),
 ('U1002', 'low'),
 ('U1003', 'low'),
 ('U1004', 'medium'),
 ('U1005', 'medium')]

To join `userbudget` with the ratings, you must first convert `rating` to a `(userID, rating)` structure, called `useratings` below (we don't need the placeID for this assignment).

In [34]:
# `rating` no longer contains the header row, so it can be mapped directly;
# filtering on rating.first() again would drop the first data row.
useratings = rating.map(lambda x: (x[0], x[2]))
useratings.take(5)

[('U1077', '2'),
 ('U1077', '2'),
 ('U1077', '2'),
 ('U1077', '1'),
 ('U1068', '1')]

Join `useratings` and `userbudget`, and map the result to a `(budget, rating)` structure. Don't forget to convert rating to an int (with `int()`).

In [35]:
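# Inner join on userID: users without any rating are dropped, so int() never receives None
users_union = userbudget.join(useratings)
# Keep only (budget, rating) and convert the rating to an int
union_convert = users_union.map(lambda x: (x[1][0], int(x[1][1])))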
union_convert.take(5)

[('low', 1), ('low', 1), ('low', 2), ('low', 1), ('low', 1)]
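
An inner join and a left outer join differ only for keys that are missing on the right-hand side. The plain-Python sketch below mimics both on small lists of `(key, value)` pairs to show why that matters for the `int()` conversion. The helper functions and the users `U1` and `U2` are made up for illustration and are not part of Spark or of the dataset.

```python
def inner_join(left, right):
    """Mimic RDD.join on lists of (key, value) pairs."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]

def left_outer_join(left, right):
    """Mimic RDD.leftOuterJoin: unmatched left keys are paired with None."""
    out = []
    for k, lv in left:
        matches = [rv for rk, rv in right if rk == k]
        if matches:
            out.extend((k, (lv, rv)) for rv in matches)
        else:
            out.append((k, (lv, None)))
    return out

budgets = [("U1", "low"), ("U2", "high")]  # hypothetical users
ratings = [("U1", "2"), ("U1", "1")]       # U2 has no rating

inner = inner_join(budgets, ratings)
outer = left_outer_join(budgets, ratings)

# The (budget, int(rating)) mapping is only safe on the inner join,
# because int(None) raises a TypeError.
budget_rating = [(b, int(r)) for _, (b, r) in inner]
print(budget_rating)
print(outer)
```

With a left outer join, the `('U2', ('high', None))` record would have to be filtered out before the conversion.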

Group the result by budget (the key), and compute the average rating. To compute the average of a list `l` in Python, divide `sum(l)` by `len(l)`.

In [36]:
budget_group = union_convert.groupByKey().mapValues( lambda x:  ( sum(x) / len (x) ) )
budget_group.collect()

[('low', 1.1360759493670887),
 ('high', 1.4761904761904763),
 ('medium', 1.2084468664850136),
 ('?', 1.2318840579710144)]
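
`groupByKey` collects every rating for a key before averaging. A common alternative is to carry a running `(sum, count)` pair per key, which is what `aggregateByKey` is designed for. The sketch below defines the two functions such a call would need and exercises them locally with `functools.reduce`. The function names and the two sample partitions are assumptions for illustration.

```python
from functools import reduce

def seq_op(acc, rating):
    """Fold one rating into a (sum, count) accumulator."""
    return (acc[0] + rating, acc[1] + 1)

def comb_op(a, b):
    """Merge two (sum, count) accumulators coming from different partitions."""
    return (a[0] + b[0], a[1] + b[1])

# With Spark these could be used roughly as follows (needs a SparkContext):
# union_convert.aggregateByKey((0, 0), seq_op, comb_op) \
#              .mapValues(lambda t: t[0] / t[1])

# Local illustration: two hypothetical partitions holding ratings for one key.
part_a = reduce(seq_op, [1, 2, 2], (0, 0))
part_b = reduce(seq_op, [0, 1], (0, 0))
total, count = comb_op(part_a, part_b)
mean = total / count
print(total, count, mean)
```

For a dataset of this size both approaches are fine. The accumulator version mainly pays off when a single key has very many values.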

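Indeed, it seems that users with a higher budget are more satisfied with the restaurants they visit: the average rating rises from about 1.14 for `low` to 1.21 for `medium` and 1.48 for `high`. The `?` group presumably holds the users whose budget is unknown.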

## Part 2 ##

The next assignment is to estimate whether users' ratings are affected by the distance between where they live and where the restaurant is.

We want to compute the distance between the user's home and the restaurant for every rating. Both positions can be looked up from the user profiles and the places.

Create an RDD `userpos` with the userID, latitude and longitude from `userprofile.csv`. To use the location, put latitude and longitude inside a tuple.

In [37]:
user_profile_last = sc.textFile("userprofile.csv")
firstline = user_profile_last.first()
user_profile_last_NoHeader = user_profile_last.filter(lambda x: x != firstline)
user_profile_Split = user_profile_last_NoHeader.map(lambda x: x.split(','))
userpos = user_profile_Split.map(lambda x: (x[0], (x[1], x[2])) ) 
userpos.take(5)

[('U1001', ('22.139997', '-100.978803')),
 ('U1002', ('22.150087', '-100.983325')),
 ('U1003', ('22.119847', '-100.946527')),
 ('U1004', ('18.867', '-99.183')),
 ('U1005', ('22.183477', '-100.959891'))]

Create a broadcast variable that contains a dictionary from which you can look up a user's position based on their ID.

In [25]:
broadcast_user = sc.broadcast(userpos.collectAsMap())
broadcast_user.value

{'U1001': ('22.139997', '-100.978803'),
 'U1002': ('22.150087', '-100.983325'),
 'U1003': ('22.119847', '-100.946527'),
 'U1004': ('18.867', '-99.183'),
 'U1005': ('22.183477', '-100.959891'),
 'U1006': ('22.15', '-100.983'),
 'U1007': ('22.118464', '-100.938256'),
 'U1008': ('22.122989', '-100.923811'),
 'U1009': ('22.159427', '-100.990448'),
 'U1010': ('22.190889', '-100.998669'),
 'U1011': ('23.724972', '-99.152856'),
 'U1012': ('18.813348', '-99.243697'),
 'U1013': ('22.174624', '-100.993873'),
 'U1014': ('23.751607', '-99.170108'),
 'U1015': ('22.12676', '-100.905209'),
 'U1016': ('22.156247', '-100.977402'),
 'U1017': ('18.952615', '-99.201616'),
 'U1018': ('22.190949', '-100.917902'),
 'U1019': ('22.153385', '-100.975294'),
 'U1020': ('18.878189', '-99.222969'),
 'U1021': ('23.730569', '-99.171883'),
 'U1022': ('22.146708', '-100.964355'),
 'U1023': ('23.752943', '-99.166589'),
 'U1024': ('22.154021', '-100.976028'),
 'U1025': ('22.125603', '-100.907844'),
 'U1026': ('23.733', '

Also create an RDD `placepos` (named `geo_coordinate` in the code below) that contains the ID and position of the places in `geoplaces2.csv`.

In [26]:
geo_places = sc.textFile("geoplaces2.csv")
firstline = geo_places.first()
geo_places_NoHeader = geo_places.filter(lambda x: x != firstline)
geo_profile_Split = geo_places_NoHeader.map(lambda x: x.split(','))
# (placeID, (latitude, longitude))
geo_coordinate = geo_profile_Split.map(lambda x: (x[0], (x[1], x[2])))
geo_coordinate.take(5)

[('134999', ('18.915421', '-99.184871')),
 ('132825', ('22.1473922', '-100.983092')),
 ('135106', ('22.1497088', '-100.9760928')),
 ('132667', ('23.7526973', '-99.1633594')),
 ('132613', ('23.7529035', '-99.165076'))]

And create a similar broadcast variable to look up the position of a place based on its ID.

In [27]:
broadcast_geo = sc.broadcast(geo_coordinate.collectAsMap())
broadcast_geo.value

{'132560': ('23.7523041', '-99.1669133'),
 '132561': ('23.726819', '-99.1265059'),
 '132564': ('23.7309245', '-99.1451848'),
 '132572': ('22.1416471', '-100.9927118'),
 '132583': ('18.9222904', '-99.234332'),
 '132584': ('23.7523648', '-99.1652879'),
 '132594': ('23.7521677', '-99.165709'),
 '132608': ('23.7588052', '-99.1651297'),
 '132609': ('23.7602683', '-99.1658646'),
 '132613': ('23.7529035', '-99.165076'),
 '132626': ('23.7375834', '-99.1351318'),
 '132630': ('23.7529305', '-99.1644725'),
 '132654': ('23.7355234', '-99.1295877'),
 '132660': ('23.7529428', '-99.1646791'),
 '132663': ('23.7525107', '-99.1669536'),
 '132665': ('23.7367977', '-99.1342413'),
 '132667': ('23.7526973', '-99.1633594'),
 '132668': ('23.738212', '-99.1519547'),
 '132706': ('23.7292162', '-99.1323571'),
 '132715': ('23.7324226', '-99.1586602'),
 '132717': ('23.7318602', '-99.1504365'),
 '132723': ('22.1489337', '-101.019845'),
 '132732': ('23.7543569', '-99.171288'),
 '132733': ('23.7527071', '-99.1625655'
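
Both broadcast values are plain dictionaries, so a lookup inside a `map` is an ordinary dictionary access. The sketch below wraps that access in a small helper that converts the coordinate strings to floats and returns `None` when an ID is unknown. The helper name `lookup_positions` is hypothetical, and the two dictionaries are one-entry excerpts of the outputs shown above.

```python
def lookup_positions(user_id, place_id, users, places):
    """Return ((lat, lon), (lat, lon)) as floats, or None if either ID is unknown."""
    user = users.get(user_id)
    place = places.get(place_id)
    if user is None or place is None:
        return None
    return (tuple(float(v) for v in user), tuple(float(v) for v in place))

# One-entry excerpts of the dictionaries printed above; in Spark these would be
# broadcast_user.value and broadcast_geo.value.
users = {"U1001": ("22.139997", "-100.978803")}
places = {"132572": ("22.1416471", "-100.9927118")}

found = lookup_positions("U1001", "132572", users, places)
missing = lookup_positions("U9999", "132572", users, places)
print(found)
print(missing)
```

Using `.get` instead of `[...]` avoids a `KeyError` on the workers if a rating ever refers to an ID that is absent from one of the files.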

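An accurate approximation of the distance is the Vincenty distance in the geopy library (use `pip install geopy` to install). Note that `vincenty` was removed in geopy 2.0; on newer versions, `geopy.distance.geodesic` accepts the same arguments.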

Here is an example to compute the distance:

In [28]:
from geopy.distance import vincenty
vincenty((31.8300167,35.0662833), (31.83,35.0708167)).meters

429.1676583897619
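
As a sanity check that does not depend on geopy, the distance can be approximated with the haversine formula, which treats the Earth as a sphere instead of an ellipsoid. This is a different and slightly less accurate method than Vincenty, shown only for comparison. The function name and the mean Earth radius of 6371 km are assumptions of this sketch.

```python
import math

def haversine_m(p1, p2, radius_m=6371000.0):
    """Great-circle distance in meters between two (lat, lon) pairs given in degrees."""
    # float() lets the function accept the string coordinates used in this notebook.
    lat1, lon1 = (math.radians(float(v)) for v in p1)
    lat2, lon2 = (math.radians(float(v)) for v in p2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * radius_m * math.asin(math.sqrt(a))

# Same two points as in the geopy example above.
d = haversine_m((31.8300167, 35.0662833), (31.83, 35.0708167))
print(d)
```

For the two points above the result should land within a few meters of the Vincenty value, which is close enough for averaging distances per rating.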

Now, map the ratings so that you retrieve the position of the user and the position of the restaurant, and compute the Vincenty distance between them. Output the result as a `(rating, distance)` pair, so that the rating can serve as the key in the next step.

In [29]:
# The header was already removed when `rating` was built, so no extra filter is needed.
# Key each rating by userID: (userID, (placeID, rating))
rating_convert = rating.map(lambda x: (x[0], (x[1], x[2])))

# Look up both positions in the broadcast dictionaries and emit (rating, distance in meters)
distance_rating = rating_convert.map(lambda x: (x[1][1], vincenty(broadcast_user.value[x[0]], broadcast_geo.value[x[1][0]]).meters))
distance_rating.collect()



[('2', 693.4067254748844),
 ('2', 806.8757681276265),
 ('2', 1036.3302977034452),
 ('1', 729.1540436901495),
 ('1', 80.87798147674805),
 ('0', 201.3295067351038),
 ('1', 170.46780148103144),
 ('0', 358.1705302397509),
 ('1', 427.6121939909337),
 ('2', 338.33005498454236),
 ('1', 617.6084169360958),
 ('1', 358.1705302397509),
 ('1', 427.6121939909337),
 ('0', 80.87798147674805),
 ('1', 172.50516454256353),
 ('1', 336.81587971769574),
 ('0', 359.62490007730656),
 ('1', 426.18210809942764),
 ('2', 365.1491452551982),
 ('1', 535.3810371277306),
 ('1', 80.9967707935355),
 ('1', 169.02852165238107),
 ('2', 616.1190326521429),
 ('2', 1378.912605030139),
 ('2', 1291.7963514211717),
 ('2', 1484.714522532005),
 ('2', 9547.24510845921),
 ('1', 4393.866108751058),
 ('2', 641.6592488648951),
 ('1', 785.4387415205989),
 ('1', 186.38206900922526),
 ('0', 1997.925929998826),
 ('0', 2613.95477784675),
 ('0', 851.875791772843),
 ('1', 166.03611090958225),
 ('0', 2412.2070435474625),
 ('0', 2857.31481909

Now average the distance per rating.

In [30]:
rating_avg = distance_rating.groupByKey().mapValues( lambda x:  ( sum(x) / len (x) ) )
rating_avg.collect()

[('1', 10620.704846132312),
 ('0', 32532.243775653256),
 ('2', 27352.013001884905)]

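The average distance is about 32.5 km for rating 0, 10.6 km for rating 1 and 27.4 km for rating 2, so it neither rises nor falls steadily with the rating. It seems that there is no linear relation between the distance to the restaurant and the given rating.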
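
One way to put a number on a linear relation is the Pearson correlation coefficient between rating and distance. The sketch below computes it by hand for the first ten `(rating, distance)` pairs printed earlier. Ten pairs are far too few to draw a conclusion, so this only shows the mechanics; the function name `pearson` is an assumption, and in Spark the pairs would come from `distance_rating.collect()`.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# First ten (rating, distance) pairs from the output of distance_rating.collect() above.
sample = [
    ("2", 693.4067254748844), ("2", 806.8757681276265),
    ("2", 1036.3302977034452), ("1", 729.1540436901495),
    ("1", 80.87798147674805), ("0", 201.3295067351038),
    ("1", 170.46780148103144), ("0", 358.1705302397509),
    ("1", 427.6121939909337), ("2", 338.33005498454236),
]

ratings = [int(r) for r, _ in sample]
distances = [d for _, d in sample]
r = pearson(ratings, distances)
print(r)
```

A value near 0 on the full dataset would support the conclusion above, while a value near 1 or -1 would contradict it.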