# Project Title
### Data Engineering Capstone Project

#### Project Summary

You are part of an analytics team tasked with analyzing the World Development Indicators (WDI) data to find which country is the most suitable for starting a business.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import os
from IPython.display import display, HTML
import pandas as pd

#Locating where pyspark is installed
import findspark
findspark.init()
import pyspark


#Settings for PySpark to work
driver_memory = '4g'
num_executors = 2
executor_memory = '1g'
#pyspark_submit_args = ' --driver-memory ' + driver_memory + ' --executor-memory ' + executor_memory + ' --num-executors ' + str(num_executors) + ' pyspark-shell'
pyspark_submit_args = ' --driver-memory ' + driver_memory + ' pyspark-shell'

#Setting the required parameters to start up PySpark
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

#Import Modules Needed for PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DecimalType
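
Numeric settings such as `num_executors` have to be converted to strings before they are joined into `PYSPARK_SUBMIT_ARGS`. A minimal sketch of assembling the argument string from optional settings; the helper name `build_submit_args` is hypothetical and not part of PySpark:

```python
def build_submit_args(driver_memory, executor_memory=None, num_executors=None):
    """Assemble a PYSPARK_SUBMIT_ARGS string from optional settings."""
    parts = ["--driver-memory", driver_memory]
    if executor_memory is not None:
        parts += ["--executor-memory", executor_memory]
    if num_executors is not None:
        # str() avoids a TypeError when joining an int with strings
        parts += ["--num-executors", str(num_executors)]
    # pyspark-shell must come last
    parts.append("pyspark-shell")
    return " ".join(parts)


print(build_submit_args("4g"))
print(build_submit_args("4g", "1g", 2))
```

The result can be assigned to `os.environ["PYSPARK_SUBMIT_ARGS"]` in the same way as in the cell above.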

### Step 1: Scope the Project and Gather Data

#### Scope 
I will use the World Development Indicators data and build an ETL pipeline that extracts the downloaded datasets, performs initial exploratory data analysis, and applies the necessary transformations and filtering. The filtered and transformed data will then be exported as CSV files for further analysis.

#### Describe and Gather Data 
For this purpose, I collected data from the following source:

World Development Indicators (WDI) data: https://datacatalog.worldbank.org/dataset/world-development-indicators

Format: CSV (comma-separated values)

Granularity: National

Number of Economies: 217

Temporal Coverage: 1960 - 2020

Release Date: June 11, 2010

Last Updated: December 16, 2020

In [2]:
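def showDF(df, limitRows=20, truncate=True):
    """Display the first `limitRows` rows of a Spark DataFrame as a pandas table."""
    # Limit column width to 50 characters unless truncation is disabled
    pd.set_option('display.max_colwidth', 50 if truncate else None)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    # Restore the pandas defaults so later cells are unaffected
    pd.reset_option('display.max_rows')
    pd.reset_option('display.max_colwidth')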

In [3]:
#Creating a spark session
spark = SparkSession.builder.appName("Data Engineering Nanodegree").getOrCreate()

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data
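
As a rough illustration of the two checks named above (missing values and duplicate data), the sketch below counts empty fields per column and looks for repeated keys in plain Python. The sample rows mirror a few country records shown later in this notebook; the helper names `missing_counts` and `duplicate_keys` are hypothetical.

```python
from collections import Counter

# Small sample shaped like WDICountry.csv (None marks an empty field)
rows = [
    {"Country Code": "ABW", "Region": "Latin America & Caribbean", "Income Group": "High income"},
    {"Country Code": "ARB", "Region": None, "Income Group": None},
    {"Country Code": "ARG", "Region": None, "Income Group": None},
    {"Country Code": "ARM", "Region": "Europe & Central Asia", "Income Group": "Lower middle income"},
]


def missing_counts(records):
    """Count empty or missing values per column."""
    counts = Counter()
    for record in records:
        for column, value in record.items():
            if value is None or value == "":
                counts[column] += 1
    return dict(counts)


def duplicate_keys(records, key):
    """Return the key values that occur more than once."""
    seen = Counter(record[key] for record in records)
    return sorted(k for k, n in seen.items() if n > 1)


print(missing_counts(rows))
print(duplicate_keys(rows, "Country Code"))
```

The same checks can be expressed against the Spark DataFrames once they are loaded; the plain-Python version is only meant to make the logic explicit.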

##### Reading the country-level data (WDICountry.csv)

In [4]:
#Read the file into a Spark Data Frame
wdicountry = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/jovyan/work/data/WDICountry.csv")

##### Inspecting the schema we have just read

In [5]:
wdicountry.printSchema()

root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: integer (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nullable = true)
 |-- Sys

##### Checking a few sample records

In [6]:
showDF(wdicountry, truncate = False)

Unnamed: 0,Country Code,Short Name,Table Name,Long Name,2-alpha code,Currency Unit,Special Notes,Region,Income Group,WB-2 code,...,Government Accounting concept,IMF data dissemination standard,Latest population census,Latest household survey,Source of most recent Income and expenditure data,Vital registration complete,Latest agricultural census,Latest industrial data,Latest trade data,Latest water withdrawal data
0,ABW,Aruba,Aruba,Aruba,AW,Aruban florin,SNA data for 2000-2011 are updated from official government statistics; 1994-1999 from UN databases. Base year has changed from 1995 to 2000.,Latin America & Caribbean,High income,AW,...,,Enhanced General Data Dissemination System (e-GDDS),2010,,,Yes,,,2016.0,
1,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,"Fiscal year end: March 20; reporting period for national accounts data is calendar year, estimated to insure consistency between national accounts and fiscal data. National accounts data are sourced from the IMF and differ from the Central Statistics Organization numbers due to exclusion of the opium economy.",South Asia,Low income,AF,...,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),1979,"Demographic and Health Survey, 2015","Integrated household survey (IHS), 2011",,,,2016.0,2000.0
2,AGO,Angola,Angola,People's Republic of Angola,AO,Angolan kwanza,,Sub-Saharan Africa,Lower middle income,AO,...,Budgetary central government,Enhanced General Data Dissemination System (e-GDDS),2014,"Demographic and Health Survey, 2015/16","Integrated household survey (IHS), 2008/09",,,,2016.0,2005.0
3,ALB,Albania,Albania,Republic of Albania,AL,Albanian lek,,Europe & Central Asia,Upper middle income,AL,...,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2011,"Demographic and Health Survey, 2008/09","Living Standards Measurement Study Survey (LSMS), 2012",Yes,2012,2013.0,2016.0,2006.0
4,AND,Andorra,Andorra,Principality of Andorra,AD,Euro,WB-3 code changed from ADO to AND to align with ISO code.,Europe & Central Asia,High income,AD,...,,,2011. Population data compiled from administrative registers.,,,Yes,,,,
5,ARB,Arab World,Arab World,Arab World,1A,,Arab World aggregate. Arab World is composed of members of the League of Arab States.,,,1A,...,,,,,,,,,2016.0,
6,ARE,United Arab Emirates,United Arab Emirates,United Arab Emirates,AE,U.A.E. dirham,,Middle East & North Africa,High income,AE,...,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2010,"World Health Survey, 2003",,,2012,1985.0,2016.0,2005.0
7,ARG,Argentina,Argentina,Argentine Republic,AR,Argentine peso,"National Institute of Statistics and Census revised national accounts from 2004-2015. Argentina, which was temporarily unclassified in July 2016 pending release of revised national accounts statistics, is classified as upper middle income for FY17 as of September 29, 2016.",,,,...,,,,,,,,,,
8,ARM,Armenia,Armenia,Republic of Armenia,AM,Armenian dram,,Europe & Central Asia,Lower middle income,AM,...,Consolidated central government,Special Data Dissemination Standard (SDDS),2011,"Demographic and Health Survey, 2015/16","Integrated household survey (IHS), 2015",Yes,2014,,2016.0,2012.0
9,ASM,American Samoa,American Samoa,American Samoa,AS,U.S. dollar,New base Year 2009,East Asia & Pacific,Upper middle income,AS,...,,,2010,,,Yes,2008,,2016.0,


##### Analysing a few basic stats on the data we have read

In [7]:
wdicountry.count()

263

##### Examining Country Data Dimensions

##### How many different regions do the countries belong to?

In [8]:
showDF(wdicountry.select('Region').distinct(), truncate = False)

Unnamed: 0,Region
0,South Asia
1,
2,Sub-Saharan Africa
3,Europe & Central Asia
4,North America
5,East Asia & Pacific
6,Middle East & North Africa
7,Latin America & Caribbean


##### How many different income groups do we have across countries?

In [9]:
showDF(wdicountry.select('Income Group').distinct(), truncate = False)

Unnamed: 0,Income Group
0,Lower middle income
1,
2,High income
3,Upper middle income
4,Low income


##### Reading the series data (WDISeries.csv)

In [10]:
wdiseries = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/jovyan/work/data/WDISeries.csv")

In [11]:
wdiseries.printSchema()

root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- _c20: string (nullable = tru

In [12]:
showDF(wdiseries)

Unnamed: 0,Series Code,Topic,Indicator Name,Short definition,Long definition,Unit of measure,Periodicity,Base Period,Other notes,Aggregation method,...,Notes from original source,General comments,Source,Statistical concept and methodology,Development relevance,Related source links,Other web links,Related indicators,License Type,_c20
0,AG.AGR.TRAC.NO,Environment: Agricultural production,"Agricultural machinery, tractors",,Agricultural machinery refers to the number of...,,Annual,,,Sum,...,,,"Food and Agriculture Organization, electronic ...",A tractor provides the power and traction to m...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
1,AG.CON.FERT.PT.ZS,Environment: Agricultural production,Fertilizer consumption (% of fertilizer produc...,,Fertilizer consumption measures the quantity o...,,Annual,,,Weighted average,...,,,"Food and Agriculture Organization, electronic ...",Fertilizer consumption measures the quantity o...,"Factors such as the green revolution, has led ...",,,,CC BY-4.0,
2,AG.CON.FERT.ZS,Environment: Agricultural production,Fertilizer consumption (kilograms per hectare ...,,Fertilizer consumption measures the quantity o...,,Annual,,,Weighted average,...,,,"Food and Agriculture Organization, electronic ...",Fertilizer consumption measures the quantity o...,"Factors such as the green revolution, has led ...",,,,CC BY-4.0,
3,AG.LND.AGRI.K2,Environment: Land use,Agricultural land (sq. km),,Agricultural land refers to the share of land ...,,Annual,,,Sum,...,,,"Food and Agriculture Organization, electronic ...",Agricultural land constitutes only a part of a...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
4,AG.LND.AGRI.ZS,Environment: Land use,Agricultural land (% of land area),,Agricultural land refers to the share of land ...,,Annual,,,Weighted average,...,,,"Food and Agriculture Organization, electronic ...",Agriculture is still a major sector in many ec...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
5,AG.LND.ARBL.HA,Environment: Land use,Arable land (hectares),,Arable land (in hectares) includes land define...,,Annual,,,,...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
6,AG.LND.ARBL.HA.PC,Environment: Land use,Arable land (hectares per person),,Arable land (hectares per person) includes lan...,,Annual,,,Weighted Average,...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers about one-third of th...,,,,CC BY-4.0,
7,AG.LND.ARBL.ZS,Environment: Land use,Arable land (% of land area),,Arable land includes land defined by the FAO a...,,Annual,,,Weighted average,...,,,"Food and Agriculture Organization, electronic ...",Temporary fallow land refers to land left fall...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,
8,AG.LND.CREL.HA,Environment: Agricultural production,Land under cereal production (hectares),,Land under cereal production refers to harvest...,,Annual,,,Sum,...,,,"Food and Agriculture Organization, electronic ...","Cereals production includes wheat, rice, maize...",The cultivation of cereals varies widely in di...,,,,CC BY-4.0,
9,AG.LND.CROP.ZS,Environment: Land use,Permanent cropland (% of land area),,Permanent cropland is land cultivated with cro...,,Annual,,,Weighted average,...,,,"Food and Agriculture Organization, electronic ...",The data on Permanent cropland and land area a...,Agricultural land covers more than one-third o...,,,,CC BY-4.0,


In [13]:
wdiseries.count()

1593

##### Examining Series Data Dimensions

##### What are the different periodicities or aggregation methods we might expect to see in the data?

In [14]:
showDF(wdiseries.select('Periodicity').distinct(), truncate = False)

Unnamed: 0,Periodicity
0,Annual
1,
2,Quarterly (represented as Annual)
3,"International Civil Aviation Organization, Civil Aviation Statistics of the World and ICAO staff estimates."
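
The last value above reads like source text rather than a periodicity. One possible cause, which is an assumption and not verified against the file, is a quoted field containing a line break that was split into separate records during parsing. The sketch below uses Python's standard `csv` module on a hypothetical two-column extract to show how line-based splitting and quote-aware parsing differ:

```python
import csv
import io

# Hypothetical extract: the second record has a line break inside a quoted field
raw = 'Series Code,Source\nA.1,"Line one\nLine two"\nA.2,Plain\n'

# Naive line splitting treats the embedded line break as a record boundary
naive_rows = [line.split(",") for line in raw.splitlines()]

# The csv module honours the quotes and keeps the field together
parsed_rows = list(csv.reader(io.StringIO(raw)))

print(len(naive_rows))
print(len(parsed_rows))
print(parsed_rows[1])
```

Spark's CSV reader has `multiLine` and `escape` options for this situation; whether they resolve the stray value in this particular file has not been tested here.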


In [15]:
showDF(wdiseries.select('Aggregation Method').distinct(), truncate = False)

Unnamed: 0,Aggregation Method
0,
1,Weighted average
2,Simple average
3,Gap-filled total
4,Median
5,Unweighted average
6,Linear mixed-effect model estimates
7,Weighted Average
8,Sum


##### Reading the World Development Indicators data (WDIData.csv)

In [16]:
# Read the data
wdiindicators = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/jovyan/work/data/WDIData.csv")

In [17]:
# Inspect the schema
wdiindicators.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (null

In [18]:
# Look at sample records
showDF(wdiindicators)

Unnamed: 0,Country Name,Country Code,Indicator Name,Indicator Code,1960,1961,1962,1963,1964,1965,...,2012,2013,2014,2015,2016,2017,2018,2019,2020,_c65
0,Arab World,ARB,Access to clean fuels and technologies for coo...,EG.CFT.ACCS.ZS,,,,,,,...,83.120303,83.533457,83.897596,84.171599,84.510171,,,,,
1,Arab World,ARB,Access to electricity (% of population),EG.ELC.ACCS.ZS,,,,,,,...,87.51226,88.129881,87.275323,88.720097,89.308602,90.283638,89.286856,,,
2,Arab World,ARB,"Access to electricity, rural (% of rural popul...",EG.ELC.ACCS.RU.ZS,,,,,,,...,77.251714,78.165706,75.512153,78.211,79.065508,81.102134,79.2481,,,
3,Arab World,ARB,"Access to electricity, urban (% of urban popul...",EG.ELC.ACCS.UR.ZS,,,,,,,...,96.435957,96.772853,96.466705,96.936319,97.290083,97.467915,97.063959,,,
4,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.ZS,,,,,,,...,,,30.27713,,,37.165211,,,,
5,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.FE.ZS,,,,,,,...,,,22.07935,,,25.635403,,,,
6,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.MA.ZS,,,,,,,...,,,37.790764,,,48.328518,,,,
7,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.OL.ZS,,,,,,,...,,,34.216583,,,42.542046,,,,
8,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.40.ZS,,,,,,,...,,,22.77989,,,27.724781,,,,
9,Arab World,ARB,Account ownership at a financial institution o...,FX.OWN.TOTL.PL.ZS,,,,,,,...,,,21.278042,,,26.458111,,,,


In [19]:
# Get some basic stats
wdiindicators.count()

380160

In [1]:
from IPython.display import Image

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

##### Since the goal is data exploration, the data is modeled for query performance. Each table has a primary key, and only the columns needed for analysis are extracted. I based the conceptual data model on a star schema because it has a small number of tables and clear join paths, so analytical queries generally run faster than they would against a normalized OLTP schema. Small single-table queries, usually against dimension tables, return almost immediately, while large queries that join multiple tables take seconds or minutes.

![Conceptual data model](datamodel.jpg "Conceptual data model")
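
To make the star-schema idea concrete, here is a minimal sketch using an in-memory SQLite database: two dimension tables and one fact table joined on their keys. The table names (`dim_country`, `dim_indicator`, `fact_indicator`) are assumptions made for illustration and may differ from the diagram; the single sample row uses values that appear elsewhere in this notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_country (
    wb_country_code TEXT PRIMARY KEY,
    country_name    TEXT,
    region          TEXT
);
CREATE TABLE dim_indicator (
    indicator_code TEXT PRIMARY KEY,
    indicator_name TEXT
);
CREATE TABLE fact_indicator (
    wb_country_code TEXT REFERENCES dim_country (wb_country_code),
    indicator_code  TEXT REFERENCES dim_indicator (indicator_code),
    year            INTEGER,
    indicator_value REAL,
    PRIMARY KEY (wb_country_code, indicator_code, year)
);
""")

conn.execute("INSERT INTO dim_country VALUES ('ARB', 'Arab World', NULL)")
conn.execute("INSERT INTO dim_indicator VALUES ('SP.POP.TOTL', 'Population, total')")
conn.execute("INSERT INTO fact_indicator VALUES ('ARB', 'SP.POP.TOTL', 1960, 92197753.0)")

# Every dimension is one join away from the fact table
result = conn.execute("""
    SELECT c.country_name, i.indicator_name, f.year, f.indicator_value
    FROM fact_indicator AS f
    JOIN dim_country   AS c ON c.wb_country_code = f.wb_country_code
    JOIN dim_indicator AS i ON i.indicator_code  = f.indicator_code
""").fetchall()
print(result)
```

Each query touches the fact table and at most one join per dimension, which is the "clear join path" property the star schema is chosen for.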

#### 3.2 Mapping Out Data Pipelines
##### The data transformation logic below builds the data model described above

#### Data Transformation Logic

#### Transform Country Data
- Select the columns that we will need for our data model
- Rename columns for data ingestion

In [20]:
wdicountryDim = wdicountry \
    .select("2-alpha code", "Country Code", "Short Name", "Long Name", "Region", "Income Group") \
    .withColumnRenamed("2-alpha code", "country_iso_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .withColumnRenamed("Short Name", "country_name") \
    .withColumnRenamed("Long Name", "country_long_name") \
    .withColumnRenamed("Region", "region") \
    .withColumnRenamed("Income Group", "income_group")
    
showDF(wdicountryDim)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [30]:
#Lets you create a view that you can use in SQL queries
wdicountryDim.createOrReplaceTempView("wdicountryvw")

In [31]:
transformQuery = """
select 
    country_iso_code
    , wb_country_code
    , country_name as name
    , country_long_name as long_name
    , region
    , income_group
from 
    wdicountryvw
"""

showDF(spark.sql(transformQuery))

Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [26]:
transformQuery = """
select 
    count(*)
from 
    wdicountryvw
"""

showDF(spark.sql(transformQuery))

Unnamed: 0,count(1)
0,263


#### Transform the Series Dataset
- Filter only for series that have Annual periodicity
- Select the required columns and rename them to prepare for further processing

In [37]:
wdiseriesDim = wdiseries \
    .select("Series Code", "Indicator Name", "Short Definition", "Periodicity", "Aggregation Method") \
    .withColumnRenamed("Series Code", "indicator_code") \
    .withColumnRenamed("Indicator Name", "indicator_name") \
    .withColumnRenamed("Periodicity", "periodicity") \
    .withColumnRenamed("Aggregation Method", "aggregation_method") \
    .filter(col("periodicity") == "Annual") 
    
showDF(wdiseriesDim)

wdiseriesDim.count()

Unnamed: 0,indicator_code,indicator_name,Short Definition,periodicity,aggregation_method
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors",,Annual,Sum
1,AG.CON.FERT.PT.ZS,Fertilizer consumption (% of fertilizer produc...,,Annual,Weighted average
2,AG.CON.FERT.ZS,Fertilizer consumption (kilograms per hectare ...,,Annual,Weighted average
3,AG.LND.AGRI.K2,Agricultural land (sq. km),,Annual,Sum
4,AG.LND.AGRI.ZS,Agricultural land (% of land area),,Annual,Weighted average
5,AG.LND.ARBL.HA,Arable land (hectares),,Annual,
6,AG.LND.ARBL.HA.PC,Arable land (hectares per person),,Annual,Weighted Average
7,AG.LND.ARBL.ZS,Arable land (% of land area),,Annual,Weighted average
8,AG.LND.CREL.HA,Land under cereal production (hectares),,Annual,Sum
9,AG.LND.CROP.ZS,Permanent cropland (% of land area),,Annual,Weighted average


1587

##### Our dataset has multiple types of metrics. We only care about simple aggregates, i.e. indicators whose aggregation method is Sum.

In [38]:
simpleAggInd = wdiseriesDim \
    .filter("lower(aggregation_method) = 'sum'") \
    .select("indicator_code", "indicator_name") \
    .orderBy("indicator_code")

showDF(simpleAggInd, limitRows = 500, truncate = False)

Unnamed: 0,indicator_code,indicator_name
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors"
1,AG.LND.AGRI.K2,Agricultural land (sq. km)
2,AG.LND.CREL.HA,Land under cereal production (hectares)
3,AG.LND.EL5M.RU.K2,Rural land area where elevation is below 5 meters (sq. km)
4,AG.LND.EL5M.UR.K2,Urban land area where elevation is below 5 meters (sq. km)
5,AG.LND.FRST.K2,Forest area (sq. km)
6,AG.LND.TOTL.K2,Land area (sq. km)
7,AG.LND.TOTL.RU.K2,Rural land area (sq. km)
8,AG.LND.TOTL.UR.K2,Urban land area (sq. km)
9,AG.PRD.CREL.MT,Cereal production (metric tons)


##### Only keep the indicators that are relevant to the requirements, i.e. population indicators and cellular and broadband penetration

In [39]:
targetInd = simpleAggInd \
    .filter("lower(indicator_name) like '%population%total%' " + 
            " or lower(indicator_name) like '%cellular%' " +
            " or lower(indicator_name) like '%broadband%'") \
    .filter("lower(indicator_name) not like '%refugee%'")

showDF(targetInd)

Unnamed: 0,indicator_code,indicator_name
0,IT.CEL.SETS,Mobile cellular subscriptions
1,IT.NET.BBND,Fixed broadband subscriptions
2,SP.POP.0014.TO,"Population ages 0-14, total"
3,SP.POP.1564.TO,"Population ages 15-64, total"
4,SP.POP.65UP.TO,"Population ages 65 and above, total"
5,SP.POP.TOTL,"Population, total"


##### Now that we have identified the various indicators of interest, we can continue with getting the metrics for these indicators

In [40]:
# Keep the columns that are relevant for further transformations
indData = wdiindicators \
    .withColumnRenamed("Indicator Code", "indicator_code") \
    .withColumnRenamed("Country Code", "wb_country_code") \
    .drop("Indicator Name") \
    .drop("Country Name") \
    .drop("_c62")  # no-op in this extract: the trailing empty column shown in the outputs is named _c65

In [42]:
#Keep only the indicators that we care about
targetIndData = indData.join(targetInd, indData.indicator_code == targetInd.indicator_code).drop(targetInd.indicator_code)

In [43]:
showDF(targetIndData)

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,...,2013,2014,2015,2016,2017,2018,2019,2020,_c65,indicator_name
0,ARB,IT.NET.BBND,,,,,,,,,...,11423550.0,13345100.0,19216250.0,21308220.0,29518910.0,30683980.0,33227420.0,,,Fixed broadband subscriptions
1,ARB,IT.CEL.SETS,0.0,,,,,0.0,,,...,407704600.0,415029300.0,419014200.0,417212300.0,416484300.0,420743900.0,423704000.0,,,Mobile cellular subscriptions
2,ARB,SP.POP.0014.TO,39900284.0,41339703.0,42792877.0,44248946.0,45685285.0,47089854.0,48668380.0,50184681.0,...,126213700.0,128449800.0,130629500.0,133190600.0,135468700.0,137609200.0,139782900.0,,,"Population ages 0-14, total"
3,ARB,SP.POP.1564.TO,49063244.0,50032198.0,51072095.0,52200457.0,53449205.0,54836642.0,56150911.0,57648467.0,...,237528400.0,242991700.0,248365400.0,253159500.0,258053200.0,263047900.0,268157900.0,,,"Population ages 15-64, total"
4,ARB,SP.POP.65UP.TO,3234225.0,3352609.0,3469470.0,3584776.0,3698270.0,3809935.0,3939319.0,4066216.0,...,15963580.0,16466320.0,17033360.0,17674290.0,18377120.0,19133540.0,19929500.0,,,"Population ages 65 and above, total"
5,ARB,SP.POP.TOTL,92197753.0,94724510.0,97334442.0,100034179.0,102832760.0,105736431.0,108758610.0,111899364.0,...,379705700.0,387907700.0,396028300.0,404024400.0,411899000.0,419790600.0,427870300.0,,,"Population, total"
6,CSS,IT.NET.BBND,,,,,,,,,...,674419.0,753368.0,897134.0,971854.0,1003591.0,1083150.0,1135153.0,,,Fixed broadband subscriptions
7,CSS,IT.CEL.SETS,0.0,,,,,0.0,,,...,7738402.0,7952809.0,8158691.0,8483363.0,8225310.0,8108229.0,8374284.0,,,Mobile cellular subscriptions
8,CSS,SP.POP.0014.TO,1764314.0,1813810.0,1861239.0,1905670.0,1946434.0,1983382.0,2024504.0,2062386.0,...,1766079.0,1753162.0,1741354.0,1733030.0,1724191.0,1715605.0,1708076.0,,,"Population ages 0-14, total"
9,CSS,SP.POP.1564.TO,2150293.0,2172956.0,2197694.0,2224437.0,2252267.0,2280021.0,2301102.0,2321449.0,...,4676982.0,4725567.0,4771265.0,4810375.0,4847983.0,4882860.0,4913712.0,,,"Population ages 15-64, total"


##### Let us start by getting the list of years that we have metrics for

In [45]:
indSample = targetIndData \
    .select(col("wb_country_code")
            , col("indicator_code")
            , lit("1960").alias("year")
            , col("1960").alias("indicator_value")) \
    .filter("indicator_value >= 0.0")

showDF(indSample)

Unnamed: 0,wb_country_code,indicator_code,year,indicator_value
0,ARB,IT.CEL.SETS,1960,0.0
1,ARB,SP.POP.0014.TO,1960,39900284.0
2,ARB,SP.POP.1564.TO,1960,49063244.0
3,ARB,SP.POP.65UP.TO,1960,3234225.0
4,ARB,SP.POP.TOTL,1960,92197753.0
5,CSS,IT.CEL.SETS,1960,0.0
6,CSS,SP.POP.0014.TO,1960,1764314.0
7,CSS,SP.POP.1564.TO,1960,2150293.0
8,CSS,SP.POP.65UP.TO,1960,168897.0
9,CSS,SP.POP.TOTL,1960,4194713.0


In [46]:
yrList = [x for x in targetIndData.schema.names \
             if x != 'wb_country_code' and x != 'indicator_code' and x != 'indicator_name'] 

print(yrList)

['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '_c65']


In [48]:
#Cheat for creating a dataframe with no rows 
indDF = indSample.filter('1 = 0')

#Iterate through the list of years and store the rows in the DataFrame we created above
for indicatorYear in yrList:
    print("Processing indicators for " + indicatorYear)
    yrIndDF = targetIndData \
        .select(col("wb_country_code")
                , col("indicator_code")
                , lit(indicatorYear).alias("year")
                , col(indicatorYear).alias("indicator_value").cast(IntegerType())) \
        .filter("indicator_value >= 0")
    indDF = indDF.union(yrIndDF)

Processing indicators for 1960
Processing indicators for 1961
Processing indicators for 1962
Processing indicators for 1963
Processing indicators for 1964
Processing indicators for 1965
Processing indicators for 1966
Processing indicators for 1967
Processing indicators for 1968
Processing indicators for 1969
Processing indicators for 1970
Processing indicators for 1971
Processing indicators for 1972
Processing indicators for 1973
Processing indicators for 1974
Processing indicators for 1975
Processing indicators for 1976
Processing indicators for 1977
Processing indicators for 1978
Processing indicators for 1979
Processing indicators for 1980
Processing indicators for 1981
Processing indicators for 1982
Processing indicators for 1983
Processing indicators for 1984
Processing indicators for 1985
Processing indicators for 1986
Processing indicators for 1987
Processing indicators for 1988
Processing indicators for 1989
Processing indicators for 1990
Processing indicators for 1991
Processi
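
The loop above turns the wide layout (one column per year) into long rows of `(wb_country_code, indicator_code, year, indicator_value)` by building one DataFrame per year and unioning them. The plain-Python sketch below shows the same reshaping on a hypothetical two-row sample; the `unpivot` helper is illustrative only. It also skips column names that are not years, which would keep a stray column such as `_c65` out of the result.

```python
# Hypothetical wide rows shaped like targetIndData (None marks a missing value)
wide_rows = [
    {"wb_country_code": "ARB", "indicator_code": "SP.POP.TOTL",
     "1960": 92197753.0, "1961": 94724510.0, "_c65": None},
    {"wb_country_code": "ARB", "indicator_code": "IT.NET.BBND",
     "1960": None, "1961": None, "_c65": None},
]
key_columns = ("wb_country_code", "indicator_code")


def unpivot(rows, keys):
    """Turn one-column-per-year rows into one row per (keys, year)."""
    long_rows = []
    for row in rows:
        for column, value in row.items():
            # Skip key columns, non-year columns and missing values
            if column in keys or not column.isdigit() or value is None:
                continue
            long_row = {k: row[k] for k in keys}
            long_row["year"] = column
            long_row["indicator_value"] = value
            long_rows.append(long_row)
    return long_rows


long_rows = unpivot(wide_rows, key_columns)
for long_row in long_rows:
    print(long_row)
```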

In [49]:
indDF.printSchema()

root
 |-- wb_country_code: string (nullable = true)
 |-- indicator_code: string (nullable = true)
 |-- year: string (nullable = false)
 |-- indicator_value: double (nullable = true)



In [50]:
indDF.count()

75040

In [51]:
yrPivot = indDF.groupBy('year').pivot('indicator_code').sum('indicator_value')

In [52]:
showDF(yrPivot.orderBy('year'))

Unnamed: 0,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1960,0.0,,11663090000.0,17585730000.0,1409319000.0,29532070000.0
1,1961,,,11882330000.0,17746700000.0,1441242000.0,29844000000.0
2,1962,,,12173690000.0,17974040000.0,1473262000.0,30229310000.0
3,1963,,,12516640000.0,18283050000.0,1506747000.0,30687900000.0
4,1964,,,12833480000.0,18625090000.0,1538687000.0,31150210000.0
5,1965,0.0,,13119150000.0,19015740000.0,1570263000.0,31623210000.0
6,1966,,,13446890000.0,19380290000.0,1623280000.0,32105290000.0
7,1967,,,13724310000.0,19792680000.0,1675977000.0,32550400000.0
8,1968,,,13974660000.0,20246090000.0,1728541000.0,33001980000.0
9,1969,,,14229400000.0,20731970000.0,1781782000.0,33477410000.0
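
The `groupBy('year').pivot('indicator_code').sum('indicator_value')` call groups the long rows by year, creates one column per distinct indicator code, and sums the values that fall into each cell. A plain-Python sketch of that operation, using the 1960 values for ARB and CSS shown earlier; the `pivot_sum` helper is hypothetical:

```python
from collections import defaultdict

# (year, indicator_code, indicator_value) rows for two country codes
long_rows = [
    ("1960", "SP.POP.TOTL", 92197753.0),     # ARB
    ("1960", "SP.POP.TOTL", 4194713.0),      # CSS
    ("1960", "SP.POP.65UP.TO", 3234225.0),   # ARB
    ("1960", "SP.POP.65UP.TO", 168897.0),    # CSS
]


def pivot_sum(rows):
    """Group by year, spread indicator codes into columns, sum the values."""
    table = defaultdict(lambda: defaultdict(float))
    for year, indicator_code, value in rows:
        table[year][indicator_code] += value
    return {year: dict(columns) for year, columns in table.items()}


pivoted = pivot_sum(long_rows)
print(pivoted)
```

Because every row for a year is summed regardless of country code, the totals depend on which entities are present in the input.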


In [53]:
yrPivot.printSchema()

root
 |-- year: string (nullable = false)
 |-- IT.CEL.SETS: double (nullable = true)
 |-- IT.NET.BBND: double (nullable = true)
 |-- SP.POP.0014.TO: double (nullable = true)
 |-- SP.POP.1564.TO: double (nullable = true)
 |-- SP.POP.65UP.TO: double (nullable = true)
 |-- SP.POP.TOTL: double (nullable = true)



In [55]:
yrPivotDF = yrPivot.orderBy('year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population')

In [56]:
wdicountryDimFinal = wdicountryDim.filter("country_iso_code is not null")

showDF(wdicountryDimFinal)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [58]:
regionalInd = indDF.join(wdicountryDimFinal
                                       , indDF.wb_country_code == wdicountryDim.wb_country_code
                                       , "inner") \
    .select(wdicountryDim.region
            , indDF.wb_country_code
            , indDF.year
            , indDF.indicator_code
            , indDF.indicator_value)

In [59]:
showDF(regionalInd)

Unnamed: 0,region,wb_country_code,year,indicator_code,indicator_value
0,,ARB,1960,IT.CEL.SETS,0.0
1,,ARB,1960,SP.POP.0014.TO,39900284.0
2,,ARB,1960,SP.POP.1564.TO,49063244.0
3,,ARB,1960,SP.POP.65UP.TO,3234225.0
4,,ARB,1960,SP.POP.TOTL,92197753.0
5,,CSS,1960,IT.CEL.SETS,0.0
6,,CSS,1960,SP.POP.0014.TO,1764314.0
7,,CSS,1960,SP.POP.1564.TO,2150293.0
8,,CSS,1960,SP.POP.65UP.TO,168897.0
9,,CSS,1960,SP.POP.TOTL,4194713.0


In [60]:
regionalPivot = regionalInd.groupBy('region', 'year').pivot('indicator_code').sum('indicator_value')

In [61]:
showDF(regionalPivot.orderBy('region', 'year'), limitRows=100)

Unnamed: 0,region,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,,1960,0.0,,10549930000.0,15855410000.0,1260558000.0,26538430000.0
1,,1961,,,10748730000.0,15999950000.0,1289075000.0,26810070000.0
2,,1962,,,11013620000.0,16204970000.0,1317713000.0,27143150000.0
3,,1963,,,11325660000.0,16484080000.0,1347686000.0,27537400000.0
4,,1964,,,11614030000.0,16793280000.0,1376253000.0,27934980000.0
5,,1965,0.0,,11874040000.0,17146820000.0,1404493000.0,28341840000.0
6,,1966,,,12172440000.0,17476870000.0,1452135000.0,28754670000.0
7,,1967,,,12425090000.0,17850500000.0,1499495000.0,29130860000.0
8,,1968,,,12653270000.0,18261750000.0,1546760000.0,29512780000.0
9,,1969,,,12885330000.0,18701980000.0,1594580000.0,29914420000.0


##### Country metrics for each year

In [62]:
countryInd = indDF.join(wdicountryDimFinal
                                       , indDF.wb_country_code == wdicountryDim.wb_country_code
                                       , "inner") \
    .select(indDF.wb_country_code
            , wdicountryDim.country_iso_code
            , wdicountryDim.country_name
            , indDF.year
            , indDF.indicator_code
            , indDF.indicator_value)

showDF(countryInd)

Unnamed: 0,wb_country_code,country_iso_code,country_name,year,indicator_code,indicator_value
0,ARB,1A,Arab World,1960,IT.CEL.SETS,0.0
1,ARB,1A,Arab World,1960,SP.POP.0014.TO,39900284.0
2,ARB,1A,Arab World,1960,SP.POP.1564.TO,49063244.0
3,ARB,1A,Arab World,1960,SP.POP.65UP.TO,3234225.0
4,ARB,1A,Arab World,1960,SP.POP.TOTL,92197753.0
5,CSS,S3,Caribbean small states,1960,IT.CEL.SETS,0.0
6,CSS,S3,Caribbean small states,1960,SP.POP.0014.TO,1764314.0
7,CSS,S3,Caribbean small states,1960,SP.POP.1564.TO,2150293.0
8,CSS,S3,Caribbean small states,1960,SP.POP.65UP.TO,168897.0
9,CSS,S3,Caribbean small states,1960,SP.POP.TOTL,4194713.0


In [63]:
countryPivot = countryInd.groupBy('country_iso_code', 'country_name', 'year') \
    .pivot('indicator_code').sum('indicator_value')

In [64]:
showDF(countryPivot.orderBy('country_iso_code', 'country_name', 'year'), limitRows=100)

Unnamed: 0,country_iso_code,country_name,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1A,Arab World,1960,0.0,,39900280.0,49063240.0,3234225.0,92197750.0
1,1A,Arab World,1961,,,41339700.0,50032200.0,3352609.0,94724510.0
2,1A,Arab World,1962,,,42792880.0,51072100.0,3469470.0,97334440.0
3,1A,Arab World,1963,,,44248950.0,52200460.0,3584776.0,100034200.0
4,1A,Arab World,1964,,,45685280.0,53449200.0,3698270.0,102832800.0
5,1A,Arab World,1965,0.0,,47089850.0,54836640.0,3809935.0,105736400.0
6,1A,Arab World,1966,,,48668380.0,56150910.0,3939319.0,108758600.0
7,1A,Arab World,1967,,,50184680.0,57648470.0,4066216.0,111899400.0
8,1A,Arab World,1968,,,51657740.0,59286800.0,4191645.0,115136200.0
9,1A,Arab World,1969,,,53108290.0,61012250.0,4316656.0,118437200.0


In [65]:
recentInd = indData \
    .select("wb_country_code", "indicator_code", "2019") \
    .filter(col('indicator_code').isin('IC.REG.COST.PC.ZS', 'IC.REG.DURS', 'IC.REG.PROC', \
        'NY.GNP.ATLS.CD', 'NY.GDP.MKTP.KD', 'NY.GDP.PCAP.KD', 'IQ.CPA.BREG.XQ', 'IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2019", "indicator_value") \
    .withColumn("indicator_value", col("indicator_value").cast(DecimalType(38, 2)))

showDF(recentInd)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,ARB,IC.REG.COST.PC.ZS,27.8
1,ARB,IQ.CPA.BREG.XQ,2.42
2,ARB,IC.BUS.EASE.XQ,
3,ARB,NY.GDP.MKTP.KD,2750603986533.94
4,ARB,NY.GDP.PCAP.KD,6428.59
5,ARB,NY.GNP.ATLS.CD,2777310723208.36
6,ARB,IC.REG.PROC,6.64
7,ARB,IC.REG.DURS,19.65
8,CSS,IC.REG.COST.PC.ZS,18.08
9,CSS,IQ.CPA.BREG.XQ,3.2


In [66]:
businessIndexInd = indData \
    .select("wb_country_code", "indicator_code", "2020") \
    .filter(col('indicator_code').isin('IC.BUS.EASE.XQ')) \
    .withColumnRenamed("2020", "indicator_value") \
    .withColumn("indicator_value", col("indicator_value").cast(DecimalType(38, 2)))

In [68]:
allInd = recentInd.union(businessIndexInd)

In [69]:
countryBusinessStrtupPivot = allInd.join(wdicountryDimFinal
                                       , allInd.wb_country_code == wdicountryDimFinal.wb_country_code
                                       , "inner") \
    .select(wdicountryDimFinal.country_iso_code
            , wdicountryDimFinal.country_name
            , allInd.indicator_code
            , allInd.indicator_value) \
    .groupBy('country_iso_code', 'country_name').pivot('indicator_code').sum('indicator_value') \
    .withColumnRenamed('country_iso_code', 'Country ISO Code') \
    .withColumnRenamed('country_name', 'Country Name') \
    .withColumnRenamed('NY.GNP.ATLS.CD', 'GNI') \
    .withColumnRenamed('IC.REG.DURS', 'Startup Time') \
    .withColumnRenamed('IC.REG.PROC', 'Startup Procedures') \
    .withColumnRenamed('IC.REG.COST.PC.ZS', 'Startup Cost Pct of GNI') \
    .withColumnRenamed('NY.GDP.MKTP.KD', 'GDP') \
    .withColumnRenamed('NY.GDP.PCAP.KD', 'GDP Per Capita') \
    .withColumnRenamed('IQ.CPA.BREG.XQ', 'Business Regulation') \
    .withColumnRenamed('IC.BUS.EASE.XQ', 'Ease of business') \
    .withColumn('Startup Cost', (col('GNI') * col('Startup Cost Pct of GNI') / lit(100.0)).cast(DecimalType(38, 2))) \
    .filter(col('GNI') > 0) \
    .filter(col('Startup Time').isNotNull()) \
    .filter(col('Startup Procedures').isNotNull()) \
    .filter(col('Startup Cost').isNotNull())

In [70]:
showDF(countryBusinessStrtupPivot, limitRows = 500)

Unnamed: 0,Country ISO Code,Country Name,Ease of business,Startup Cost Pct of GNI,Startup Time,Startup Procedures,Business Regulation,GDP,GDP Per Capita,GNI,Startup Cost
0,BJ,Benin,149.0,3.5,8.5,6.0,3.5,14867185272.71,1259.81,14799214859.05,517972520.07
1,XC,Euro area,,3.21,9.82,5.05,,14169747200125.6,41359.73,13730527132659.5,440749920958.37
2,LY,Libya,186.0,24.6,35.0,10.0,,55047620625.16,8122.17,51755222833.29,12731784816.99
3,KZ,Kazakhstan,25.0,0.2,5.0,4.0,,213250078007.79,11518.36,163361260005.36,326722520.01
4,JM,Jamaica,71.0,4.2,3.0,2.0,,14349236596.0,4866.99,15695263912.86,659201084.34
5,NO,Norway,9.0,0.8,4.0,4.0,,494981582297.86,92556.32,441210211631.37,3529681693.05
6,AG,Antigua and Barbuda,113.0,8.0,19.0,9.0,,1525166899.84,15704.27,1611791996.79,128943359.74
7,CG,Congo,180.0,62.2,49.5,11.0,2.0,11654686638.11,2166.09,9248582343.48,5752618217.64
8,AR,Argentina,126.0,5.0,11.5,12.0,,437813398153.62,9742.46,500111583144.61,25005579157.23
9,HT,Haiti,179.0,179.7,97.0,12.0,2.0,14022671611.23,1245.01,15027198078.38,27003874946.85
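
The `Startup Cost` column is derived as `GNI * Startup Cost Pct of GNI / 100`. As a quick sanity check, the arithmetic can be reproduced outside Spark for rows shown in the output above. This is a minimal sketch in plain Python: the helper name `startup_cost` is hypothetical, and half-up rounding to two decimals is an assumption about how the cast to `DecimalType(38, 2)` behaves.

```python
from decimal import Decimal, ROUND_HALF_UP


def startup_cost(gni: Decimal, cost_pct: Decimal) -> Decimal:
    """Apply the notebook's formula: GNI * pct / 100, kept to two decimal places.

    Half-up rounding is an assumption made for this sketch.
    """
    raw = gni * cost_pct / Decimal("100")
    return raw.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


# GNI and Startup Cost Pct of GNI copied from the Benin and Norway rows above
benin = startup_cost(Decimal("14799214859.05"), Decimal("3.5"))
norway = startup_cost(Decimal("441210211631.37"), Decimal("0.8"))

print(benin)   # 517972520.07
print(norway)  # 3529681693.05
```

Both values agree with the `Startup Cost` column in the output above.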


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [71]:
# Here we are going to write the country dimension to an output csv file
wdicountryDimFinal \
    .coalesce(1) \
    .write.csv('/home/jovyan/work/data/CountryDim', mode='overwrite', header='true')

In [72]:
#Write the yearly totals to a CSV File
yrPivotDF \
    .select(col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('/home/jovyan/work/data/YearlyStats', mode='overwrite', header='true')

In [74]:
#Write the country-yearly totals to a CSV File
countryPivot.filter('country_iso_code is not null') \
    .orderBy('country_iso_code','country_name', 'year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select(col('country_iso_code')
            , col('country_name')
            , col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('/home/jovyan/work/data/CountryStats', mode='overwrite', header='true')

In [75]:
countryBusinessStrtupPivot \
    .select("Country ISO Code", "Country Name", "GDP", "GDP Per Capita", "GNI", \
            "Startup Cost", "Startup Cost Pct of GNI", "Startup Time", "Startup Procedures", \
            "Business Regulation", "Ease of business") \
    .coalesce(1) \
    .write.csv('/home/jovyan/work/data/BusinessStartupData', mode='overwrite', header='true')

In [95]:
#Write the regional-yearly totals to a CSV File
regionalPivot.filter('region is not null') \
    .orderBy('region','year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select(col('region')
            , col('year')
            , col('population').cast(DecimalType(38, 2))
            , col('population_age_0_to_14').cast(DecimalType(38, 2))
            , col('population_age_15_64').cast(DecimalType(38, 2))
            , col('population_age_65_and_above').cast(DecimalType(38, 2))
            , col('broadband_subscriptions').cast(DecimalType(38, 2))
            , col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('/home/jovyan/work/data/RegionalStats', mode='overwrite', header='true')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

##### Do all countries have 2-character country_iso_codes?

In [78]:
countryCodeLengthQuery = """
select 
    length(country_iso_code) as column_length
    , count(1) as cnt
from 
    wdicountry
group by 
    length(country_iso_code)
having 
    count(1) > 1
"""

showDF(spark.sql(countryCodeLengthQuery))

Unnamed: 0,column_length,cnt
0,2,262
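
Note that the query keeps only length groups with more than one row (`having count(1) > 1`), so a single malformed code would not show up in the result. A stricter variant reports every group whose length is not 2. The sketch below shows that logic in plain Python on a small list; the function names and the two bad sample values are made up for illustration and are not from the dataset.

```python
from collections import Counter


def code_length_counts(codes):
    """Count rows per code length; None stands in for a missing code."""
    return Counter(None if code is None else len(code) for code in codes)


def invalid_lengths(codes, expected=2):
    """Return only the length groups that differ from the expected length."""
    counts = code_length_counts(codes)
    return {length: cnt for length, cnt in counts.items() if length != expected}


# The first four codes appear in the notebook; "ABW" and None are made-up bad rows
sample = ["AW", "AF", "AO", "1A", "ABW", None]
print(invalid_lengths(sample))  # {3: 1, None: 1}
```

An empty result means every code has the expected length. The same idea could be expressed in the SQL above by dropping the `having` clause, or by filtering on `length(country_iso_code) <> 2`.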


##### Do we have duplicate records for any of the key columns?

In [79]:
targetcnt = wdicountryDim.count()
sourcecnt = wdicountry.count()

if targetcnt != sourcecnt :
    raise ValueError('Count is not matching with source')
else:
    print('Data quality check is passed')

Data quality check is passed


In [81]:
showDF(wdicountryDim.groupBy("country_iso_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(wdicountryDim.groupBy("wb_country_code").agg(count("*").alias("cnt")).filter("cnt > 1"))

showDF(wdicountryDim.groupBy("country_name").agg(count("*").alias("cnt")).filter("cnt > 1"))

Unnamed: 0,country_iso_code,cnt


Unnamed: 0,wb_country_code,cnt


Unnamed: 0,country_name,cnt


##### Does any country have a null country_iso_code?

In [82]:
wdicountryDimFinal = wdicountryDim.filter("country_iso_code is not null")

showDF(wdicountryDimFinal)

Unnamed: 0,country_iso_code,wb_country_code,country_name,country_long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,AF,AFG,Afghanistan,Islamic State of Afghanistan,South Asia,Low income
2,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
3,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
4,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
5,1A,ARB,Arab World,Arab World,,
6,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
7,AR,ARG,Argentina,Argentine Republic,,
8,AM,ARM,Armenia,Republic of Armenia,Europe & Central Asia,Lower middle income
9,AS,ASM,American Samoa,American Samoa,East Asia & Pacific,Upper middle income


In [83]:
wdicountryDimFinal.count()

262

In [84]:
# Cache the DataFrame once so that the checks below reuse the computed result
yrPivotDF.cache()

# count() is an action, so it forces the cache to be populated
yrPivotDF.count()

60

##### Do we have invalid records in any of the population or subscription columns?

In [85]:
yrPivotDF.filter('population_age_0_to_14 < 0').count()

0

In [86]:
yrPivotDF.filter('population_age_15_64 < 0').count()

0

In [88]:
yrPivotDF.filter('population_age_65_and_above < 0').count()

0

In [89]:
yrPivotDF.filter('population < 0').count()

0

In [90]:
yrPivotDF.filter('cellular_subscriptions < 0').count()

0

In [91]:
yrPivotDF.filter('broadband_subscriptions < 0').count()

0

In [92]:
yrPivotDF.filter('population_age_0_to_14 > population').count()

0

In [93]:
yrPivotDF.filter('population_age_15_64 > population').count()

0

In [94]:
yrPivotDF.filter('population_age_65_and_above > population').count()

0
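
The checks above all follow two rules: no count may be negative, and no age band may exceed the total population. The sketch below gathers them into one function so the rules are easy to read and extend. It is plain Python operating on a single row as a dict, not the notebook's Spark code; the function name `failed_checks` is hypothetical and `bad_row` is made up. The sample row uses the Arab World 1960 values shown earlier.

```python
AGE_BANDS = [
    "population_age_0_to_14",
    "population_age_15_64",
    "population_age_65_and_above",
]
COUNT_COLUMNS = AGE_BANDS + [
    "population",
    "cellular_subscriptions",
    "broadband_subscriptions",
]


def failed_checks(row):
    """Return the names of the checks that a single row fails."""
    failures = []
    # Rule 1: counts can be missing (None) but never negative
    for name in COUNT_COLUMNS:
        value = row.get(name)
        if value is not None and value < 0:
            failures.append(f"{name} < 0")
    # Rule 2: an age band can never be larger than the total population
    total = row.get("population")
    for name in AGE_BANDS:
        value = row.get(name)
        if value is not None and total is not None and value > total:
            failures.append(f"{name} > population")
    return failures


# Arab World, 1960, as shown in the countryInd output
arab_world_1960 = {
    "population_age_0_to_14": 39900284.0,
    "population_age_15_64": 49063244.0,
    "population_age_65_and_above": 3234225.0,
    "population": 92197753.0,
    "cellular_subscriptions": 0.0,
    "broadband_subscriptions": None,
}
bad_row = dict(arab_world_1960, population_age_15_64=-1)  # made-up failing row

print(failed_checks(arab_world_1960))  # []
print(failed_checks(bad_row))          # ['population_age_15_64 < 0']
```

In the notebook, the same predicates are the filter strings passed to `yrPivotDF.filter(...)`, with a count of 0 meaning the check passed.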

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### The data is organized in the following five tables.

##### CountryDim

#### This table contains details about each country, such as its ISO country code, World Bank country code, region and income group

##### columns:
- country_iso_code  : The ISO country code, an internationally recognized two-character code that identifies a country or territory.
- wb_country_code   : World Bank specific country code
- country_name      : Name of the country
- country_long_name : Long name of the country
- region            : Region in which the country falls
- income_group      : Income group into which the country falls

##### CountryStats

#### This table contains the total population, the population by age band, and the number of broadband and cellular subscriptions for each country and year

##### columns:
- country_iso_code           : The ISO country code, an internationally recognized two-character code that identifies a country or territory.
- country_name               : Name of the country
- year                       : Year to which the statistics apply
- population                 : Total population of the country
- population_age_0_to_14     : Population aged 0 to 14
- population_age_15_64       : Population aged 15 to 64
- population_age_65_and_above: Population aged 65 and above
- broadband_subscriptions    : Total number of broadband subscriptions in the country
- cellular_subscriptions     : Total number of cellular subscriptions in the country

##### RegionalStats

#### This table contains the same details as CountryStats, but at regional granularity instead of country granularity

##### columns:
- region                     : Name of the region into which the countries fall
- year                       : Year to which the statistics apply
- population                 : Total population of the region
- population_age_0_to_14     : Population aged 0 to 14
- population_age_15_64       : Population aged 15 to 64
- population_age_65_and_above: Population aged 65 and above
- broadband_subscriptions    : Total number of broadband subscriptions in the region
- cellular_subscriptions     : Total number of cellular subscriptions in the region

##### YearlyStats

#### This table contains the same details as CountryStats, but at yearly granularity instead of country granularity

##### columns:
- year                       : Year to which the statistics apply
- population                 : Total population for the year
- population_age_0_to_14     : Population aged 0 to 14
- population_age_15_64       : Population aged 15 to 64
- population_age_65_and_above: Population aged 65 and above
- broadband_subscriptions    : Total number of broadband subscriptions for the year
- cellular_subscriptions     : Total number of cellular subscriptions for the year

##### BusinessStartupData

#### This table contains business start-up statistics for each country along with GDP, GDP per capita and GNI. Values come from the 2019 column of the source data, except Ease of business, which comes from the 2020 column

##### columns:
- Country ISO Code         : The ISO country code, an internationally recognized two-character code that identifies a country or territory.
- Country Name             : Name of the country
- GDP                      : Gross Domestic Product (indicator NY.GDP.MKTP.KD). GDP is the final value of the goods and services produced within the geographic boundaries of a country during a specified period of time, normally a year.
- GDP Per Capita           : GDP per capita (indicator NY.GDP.PCAP.KD), a measure of a country's economic output that accounts for its number of people.
- GNI                      : Gross national income (indicator NY.GNP.ATLS.CD), the sum of a country's gross domestic product (GDP) plus net income (positive or negative) from abroad.
- Startup Cost             : Cost of starting a new business in the country, derived in this notebook as GNI * Startup Cost Pct of GNI / 100
- Startup Cost Pct of GNI  : Cost of business start-up procedures (indicator IC.REG.COST.PC.ZS), which the World Bank expresses as a percentage of GNI per capita
- Startup Time             : Time required to start a business, in days (indicator IC.REG.DURS)
- Startup Procedures       : Number of procedures required to start a business (indicator IC.REG.PROC)
- Business Regulation      : Rating of the business regulatory environment (indicator IQ.CPA.BREG.XQ)
- Ease of business         : Ease of doing business rank of the country (indicator IC.BUS.EASE.XQ)



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
 ##### Spark for ETL
 - Spark supports the entire data science workflow, from data access and integration to machine learning and visualization, in the language of choice, which is typically Python. It also provides a growing set of machine-learning algorithms through its machine-learning library (MLlib).
 ##### Pandas
 - Pandas is one of the most popular Python libraries for data analysis. It performs well because its back end is written in Python, Cython and C.
   - We can analyze data in pandas with:
     - Series
     - DataFrames
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 ##### The whole ETL pipeline can be built on an Amazon EMR cluster and configured to load the data into Amazon Redshift
 - Redshift: An analytical database optimized for aggregation, with good performance for read-heavy workloads
 - Increase the EMR cluster size to handle the larger volume of data
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 ##### The ETL pipeline can be set up as Airflow DAGs and triggered based on the SLA time
 - The DAG is scheduled to run every 10 minutes and, if required, can be configured to run once every morning, early enough to finish by 7 AM.
 - Data quality operators are used at the appropriate positions. For DAG failures, email alerts can be configured to notify the team.
 * The database needed to be accessed by 100+ people.
 - The concurrency limit can be set on the Amazon Redshift cluster. The limit is 50 parallel queries at any one time, but it applies per cluster, so as many clusters can be launched as the business needs.
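
For the daily 7 AM dashboard scenario, the pipeline has to start early enough to finish before the deadline. The sketch below derives a daily cron expression from a deadline and an expected runtime. It is plain Python with no Airflow dependency; the function name, the 45-minute runtime and the 15-minute buffer are all hypothetical values chosen for illustration.

```python
from datetime import datetime, timedelta


def daily_cron_before(deadline_hhmm: str, runtime_minutes: int, buffer_minutes: int = 15) -> str:
    """Return a daily cron expression that starts early enough to meet the deadline."""
    deadline = datetime.strptime(deadline_hhmm, "%H:%M")
    # Start time = deadline minus expected runtime minus a safety buffer
    start = deadline - timedelta(minutes=runtime_minutes + buffer_minutes)
    # Cron field order: minute hour day-of-month month day-of-week
    return f"{start.minute} {start.hour} * * *"


# Assumed runtime of 45 minutes plus the default 15-minute buffer
print(daily_cron_before("07:00", runtime_minutes=45))  # 0 6 * * *
```

A cron string of this form is the kind of value a scheduler such as Airflow accepts for a DAG's schedule.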