In this exercise we will determine the stops/citizen for each town whose center is within radius of a user specified point.

We will first determine an RDD with all the stops that belong to towns that are in range. Next we will determine the main town that the
stops are located in, retrieve the population and calculate for each town the stops per citizen.

Lastly we will provide the stops per citizen for each town separately, and for all towns combined.


First we initialise PySpark.


In [1]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()


We first read all the stops. The specified file is a preprocessed version of the JSON "stops.txt", in which 
each line contains one stop in the format 

halte_id;halte_name;lat;long;town_name

This makes it easier to parse since it only requires a call to str.split(). The result is an RDD with tuples
(halte_id, halte_name, lat, long, town_name).


In [2]:
stops = sc.textFile("./converted_stops.csv").map(lambda stop: tuple([x.strip() for x in stop.split(";")]))


Next, we read all the coordinates of the towns. This CSV file is a preprocessed version of "zipcodes.csv" which
filters out useless data. This creates an RDD with tuples (district-name, (lat, long))


In [3]:
town_coords = sc.textFile("./coord_map.csv").map(lambda towncoord: tuple([x.strip() for x in towncoord.split(";")])).map(lambda x: (x[0], (x[1], x[2])))


We will now join the stops with the coords of their respective towns. We first apply a keyBy operation that creates
(town, stop) tuples to make the join possible. The result is an RDD with tuples (town, (stop, town-coord)).

Note: the town names in the CSV file contain some errors, therefore 270 towns will be lost.


In [4]:
stops_with_town_coords = stops.keyBy(lambda stop: stop[4]).join(town_coords)


We will now add the user-specified lat, long and radius. This will result in tuples of the form
(stop, stop_town_coord, input_coord, input_radius).


In [5]:
lat = 50.9797558
long = 5.7018474
radius = 50000.0

stops_with_input = stops_with_town_coords.map(lambda x: (x[1][0], x[1][1], (lat, long), radius))


The next step is to determine the distance between the towns and the user specified point. This will map all
tuples to (stop, distance, input_radius). In order to filter out the stops that are out of range, we use "filter()"
with the condition that distance <= radius.

Note: since the earth is a sphere euclidian distances are not 
accurate enough, I have used an online implementation of the haversine method.
http://evoling.net/code/haversine/
https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points/4913653#4913653
There are many sources, I don't know which one is the original.


In [6]:
def haversine(coord1, coord2):
    from math import radians, cos, sin, asin, sqrt
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    
    lat1 = float(lat1)
    lon1 = float(lon1)
    lat2 = float(lat2)
    lon2 = float(lon2)
    
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2.0)**2.0 + cos(lat1) * cos(lat2) * sin(dlon/2.0)**2.0
    c = 2.0 * asin(sqrt(a)) 
    # Radius of earth in kilometers is 6371
    km = 6371.0 * c
    m = km * 1000.0
    return m

stops_with_town_distances = stops_with_input.map(lambda x: (x[0], haversine(x[1], x[2]), x[3]))
stops_within_radius = stops_with_town_distances.filter(lambda x: x[1] <= x[2])


The next step is to determine what main town the stops are in. We will load "townname_map.csv". This is a 
preprocessed version of "flemish_districts.txt" that maps district names onto town names. Joining with this RDD creates tuples
(district_name, (stop, main-town)). In preparation for the next step we will clean up the RDD and map it 
into (main-town, stop). In order to make the join possible the stops RDD will be mapped to tuples (district, stop).

Note that "flemish_districts.txt" was not 100% correct and as a result a maximum of 274 towns will be lost. 


In [7]:
town_name_map = sc.textFile("./townname_map.csv").map(lambda mapping: tuple([x.strip() for x in mapping.split(";")]))
stops_by_district = stops_within_radius.map(lambda x: (x[0][4], x[0]))
stops_by_town = stops_by_district.join(town_name_map).map(lambda x: (x[1][1], x[1][0]))


Now that we've mapped the main town names to the stops, we join with the town population map "town_pop_map.csv", which 
is a preprocessed version of citizens2.txt where the space-delimiter is replaced by a ";" delimiter for easier parsing.
This file is read and mapped to (town-name, pop).

Note: "citizens2.txt" and "flemish_districts.txt" are not 100% compatible so 68 towns are lost during the join.
This is however better that the difference between stops.txt and citizens2.txt in which 1114 towns would be lost.

This joining process will produce tuples of the form (town-name, (stop, town_pop))


In [8]:
town_pop_map = sc.textFile("./town_pop_map.csv").map(lambda mapping: tuple([x.strip() for x in mapping.split(";")]))

town_stop_pop = stops_by_town.join(town_pop_map)


Since we want to know the stops per citizen for each town, we have to count the stops. For this we map the
(town-name, (stop, town-pop)) tuples to (town, (1, town-pop)) tuples and we use the following addition:
(town-name, (x, town-pop)) + (town-name, (y, town-pop)) = (town, (x + y, town-pop)). This results in (town, (stop-count, pop-count)).
A simple division will then result in the stop-citizen ratio.

"town_stop_per_citizen" will contain the stops per citizen for each town separately, this is (town, stops-per-citizen).
"stop_per_citizen_total" will contain the stops per citizens for all towns combined, this is a single float.


In [9]:
town_stopcount_pop = town_stop_pop.map(lambda x: (x[0], (1, int(x[1][1])))).reduceByKey(lambda x, y: (x[0] + y[0], x[1]))
town_stop_per_citizen = town_stopcount_pop.mapValues(lambda x: float(x[0]) / float(x[1])).collect()
stop_count, citizen_count = town_stopcount_pop.map(lambda x: (x[1][0], x[1][1])).reduce(lambda x,y: (x[0] + y[0], x[1] + y[1]))
stop_per_citizen_total = float(stop_count) / float(citizen_count)


Finally we print the output.


In [10]:
print("Overall stops per citizen: {}".format(stop_per_citizen_total))
print("Town - stops per citizen")
print("------------------------\n")

for town in town_stop_per_citizen:
  print("{} - {}".format(town[0], town[1]))

Overall stops per citizen: 0.00586341288271
Town - stops per citizen
------------------------

Opglabbeek - 0.00387259173202
Nieuwerkerken - 0.00589334483254
Bocholt - 0.00718599495451
Lummen - 0.00522317188984
Kortessem - 0.012083876318
Beringen - 0.00382384252721
Lommel - 0.00193980719492
Landen - 0.00119099855826
Peer - 0.00713371135906
Kinrooi - 0.00703937136777
Zonhoven - 0.00593947393231
Balen - 0.00691192865106
Gingelom - 0.011927480916
Bree - 0.0091875
As - 0.0041514041514
Bilzen - 0.00727576705161
Bekkevoort - 0.00446642207689
Heers - 0.0148249828415
Diepenbeek - 0.00831024930748
Maasmechelen - 0.00443069501612
Tessenderlo - 0.00497109201924
Geetbets - 0.0177387267905
Halen - 0.0106810490694
Overpelt - 0.00853374709077
Hoeselt - 0.00908809253331
Linter - 0.0033089755963
Ham - 0.0030518819939
Genk - 0.00319517846056
Dessel - 0.00387921996226
Diest - 0.00860946621309
Kortenaken - 0.0127048659637
Riemst - 0.00930707337577
Alken - 0.0102040816327
Herstappe - 0.0681818181818
Zutend