# Data Transformation, Modeling and Quality

In this notebook we'll look at how to transform the input datasets to be consumed into a Data Model.

### Step 1 - What inputs do you have?

In [None]:
%%bash
ls ../data

### Step 2 - Get your tools setup

As part of this we will be using [PySpark](http://spark.apache.org/docs/2.1.1/api/python/index.html) to inspect the data on hand and also gather some basic details.

In [None]:
import os
from IPython.display import display, HTML
import pandas as pd

#Locating where pyspark is installed
import findspark
findspark.init()
import pyspark

#Settings for PySpark to work
driver_memory = '4g'
num_executors = 2
executor_memory = '1g'
#pyspark_submit_args = ' --driver-memory ' + driver_memory + ' --executor-memory ' + executor_memory + ' --num-executors ' + num_executors + ' pyspark-shell'
pyspark_submit_args = ' --driver-memory ' + driver_memory + ' pyspark-shell'

#Setting the required parameters to start up PySpark
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

#Import Modules Needed for PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  20, truncate = True):
    if(truncate):
        pd.set_option('display.max_colwidth', 50)
    else:
        pd.set_option('display.max_colwidth', -1)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pd.reset_option('display.max_rows')

In [None]:
#Creating a spark session
spark = SparkSession.builder.appName("Data Transformation").getOrCreate()

### Step 3 - Read the data

We will be using the WDICountry, WDISeries and WDIData files for all the activity below.

In [None]:
#Read the file into a Spark Data Frame
country = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDICountry.csv")

country.count()

In [None]:
series = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDISeries.csv")

series.count()

In [None]:
indicators = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/WDIData.csv")

indicators.count()

### Step 4 - Transform the data

#### Transform the Country Dataset
- Select the columns that we will need for our data model
- Rename columns for data ingestion

We'll be using the various operations supported for a DataFrame.  You can view the complete list [here](http://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame) .

In [None]:
countryDim = country \
    .select("2-alpha code", "Country Code", "Short Name", "Long Name", "Region", "Income Group") \
    .withColumnRenamed("2-alpha code", "country_iso_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .withColumnRenamed("Short Name", "country_name") \
    .withColumnRenamed("Long Name", "country_long_name") \
    .withColumnRenamed("Region", "region") \
    .withColumnRenamed("Income Group", "income_group")
    
showDF(countryDim)

#### You can also do the similar transformations in sql
Let's rename the country_name and country_long_name columns

In [None]:
#Lets you create a view that you can use in SQL queries
countryDim.createOrReplaceTempView("country")

In [None]:
transformQuery = """
select 
    country_iso_code
    , wb_country_code
    , country_name as name
    , country_long_name as long_name
    , region
    , income_group
from 
    country
"""

showDF(spark.sql(transformQuery))

### Step 5 - Check the data quality

##### Do all countries have 2 character country_iso_codes ?

We will use some sql functions supported in PySpark for this exercise.  You can find a complete list of functions supported [here](http://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#module-pyspark.sql.functions) .

In [None]:
showDF( \
    countryDim \
       .select(col("country_iso_code"), \
               length(col("country_iso_code")).alias("column_length")) \
       .groupBy("column_length") \
       .agg(count("*").alias("cnt")) \
       .filter("cnt > 1") \
)

In [None]:
#You can also do this in SQL
countryCodeLengthQuery = """
select 
    length(country_iso_code) as column_length
    , count(1) as cnt
from 
    country
group by 
    length(country_iso_code)
having 
    count(1) > 1
"""

showDF(spark.sql(countryCodeLengthQuery))

## Quick Exercise
Do you notice a difference in the counts returned by the Quality check for the 2-character country_iso_code vs the original row count for the WDICountry.csv ?

In [None]:
# Paste your solution here


##### Do we have duplicate records for any of the key columns ?

In [None]:
showDF(countryDim.groupBy("country_iso_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(countryDim.groupBy("wb_country_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(countryDim.groupBy("country_name").agg(count("*").alias("cnt")).filter("cnt > 1"))

### Step 6 - Fix any issues with data quality

In [None]:
countryDimFinal = countryDim.filter("country_iso_code is not null")

showDF(countryDimFinal)

In [None]:
countryDimFinal.count()

### Step 7 - Write the data to the destination

In [None]:
# Here we are going to write the country dimension to an output csv file
countryDimFinal \
    .coalesce(1) \
    .write.csv('../output/CountryDim', mode='overwrite', header='true')

### Step 8 - Check if the output was written out as expected

In [None]:
%%bash
cat ../output/CountryDim/*csv | head

## Exercise

#### Transform the Series Dataset and make it available in the dataframe seriesDim
- Filter only for series that have Annual periodicity
- Get the following columns and rename the selected columns to prepare further processing  

| Name in CSV | Column Name |
| ------------- |:-------------:|
| Series Code | indicator_code |
| Indicator Name | indicator_name |
| Periodicity | periodicity |
| Aggregation Method | aggregation_method |


##### How many series do you end up with ?

In [None]:
#Your solution here
#seriesDim = ??? 

#### What data quality checks were you able to perform ?

- Are there any duplicate codes ?
- Are all indicator_codes following the same pattern ?
- Is the case on columns consistent ? 

In [None]:
#Your solution here


### Complex Transformations
##### Problem Statement
We want to measure the cellular and broadband penetration in comparison to the population demographics for every country.  It'll also be helpful to get some insights on annual global aggregates.

###### Our dataset has multiple types of metrics.  The only ones that we care about are simple aggregates.

In [None]:
simpleAggIndicators = seriesDim \
    .filter("lower(aggregation_method) = 'sum'") \
    .select("indicator_code", "indicator_name") \
    .orderBy("indicator_code")

showDF(simpleAggIndicators, limitRows = 500, truncate = False)

##### Only keep the indicators that are relevant to requirements i.e. Population indicators and Cellular and Broadband penetration

In [None]:
targetIndicators = simpleAggIndicators \
    .filter("lower(indicator_name) like '%population%total%' " + 
            " or lower(indicator_name) like '%cellular%' " +
            " or lower(indicator_name) like '%broadband%'") \
    .filter("lower(indicator_name) not like '%refugee%'")

showDF(targetIndicators)

##### Now that we have identified the various indicators of interest, we can continue with getting the metrics for these indicators

In [None]:
# Keep the columns that are relevant for further transformations
indicatorsData = indicators \
    .withColumnRenamed("Indicator Code", "indicator_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .drop("Indicator Name") \
    .drop("Country Name") \
    .drop("_c62")

In [None]:
#Keep only the indicators that we care about
targetIndicatorsData = indicatorsData.join(targetIndicators \
                                         , indicatorsData.indicator_code == targetIndicators.indicator_code) \
    .drop(targetIndicators.indicator_code)

In [None]:
showDF(targetIndicatorsData)

#### The output that we see currently isn't the most ideal from a modelling perspective.  
A well modeled dataset, should allow for data to be easily augmented.  e.g. instead of having a column for each year we would ideally prefer a row for each year to be able to add more rows in the future similar to the output of the following code block.

In [None]:
indicatorsSample = targetIndicatorsData \
    .select(col("wb_country_code")
            , col("indicator_code")
            , lit("1960").alias("year")
            , col("1960").alias("indicator_value")) \
    .filter("indicator_value >= 0.0")

showDF(indicatorsSample)

##### Let us start by getting the list of years that we have metrics for

In [None]:
yearList = [x for x in targetIndicatorsData.schema.names \
             if x != 'wb_country_code' and x != 'indicator_code' and x != 'indicator_name'] 

print(yearList)

In [None]:
#Cheat for creating a dataframe with no rows 
indicatorsDF = indicatorsSample.filter('1 = 0')

#Iterate through the list of years and store the rows in the DataFrame we created above
for indicatorYear in yearList:
    print("Processing indicators for " + indicatorYear)
    yearIndicatorDF = targetIndicatorsData \
        .select(col("wb_country_code")
                , col("indicator_code")
                , lit(indicatorYear).alias("year")
                , col(indicatorYear).alias("indicator_value")) \
        .filter("indicator_value >= 0")
    indicatorsDF = indicatorsDF.union(yearIndicatorDF)    

#### Let's cache the dataset to iterate over it

In [None]:
# You can iterate over a dataframe that is already computed by caching it onces and using it repeatedly
indicatorsDF.cache()

#Force the data to be cached
indicatorsDF.count()

In [None]:
#Check the indicator counts per year
#showDF(indicatorsDF.groupBy('year').agg(count("*")).orderBy("year"), limitRows=100)

#### Getting yearly indicator totals

In [None]:
yearPivot = indicatorsDF.groupBy('year').pivot('indicator_code').sum('indicator_value') 

In [None]:
showDF(yearPivot.orderBy('year'))

In [None]:
yearPivot.printSchema()

In [None]:
yearPivotDF = yearPivot.orderBy('year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population')

#### Data Quality Checkpoint 

In [None]:
# You can iterate over a dataframe that is already computed by caching it onces and using it repeatedly
yearPivotDF.cache()

#Forces the data to be cached
yearPivotDF.count()

In [None]:
yearPivotDF.filter('population_age_0_to_14 < 0').count()

In [None]:
yearPivotDF.filter('population_age_15_64 < 0').count()

In [None]:
yearPivotDF.filter('population_age_0_to_14 < 0').count()

In [None]:
yearPivotDF.filter('population_age_65_and_above < 0').count()

In [None]:
yearPivotDF.filter('population < 0').count()

In [None]:
yearPivotDF.filter('cellular_subscriptions < 0').count()

In [None]:
yearPivotDF.filter('broadband_subscriptions < 0').count()

In [None]:
yearPivotDF.filter('population_age_0_to_14 > population').count()

In [None]:
yearPivotDF.filter('population_age_15_64 > population').count()

In [None]:
yearPivotDF.filter('population_age_65_and_above > population').count()

In [None]:
yearPivotDF.filter('(population_age_0_to_14 + population_age_15_64 + population_age_65_and_above) > population').count()

In [None]:
#Write the yearly totals to a CSV File
yearPivotDF \
    .select(col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('../output/YearlyStats', mode='overwrite', header='true')

#### Getting yearly regional totals

In [None]:
regionalIndicators = indicatorsDF.join(countryDimFinal
                                       , indicatorsDF.wb_country_code == countryDim.wb_country_code
                                       , "inner") \
    .select(countryDim.region
            , indicatorsDF.wb_country_code
            , indicatorsDF.year
            , indicatorsDF.indicator_code
            , indicatorsDF.indicator_value)

In [None]:
showDF(regionalIndicators)

In [None]:
regionalPivot = regionalIndicators.groupBy('region', 'year').pivot('indicator_code').sum('indicator_value')

In [None]:
showDF(regionalPivot.orderBy('region', 'year'), limitRows=100)

In [None]:
#Write the regional-yearly totals to a CSV File
regionalPivot.filter('region is not null') \
    .orderBy('region','year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select(col('region')
            , col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('../output/RegionalStats', mode='overwrite', header='true')

## Short Exercise

Becky finds the regional metrics interesting, but she wants to look at these metrics at a country level for each year.  Can you adapt the regional pivot that we computed earlier to get the metrics for each year and country ?

In [None]:
#Your solution here

## Exercise

Kat wants to identify the countries that are conducive to start a business.  She thinks that it would suffice to look at the most recent metrics for the following:
- Gross National Income (GNI)
- Cost of business start-up procedures
- Number of days required to start a business
- Number of start-up procedures to register a business
- GDP
- GDP per capita
- Business Regulatory Environment
- Ease of doing business index (Only available in 2017)

Write the data to a csv file.

Hint - Start by matching up indicators that might have descriptions matching what Kat is looking for. e.g. <br>
`showDF(seriesDim.filter("indicator_name like '%GNI%'").orderBy("indicator_code"), limitRows = 500)`