## Assignment 7:
Number of stops per citizen for all towns that lie within a radius **R** of a given position (lat, long). The calculation uses the De Lijn data of all stops in Belgium (*stops.txt*), the district locations (*zipcodes.csv*) and the citizen data (*citizens2.txt*). Because the citizen counts are required, the term *town* is interpreted as a town defined in *flemish_districts.txt*. As a result, several joins are needed to get the desired result.

#### Input Variables
Please set the desired position in *INPUT_POSITION* using the (latitude, longitude) notation. The desired radius in kilometers can be entered into *INPUT_RADIUS*.

In [1]:
INPUT_POSITION = (51.2181962, 4.4244759)
INPUT_RADIUS = 5

#### Setup SparkContext and SQLContext
The SparkSession is the entry point for this project. Its SparkContext (`sc`) is used to read the plain-text files, and the SQLContext lets us read *stops.txt*, which contains JSON, directly into a DataFrame. This makes the data easy to inspect and query.

In [2]:
# SparkSession must be imported explicitly before it can be used
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.appName("Ex7").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

#### Setup Stop Count for Zipcodes
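First we read all stops from the De Lijn data and take the municipality of each stop (`haltes.omschrijvingGemeente`) as its *district*. These districts are then joined with the districts defined in *zipcodes.csv*.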

In [3]:
from pyspark.sql.functions import explode_outer

stopsDF = sqlContext.read.json('data/stops.txt')
districts = stopsDF.select(explode_outer("haltes.omschrijvingGemeente").alias("district"))
zipcodesDF = sqlContext.read.csv("data/zipcodes.csv", sep=";").selectExpr("_c0 as zipcode", "_c1 as district", "_c2 as town_latitude", "_c3 as town_longitude")

In [4]:
zipcodesDF.show()

+-------+--------------------+-----------------+-----------------+
|zipcode|            district|    town_latitude|   town_longitude|
+-------+--------------------+-----------------+-----------------+
|   1000|             Brussel|       50.8427501|4.351549900000009|
|   1000|           Bruxelles|       50.8427501|4.351549900000009|
|   1005|Ass. Réun. Com. C...|             null|             null|
|   1005|Brusselse Hoofdst...|50.84487679999999|4.351433499999985|
|   1005|Conseil Region Br...|        50.847857|4.367408000000069|
|   1005|Ver. Verg. Gemeen...|             null|             null|
|   1006|Raad Vlaamse Geme...|             null|             null|
|   1007|Ass. Commiss. Com...|             null|             null|
|   1008|Chambre des Repré...|50.84655679999999|4.364662199999998|
|   1008|Kamer van Volksve...|50.84655679999999|4.364662199999998|
|   1009|    Belgische Senaat|             null|             null|
|   1009|   Senat de Belgique|         50.79834|4.395649999999932|

Since several entries in the *district* column above consist of multiple words, only the first word of each name is kept before the join is executed.
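To make the effect of this first-word extraction concrete, here is a plain-Python stand-in for the UDF (the helper name `first_word` is hypothetical). It also guards against `None`: a null district would otherwise fail with an `AttributeError` inside a UDF that calls `split` directly. Note that a name such as *Belgische Senaat* is reduced to *Belgische*, so names that differ only after the first word end up identical.

```python
def first_word(name):
    # Hypothetical stand-in for cleanName: keep the first space-separated word,
    # passing None (a null district) through unchanged
    if name is None:
        return None
    return name.split(" ")[0]


# Full district names taken from the zipcodes.csv output above
samples = ["Brussel", "Belgische Senaat", "Senat de Belgique", None]
print([first_word(s) for s in samples])
```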

In [5]:
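from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Keep only the first word of each district name; null names are passed through
cleanName = udf(lambda x: x.split(" ")[0] if x is not None else None, StringType())

zipcodes = zipcodesDF.withColumn("district", cleanName(zipcodesDF["district"]))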

In [6]:
zipcodes.show()

+-------+-------------------+-----------------+-----------------+
|zipcode|           district|    town_latitude|   town_longitude|
+-------+-------------------+-----------------+-----------------+
|   1000|            Brussel|       50.8427501|4.351549900000009|
|   1000|          Bruxelles|       50.8427501|4.351549900000009|
|   1005|               Ass.|             null|             null|
|   1005|          Brusselse|50.84487679999999|4.351433499999985|
|   1005|            Conseil|        50.847857|4.367408000000069|
|   1005|               Ver.|             null|             null|
|   1006|               Raad|             null|             null|
|   1007|               Ass.|             null|             null|
|   1008|            Chambre|50.84655679999999|4.364662199999998|
|   1008|              Kamer|50.84655679999999|4.364662199999998|
|   1009|          Belgische|             null|             null|
|   1009|              Senat|         50.79834|4.395649999999932|
|   1010| 

The following joins are the core of this assignment. The first one joins the stop districts with the districts in *zipcodes*. In that joined DataFrame we count how many rows each district has, which is taken as the number of stops for that district. That count is then joined back onto the first joined DataFrame, which gives the stop count of each district together with its zipcode and coordinates.

In [7]:
joined_dfs = districts.join(zipcodes, on=["district"])
grouped = joined_dfs.groupBy("district").count().orderBy("district")
stopcount_zipcodes = joined_dfs.join(grouped, on=["district"]).dropDuplicates().orderBy("district")

In [8]:
stopcount_zipcodes.show()

+----------+-------+-----------------+------------------+-----+
|  district|zipcode|    town_latitude|    town_longitude|count|
+----------+-------+-----------------+------------------+-----+
|    Aaigem|   9420|50.88915799999999|3.9372244999999566|   24|
|   Aalbeke|   8511|       50.7784023|3.2299376999999367|    8|
|     Aalst|   3800|         50.78166| 5.208979999999997|  420|
|     Aalst|   9300|        50.937793|4.0409133000000566|  420|
|    Aalter|   9880|       51.0769076| 3.432344599999965|   60|
|  Aarschot|   3200|50.98061509999999| 4.827381000000059|   96|
|   Aarsele|   8700|50.99693430000001| 3.422498899999937|   22|
| Aartrijke|   8211|       51.1191688| 3.090079400000036|   35|
|Aartselaar|   2630|       51.1338889| 4.384722199999942|   53|
|     Achel|   3930|         51.25224| 5.479530000000068|   38|
|    Adegem|   9991|       51.2045152|3.4900073999999677|   27|
| Adinkerke|   8660|       51.0751267|2.6034114000000272|    6|
|    Afsnee|   9051|       51.0305603|3.
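One consequence of counting *after* the join is worth keeping in mind: a district name that matches several rows in *zipcodes.csv* (such as *Aalst*, which appears above with both 3800 and 9300) has each of its stops counted once per matching row. The plain-Python sketch below uses made-up stop data to show the effect and contrasts it with counting the stops per district before joining. In Spark, the latter would presumably be `districts.groupBy("district").count()` joined with `zipcodes`.

```python
from collections import Counter

# Made-up stop data: one entry (the stop's district) per stop
stops = ["Aalst", "Aalst", "Aalst", "Aalter"]
# (district, zipcode) rows as they appear in the output above
zipcodes = [("Aalst", "3800"), ("Aalst", "9300"), ("Aalter", "9880")]

# Inner join on district, mirroring districts.join(zipcodes, on=["district"])
joined = [(d, z) for d in stops for (zd, z) in zipcodes if d == zd]

# Counting after the join: every stop is counted once per matching zipcode row
count_after_join = Counter(d for d, _ in joined)

# Counting before the join: exactly one count per stop
count_before_join = Counter(stops)

print(count_after_join)
print(count_before_join)
```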

#### Calculating the distance
The following function computes the great-circle distance between two positions given in (lat, long) format, using the haversine formula with an Earth radius of 6373 km. It is used to check whether a district's location from *zipcodes.csv* lies within the given radius of *INPUT_POSITION*.
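In the notation of `is_within_radius` (latitudes $\varphi_1$ = `lat1`, $\varphi_2$ = `lat2` and longitudes $\lambda_1$ = `lon1`, $\lambda_2$ = `lon2`, all in radians, with $R = 6373$ km), the haversine formula it implements reads:

$$a = \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \, \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right)$$

$$c = 2\,\operatorname{atan2}\!\left(\sqrt{a}, \sqrt{1 - a}\right), \qquad d = R\,c$$

A district is kept when $d \le$ `INPUT_RADIUS`.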

In [9]:
from math import sin, cos, sqrt, atan2, radians

# Source: https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude
def is_within_radius(lat, long):
    # Approximate radius of the earth in km (value taken from the source above)
    R = 6373.0

    # Convert both positions from degrees to radians
    lat1 = radians(float(lat))
    lon1 = radians(float(long))
    lat2 = radians(INPUT_POSITION[0])
    lon2 = radians(INPUT_POSITION[1])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Haversine formula
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c

    return distance <= INPUT_RADIUS

Using the function above we can evaluate which districts are located within the given radius, along with their stop count.

In [10]:
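columns = ["district", "zipcode", "count"]
rows = []
# Keep the districts whose coordinates lie within the radius;
# districts without coordinates in zipcodes.csv are skipped
for row in stopcount_zipcodes.collect():
    lat, long = row["town_latitude"], row["town_longitude"]
    if lat is not None and long is not None and is_within_radius(lat, long):
        rows.append((row["district"], row["zipcode"], row["count"]))

districts_in_radius = spark.createDataFrame(rows, columns).dropDuplicates(["district"])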
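The loop above collects every row to the driver and filters it in Python. The sketch below reproduces that filter in plain Python over tuples shaped like the rows of `stopcount_zipcodes` (district, zipcode, latitude, longitude, count). Rows whose coordinates are null in *zipcodes.csv* (as for some Brussels entries) are skipped, because `float(None)` would raise a `TypeError`. The helper names, the row labelled *Center* and the zero counts are hypothetical placeholders.

```python
from math import sin, cos, sqrt, atan2, radians

R_EARTH_KM = 6373.0  # same Earth radius as is_within_radius


def haversine_km(lat1, lon1, lat2, lon2):
    # Haversine distance in km between two (lat, long) points given in degrees
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return R_EARTH_KM * 2 * atan2(sqrt(a), sqrt(1 - a))


def districts_within(rows, center, radius_km):
    # rows: (district, zipcode, lat, long, count) with coordinates as strings or None
    result = []
    for district, zipcode, lat, lon, count in rows:
        if lat is None or lon is None:
            continue  # no coordinates in zipcodes.csv
        if haversine_km(float(lat), float(lon), *center) <= radius_km:
            result.append((district, zipcode, count))
    return result


center = (51.2181962, 4.4244759)  # INPUT_POSITION from this notebook
rows = [
    ("Brussel", "1000", "50.8427501", "4.351549900000009", 0),  # more than 40 km away
    ("Belgische", "1009", None, None, 0),                       # null coordinates
    ("Center", "0000", "51.2181962", "4.4244759", 0),           # hypothetical row at the center
]
print(districts_within(rows, center, 5))
```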

In [11]:
districts_in_radius.show()

+----------+-------+-----+
|  district|zipcode|count|
+----------+-------+-----+
|   Berchem|   2600|  190|
|  Borsbeek|   2150|   21|
|   Merksem|   2170|   75|
|Borgerhout|   2140|   60|
| Antwerpen|   2060| 4235|
|    Deurne|   2100|  352|
+----------+-------+-----+



#### Combining the computed data with the districts
Next, each town in *flemish_districs.txt* is split into its districts, which are then joined with the districts found above. This tells us which towns have at least one district within the radius.

In [12]:
towns_districts = sc.textFile('data/flemish_districs.txt')
towns_districts_DF = towns_districts.map(lambda x: [x.split(":")[0].strip().replace('‐', '-'), [p.strip().replace('‐', '-') for p in x.split(":")[-1].split(",")]]).toDF(["town", "district"])
towns_distr_sep = towns_districts_DF.select("town", explode_outer("district").alias("district"))
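The lambda above packs several steps into one line. The plain-Python sketch below spells them out. It assumes, based only on the `split(":")` and `split(",")` calls, that each line has the form `town: district, district, ...`. The Unicode hyphen U+2010 is replaced by an ASCII `-` so that names match the other files. The function name and the example line are hypothetical.

```python
def parse_town_line(line):
    # Assumed layout (inferred from the splits above): "town: district, district, ..."
    def normalize(s):
        # Trim whitespace and replace U+2010 HYPHEN with an ASCII hyphen-minus
        return s.strip().replace("\u2010", "-")

    parts = line.split(":")
    return normalize(parts[0]), [normalize(p) for p in parts[-1].split(",")]


# Hypothetical input line
town, districts = parse_town_line("Sint\u2010Niklaas: Sint\u2010Niklaas, Belsele")

# One (town, district) pair per district, as the explode step produces
pairs = [(town, d) for d in districts]
print(pairs)
```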

Since the data does not distinguish between zipcodes and towns, only one row per zipcode is kept. This makes the data slightly less accurate, but the available data offers no way around it.

In [13]:
towns_districts_count = towns_distr_sep.join(districts_in_radius, on=["district"]).dropDuplicates(["zipcode"])
towns_districts_count.show()

+----------+---------+-------+-----+
|  district|     town|zipcode|count|
+----------+---------+-------+-----+
|    Deurne|Antwerpen|   2100|  352|
|Borgerhout|Antwerpen|   2140|   60|
| Antwerpen|Antwerpen|   2060| 4235|
|   Berchem|Antwerpen|   2600|  190|
|   Merksem|Antwerpen|   2170|   75|
|  Borsbeek| Borsbeek|   2150|   21|
+----------+---------+-------+-----+



#### Total Stop counts for the *satisfying* Towns
The following code block uses the DataFrame above to sum, for each town, the stop counts of all districts belonging to that town.
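This step is a group-by sum. The plain-Python sketch below applies it to the (town, count) pairs from the table above. Within Spark the same result could likely be obtained without collecting to the driver, for example with `towns_districts_count.groupBy("town").agg(F.sum("count").alias("stop_count"))` after `import pyspark.sql.functions as F`.

```python
from collections import defaultdict

# (town, count) pairs copied from the towns_districts_count output above
rows = [
    ("Antwerpen", 352),   # Deurne
    ("Antwerpen", 60),    # Borgerhout
    ("Antwerpen", 4235),  # Antwerpen
    ("Antwerpen", 190),   # Berchem
    ("Antwerpen", 75),    # Merksem
    ("Borsbeek", 21),     # Borsbeek
]

# Sum the stop counts of all districts per town
stop_count = defaultdict(int)
for town, count in rows:
    stop_count[town] += count

print(dict(stop_count))  # {'Antwerpen': 4912, 'Borsbeek': 21}
```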

In [14]:
town_dict = dict()

# Sum the stop counts of all districts that belong to the same town
for row in towns_districts_count.collect():
    town_dict[row["town"]] = town_dict.get(row["town"], 0) + int(row["count"])

columns = ["town", "stop_count"]
rows = list(town_dict.items())

towns = spark.createDataFrame(rows, columns)
towns.show()

+---------+----------+
|     town|stop_count|
+---------+----------+
|Antwerpen|      4912|
| Borsbeek|        21|
+---------+----------+



#### Incorporating the Citizens
As in previous notebooks, the number of citizens is extracted from *citizens2.txt*. This requires some preprocessing, for which the *extractNames* function is created.

In [15]:
def extractNames(data):
    result = []
    # Only 1 name
    if len(data) == 1:
        # Strip surrounding whitespace
        name = data[0].strip()
        result.append(name)
    
    # Both Dutch & French name
    elif len(data) > 1:
        for name in data:
            # Strip surrounding whitespace
            name = name.strip()
            # Skip separator characters and empty tokens (e.g. from repeated spaces)
            if len(name) <= 1:
                continue
            # Remove enclosing parentheses: "(name)" -> "name"
            if name[0] == "(":
                name = name[1:-1]
            
            result.append(name)
            
    return result

In [16]:
citizensDF = sc.textFile('data/citizens2.txt')
# All tokens except the last hold the town name(s); the last token is the
# citizen count, which uses '.' as thousands separator
citizens = citizensDF.map(lambda x: [extractNames(x.split(" ")[:-1]), int(x.rpartition(" ")[-1].replace('.', ''))]).toDF(["towns", "citizen_count"])
citizens = citizens.select("citizen_count", explode_outer("towns").alias("town"))
citizens.show()

+-------------+--------------------+
|citizen_count|                town|
+-------------+--------------------+
|       117724|          Anderlecht|
|       177112|             Brussel|
|       177112|           Bruxelles|
|        86336|              Elsene|
|        86336|             Ixelles|
|        47410|           Etterbeek|
|        41016|               Evere|
|        24794|           Ganshoren|
|        52144|               Jette|
|        21765|          Koekelberg|
|        33725|            Oudergem|
|        33725|           Auderghem|
|       132097|          Schaarbeek|
|       132097|          Schaerbeek|
|        24831| Sint‐Agatha‐Berchem|
|        24831|Berchem‐Sainte‐Ag...|
|        49361|         Sint‐Gillis|
|        49361|        Saint‐Gilles|
|        95455| Sint‐Jans‐Molenbeek|
|        95455|Molenbeek‐Saint‐Jean|
+-------------+--------------------+
only showing top 20 rows
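The exact layout of *citizens2.txt* is not shown in this notebook. The sketch below is a compact plain-Python restatement of the parsing in the two cells above, applied to hypothetical lines in a layout that code would accept: one or two town names (optionally separated by a one-character separator or given in parentheses), followed by the count with `.` as thousands separator. The function name and the example lines are assumptions; only the resulting names and counts match the output above.

```python
def parse_citizen_line(line):
    # Hypothetical restatement of extractNames + the count parsing above
    *name_tokens, count_token = line.strip().split(" ")
    names = []
    for token in name_tokens:
        token = token.strip()
        if len(token) <= 1:
            # Separator such as "/" or "-", or an empty token from repeated spaces
            continue
        if token.startswith("(") and token.endswith(")"):
            token = token[1:-1]  # "(Bruxelles)" -> "Bruxelles"
        names.append(token)
    # Remove the '.' thousands separators before converting the count
    return names, int(count_token.replace(".", ""))


# Hypothetical input lines in the assumed layout
for line in ["Brussel (Bruxelles) 177.112", "Elsene / Ixelles 86.336", "Borsbeek 10.677"]:
    print(parse_citizen_line(line))
```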



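Now that we have the citizen counts for each town, the towns found above and their stop counts can be joined with the citizen counts. The town names in *citizens2.txt* use the Unicode hyphen (‐), while the names from *flemish_districs.txt* were normalized to an ASCII hyphen (-), so the citizen town names are normalized the same way before joining.

In [17]:
from pyspark.sql.functions import regexp_replace

# Replace Unicode hyphens (U+2010) so hyphenated town names match the ones in towns
citizens_normalized = citizens.withColumn("town", regexp_replace("town", "\u2010", "-"))
joined_dfs = towns.join(citizens_normalized, on=["town"]).orderBy("town")
joined_dfs.show()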

+---------+----------+-------------+
|     town|stop_count|citizen_count|
+---------+----------+-------------+
|Antwerpen|      4912|       521680|
| Borsbeek|        21|        10677|
+---------+----------+-------------+



#### Citizen Ratio
The final step divides the stop count by the citizen count for each town, which gives the number of stops per citizen for each town within the given radius of the given position.

In [18]:
stops_per_town_per_citizen_in_radius = joined_dfs.withColumn("citizen_ratio", joined_dfs["stop_count"] / joined_dfs["citizen_count"])
stops_per_town_per_citizen_in_radius.show()

+---------+----------+-------------+--------------------+
|     town|stop_count|citizen_count|       citizen_ratio|
+---------+----------+-------------+--------------------+
|Antwerpen|      4912|       521680|0.009415733783162092|
| Borsbeek|        21|        10677|0.001966844619275077|
+---------+----------+-------------+--------------------+

