Datasets:
*	ref_table_precinct_locations_PSGC.csv – lookup table for precincts
*	results_president.csv – precinct-level election results for the 2016 presidential race
*	results_vice-president.csv – precinct-level election results for the 2016 vice presidential race

Tasks:
1.	Create a denormalized table replacing precinct_code in the results_*.csv files with the columns: region, province, municipality, and barangay. 

2.	Create an interesting data visualization using this dataset.

Tools:
-	Code should be written in a Jupyter Notebook
-	For data transformation you can use Pandas
-	For visualization use matplotlib or seaborn
-	Put your code in a Github repository

Extra:
-	Bonus points if you use PySpark for data transformation
-	Bonus points if you use Tableau for visualization
 
Good luck and enjoy. 😊


# Denormalize Data Using PySpark

In [382]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import udf, first
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

In [2]:
sc = SparkContext()
spark = SparkSession(sc)

Load data into spark dataframes.

In [484]:
president = spark.read.csv('results_president.csv', header=True)
vicepres = spark.read.csv('results_vice-president.csv', header=True)
mappings = spark.read.csv('ref_table_precinct_locations.csv', header=True)
psgc = spark.read.csv('ref_table_precinct_locations_PSGC.csv', header=True)

We'll need to clean the formats of the `income_class` column as there are improperly transformed string types.

In [485]:
income_class_map = {
    '1st':'1st Class',
    '2nd':'2nd Class',
    '3rd':'3rd Class', 
    '4th':'4th Class',
    '5th':'5th Class',
    '6th':'6th Class',
    '-':'Unknown',
    'None':'None'
}

We define a User Defined Function, `udf`, to clean the bytes and convert to unicode.

In [486]:
bytes_cleaner = udf(lambda x: str(x).replace('\xa0', u''))

We'll want to extract the data from `mappings` and merge this with `psgc` table first to form all of the location information in one table.

In [487]:
mapper = (mappings.join(psgc.select('precinct_code',
                           'is_city',
                           'income_class', 
                           'population',
                           'land_area', 
                           'municipality_CM'),
              on=mappings.precinct_code==psgc.precinct_code,
              how='left')
              .drop(psgc.precinct_code)
)

We now merge this to the `president` and `vicepres` tables, and denormalize the tables by:
* grouping by `precinct_code`
* pivoting the `candidate_name` column so we get columns of candidates
* aggregating the `votes`, in this case, we use `max`, but it doesn't really matter because there is only one value for `votes` per candidate per precinct.
* we also create a new table with `timestamp` as the values and merge this to the resulting table as well so we can retain the `timestamp` of each precinct.
<br>
We then join these tables with the `mapper` table we created previously.

In [549]:
votes_president = (president.select('precinct_code',
                  'candidate_name', 
                  president.votes.astype('int'))
        .groupby('precinct_code')
        .pivot('candidate_name')
        .max('votes')
        .join(mapper, 
              on=president.precinct_code == mapper.precinct_code, 
              how='left')
        .drop(mapper.precinct_code)
        .withColumn('voter_turnout', mapper['ballots_cast'] / mapper['registered_voters'])
        .withColumn('income_class', bytes_cleaner(mapper.income_class))
        .replace(to_replace=income_class_map, subset=['income_class'])
        .join((president.select('precinct_code', 
                                'timestamp')
                    .groupby('precinct_code')
                    .agg(first('timestamp').alias('timestamp'))), on='precinct_code', how='left')
        .drop(president.precinct_code)
)

In [550]:
votes_vice = (vicepres.select('precinct_code',
                  'candidate_name', 
                  vicepres.votes.astype('int'))
        .groupby('precinct_code')
        .pivot('candidate_name')
        .max('votes')
        .join(mapper, 
              on=vicepres.precinct_code == mapper.precinct_code, 
              how='left')
        .drop(mapper.precinct_code)
        .withColumn('voter_turnout', mapper['ballots_cast'] / mapper['registered_voters'])
        .withColumn('income_class', bytes_cleaner(mapper.income_class))
        .replace(to_replace=income_class_map, subset=['income_class'])
        .join((vicepres.select('precinct_code', 
                               'timestamp')
                    .groupby('precinct_code')
                    .agg(first('timestamp').alias('timestamp'))), on='precinct_code', how='left')
        .drop(vicepres.precinct_code)
)

# Get Geospatial Matches
We also want to upload the shape file for GADM into Tableau. But to do this, we'll need to ensure that the fields match correctly for us to be able to join them together inside Tableau itself.
<br>
We can get the `NAME_1` and `NAME_2` fields from gadm and make sure that those match `province` and `municipality` respectively. We will be doing this in spark primarily, using UDFs.

In [551]:
from fuzzywuzzy import fuzz
from bs4 import BeautifulSoup
import requests
from time import sleep
import geopandas as gpd

In [552]:
ph3 = gpd.GeoDataFrame.from_file('gadm36_PHL_3.shp')

In [553]:
gadm = pd.DataFrame()
gadm['NAME_1'] = ph3['NAME_1']
gadm['NAME_2'] = ph3['NAME_2']
gadm = gadm.drop_duplicates()

In [554]:
# manual corrections for converting some items
province_mapper = {
    'DAVAO (DAVAO DEL NORTE)':'Davao del Norte',
    'NATIONAL CAPITAL REGION - MANILA':'Metropolitan Manila',
    'COTABATO (NORTH COT.)':'North Cotabato',
    'NATIONAL CAPITAL REGION - SECOND DISTRICT':'Metropolitan Manila', 
    'NATIONAL CAPITAL REGION - THIRD DISTRICT':'Metropolitan Manila',
    'NATIONAL CAPITAL REGION - FOURTH DISTRICT':'Metropolitan Manila', 
    'TAGUIG - PATEROS':'Metropolitan Manila',
    'DAVAO OCCIDENTAL':'Davao del Sur',
    'SAMAR (WESTERN SAMAR)':'Samar',
    'EUROPE':'Overseas',
    'MIDDLE EAST AND AFRICAS':'Overseas',
    'NORTH AND LATIN AMERICA':'Overseas',
    'ASIA':'Overseas'
}

We want to map the naming convention of GADM to new columns in `votes_president` and `votes_vice` spark dataframes. To facilitate this, we'll create a dictionary that can try to make the values if they are exact matches.

In [494]:
name1_dict = dict(zip([i.lower() for i in gadm['NAME_1']], gadm['NAME_1']))

In [495]:
name2_dict = {}
name2_dict['Overseas'] = {'Overseas':'Overseas'}
grouped = gadm.groupby('NAME_1')
for name in gadm['NAME_1'].unique():
    name2_dict[name] = dict(zip(grouped.get_group(name)['NAME_2'].apply(lambda x: str(x).lower()).to_list(),
                                grouped.get_group(name)['NAME_2']))

The next functions are `udf`'s that first look if there is an exact match between our values and the gadm values. If there is no, it looks at our `province_mapper` dictionary if the value needs to be manually converted, and finally, if it is not there, we use fuzzy matching by implementing the `fuzzywuzzy` library to get the highest match ratio score in order to match the closest possible match that we can get.

In [555]:
province_NAME = udf(lambda x: name1_dict[str(x).lower()] if str(x).lower() in name1_dict.keys() else \
                (sorted([(name1_dict[i], fuzz.ratio(str(x).lower(), i.lower())) \
                    for i in name1_dict.keys()], key=lambda k:-k[1])[0][0]\
                    if str(x) not in province_mapper.keys() else province_mapper[str(x)]))

In [556]:
municipality_NAME = udf(lambda NAME_1, x: name2_dict[NAME_1][str(x).lower()] \
                    if str(x).lower() in name2_dict[NAME_1].keys() else sorted(\
                    [(name2_dict[NAME_1][i], fuzz.ratio(str(x).lower(), i.lower())) for i in \
                    name2_dict[NAME_1].keys()], key=lambda k:-k[1])[0][0])

In [557]:
votes_vice = votes_vice.withColumn('NAME_1', province_NAME(votes_vice['province']))
votes_president = votes_president.withColumn('NAME_1', 
                province_NAME(votes_president['province']))
votes_vice = votes_vice.withColumn('NAME_2', 
                municipality_NAME(votes_vice.NAME_1, votes_vice.municipality))
votes_president = votes_president.withColumn('NAME_2', 
                municipality_NAME(votes_president.NAME_1, votes_president.municipality))

# Saving files
We extract these files to `.csv` format for easier transfer to Tableau.

In [558]:
(votes_president.repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("president"))

(votes_vice.repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("vice_president"))

In [567]:
import glob, os
os.rename(glob.glob('president/*.csv')[0], 'president.csv')
os.rename(glob.glob('vice_president/*.csv')[0], 'vice_president.csv')

# Get Congress Data

In [535]:
from IPython.display import clear_output
pd.set_option('display.max_rows', 200)

In [543]:
request = requests.get('http://www.congress.gov.ph/members/', 'lxml')
soup = BeautifulSoup(request.text) if request.status_code == 200 else False
if soup:
    congress = pd.read_html(str(soup.select('body > div > div > div.col-md-8 > table')[0]))[0]

In [544]:
congress.columns = ['name', 'location']
congress = congress.loc[1:]
congress['province'] = congress['location'].apply(lambda x: str(x).split(',')[0])
congress = congress.groupby('province', as_index=False)['location'].count()\
.rename(columns={'location':'count'})
congress = congress[~congress['province'].str.contains('Party')]

In [545]:
congress_name1 = {'Antipolo City': 'Rizal',
 'Bacolod City': 'Negros Occidental',
 'Baguio City': 'Benguet',
 'Biñan City': 'Laguna',
 'Cagayan de Oro City': 'Misamis Oriental',
 'Calamba City': 'Laguna',
 'Caloocan City': 'Metropolitan Manila',
 'Cebu City': 'Cebu',
 'Davao City': 'Davao del Sur',
 'Davao Occidental': 'Davao del Sur',
 'Davao de Oro': 'Compostela Valley',
 'Iligan City': 'Lanao del Norte',
 'Iloilo City': 'Iloilo',
 'Lapu-Lapu City': 'Cebu',
 'Las Piñas City': 'Metropolitan Manila',
 'Makati City': 'Metropolitan Manila',
 'Malabon City': 'Metropolitan Manila',
 'Mandaluyong City': 'Metropolitan Manila',
 'Manila': 'Metropolitan Manila',
 'Marikina City': 'Metropolitan Manila',
 'Muntinlupa City': 'Metropolitan Manila',
 'Navotas City': 'Metropolitan Manila',
 'Parañaque City': 'Metropolitan Manila',
 'Pasay City': 'Metropolitan Manila',
 'Pasig City': 'Metropolitan Manila',
 'Quezon City': 'Metropolitan Manila',
 'San Jose Del Monte City': 'Bulacan',
 'San Juan City': 'Metropolitan Manila',
 'Taguig City-Pateros': 'Metropolitan Manila',
 'Valenzuela City': 'Metropolitan Manila',
 'Zamboanga City': 'Zamboanga del Sur'}

In [546]:
congress['NAME_1'] = congress['province'].apply(lambda x: congress_name1[str(x)] \
                if str(x) in congress_name1.keys() else name1_dict[str(x).lower()])

In [563]:
congress = congress.drop('province', axis=1).groupby('NAME_1', as_index=False)['count'].sum()

In [564]:
congress

Unnamed: 0,NAME_1,count
0,Abra,1
1,Agusan del Norte,2
2,Agusan del Sur,2
3,Aklan,2
4,Albay,3
5,Antique,1
6,Apayao,1
7,Aurora,1
8,Basilan,1
9,Bataan,2
