# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to create an ETL pipeline using The World Development Indicator data (The World Development Indicators from the World Bank contain over a thousand annual indicators of economic development from hundreds of countries around the world).. This database can be used to answer questions relating how the development in the certain country is measure and what are the various censis 


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [99]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
import configparser
from datetime import datetime
import os
import sql_queries as sql
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project using the dataset given contain the 6 dataset that has certain factor to measure the various thing so the data is mapped with each other by certern key columns which are present in the data

#### Describe and Gather Data 
 The World Development Indicators from the World Bank contain over a thousand annual indicators of economic development from hundreds of countries around the world. Below are the six dataset that were given
Country.csv


CountryNotes.csv


Footnotes.csv


Indicators.csv


Series.csv


SeriesNotes.csv


In [3]:
config = configparser.ConfigParser()
config.read('capstone.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession.builder\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [5]:
sqlContext = SQLContext(spark.sparkContext)

In [6]:
csv_country = "WorldInd/Country.csv"
csv_cnotes = "WorldInd/CountryNotes.csv"
csv_fnotes = "WorldInd/Footnotes.csv"
csv_indicator = "WorldInd/Indicators.csv"
csv_series = "WorldInd/Series.csv"
csv_snotes = "WorldInd/SeriesNotes.csv"

In [7]:
df_country = pd.read_csv(csv_country)
df_cnotes = pd.read_csv(csv_cnotes)
df_fnotes = pd.read_csv(csv_fnotes)
df_indicator = pd.read_csv(csv_indicator)
df_series = pd.read_csv(csv_series)
df_snotes = pd.read_csv(csv_snotes)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Converting the pandas dataframe to spark dataframe (doing this because the directly reading tfrom the spark was causing the wrong data to be in the wrong column)
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
checking data taking only required column and converting the data to proper datatype.
Drop the columns which are not require

In [8]:
from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)


# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)


In [9]:
df_country = pandas_to_spark(df_country)

In [10]:
df_cnotes = pandas_to_spark(df_cnotes)
df_fnotes = pandas_to_spark(df_fnotes.head(5000))
df_indicator = pandas_to_spark(df_indicator.head(5000))
df_series = pandas_to_spark(df_series)
df_snotes = pandas_to_spark(df_snotes)


In [11]:
# PK in country = (CountryCode)

In [12]:
df_country.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- TableName: string (nullable = true)
 |-- LongName: string (nullable = true)
 |-- Alpha2Code: string (nullable = true)
 |-- CurrencyUnit: string (nullable = true)
 |-- SpecialNotes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- Wb2Code: string (nullable = true)
 |-- NationalAccountsBaseYear: string (nullable = true)
 |-- NationalAccountsReferenceYear: string (nullable = true)
 |-- SnaPriceValuation: string (nullable = true)
 |-- LendingCategory: string (nullable = true)
 |-- OtherGroups: string (nullable = true)
 |-- SystemOfNationalAccounts: string (nullable = true)
 |-- AlternativeConversionFactor: string (nullable = true)
 |-- PppSurveyYear: string (nullable = true)
 |-- BalanceOfPaymentsManualInUse: string (nullable = true)
 |-- ExternalDebtReportingStatus: string (nullable = true)
 |-- SystemOfTrade: string (nullable = true)


In [13]:
df_country.count()

247

In [14]:
# PK in cnotes = (CountryCode, SeriesCode)

In [15]:
df_cnotes.printSchema()

root
 |-- Countrycode: string (nullable = true)
 |-- Seriescode: string (nullable = true)
 |-- Description: string (nullable = true)



In [16]:
df_cnotes.count()

4857

In [17]:
# PK for fnotes = (Countrycode,Year)

In [18]:
df_fnotes.printSchema()

root
 |-- Countrycode: string (nullable = true)
 |-- Seriescode: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Description: string (nullable = true)



In [19]:
df_fnotes.count()

5000

In [20]:
# PK in indicator = (CountryCode, IndicatorCode)

In [21]:
df_indicator.printSchema()

root
 |-- CountryName: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- IndicatorName: string (nullable = true)
 |-- IndicatorCode: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Value: float (nullable = true)



In [22]:
# Pk in Series = (Seriescode,Indicator)

In [23]:
df_series.printSchema()

root
 |-- SeriesCode: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- IndicatorName: string (nullable = true)
 |-- ShortDefinition: string (nullable = true)
 |-- LongDefinition: string (nullable = true)
 |-- UnitOfMeasure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- BasePeriod: string (nullable = true)
 |-- OtherNotes: float (nullable = true)
 |-- AggregationMethod: string (nullable = true)
 |-- LimitationsAndExceptions: string (nullable = true)
 |-- NotesFromOriginalSource: string (nullable = true)
 |-- GeneralComments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- StatisticalConceptAndMethodology: string (nullable = true)
 |-- DevelopmentRelevance: string (nullable = true)
 |-- RelatedSourceLinks: string (nullable = true)
 |-- OtherWebLinks: float (nullable = true)
 |-- RelatedIndicators: float (nullable = true)
 |-- LicenseType: string (nullable = true)



In [24]:
# Pk in SeriesNotes = (Seriescode,Year)

In [25]:
df_snotes.printSchema()

root
 |-- Seriescode: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Description: string (nullable = true)



In [26]:
df = df_country.toPandas()

In [27]:
df.head(1)

Unnamed: 0,CountryCode,ShortName,TableName,LongName,Alpha2Code,CurrencyUnit,SpecialNotes,Region,IncomeGroup,Wb2Code,...,GovernmentAccountingConcept,ImfDataDisseminationStandard,LatestPopulationCensus,LatestHouseholdSurvey,SourceOfMostRecentIncomeAndExpenditureData,VitalRegistrationComplete,LatestAgriculturalCensus,LatestIndustrialData,LatestTradeData,LatestWaterWithdrawalData
0,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,Fiscal year end: March 20; reporting period fo...,South Asia,Low income,AF,...,Consolidated central government,General Data Dissemination System (GDDS),1979,"Multiple Indicator Cluster Survey (MICS), 2010/11","Integrated household survey (IHS), 2008",,2013/14,,2013.0,2000.0


In [28]:
df.CountryCode.count()

247

#### Droping unrequired data

In [29]:
not_req_and_mostly_null = ['ShortDefinition','UnitOfMeasure','BasePeriod',
                           'OtherNotes','AggregationMethod','OtherNotes','AggregationMethod',
                           'LimitationsAndExceptions','NotesFromOriginalSource','GeneralComments',
                          'StatisticalConceptAndMethodology','DevelopmentRelevance','RelatedSourceLinks'
                          ,'OtherWebLinks','RelatedIndicators']

In [30]:
df_series = df_series.drop(*not_req_and_mostly_null)

In [31]:
df_series.printSchema()

root
 |-- SeriesCode: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- IndicatorName: string (nullable = true)
 |-- LongDefinition: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- LicenseType: string (nullable = true)



In [33]:
# clean country

In [34]:
not_req_country = ['SpecialNotes','NationalAccountsReferenceYear','LendingCategory',
                    'OtherGroups','AlternativeConversionFactor','PppSurveyYear',
                    'ExternalDebtReportingStatus','ExternalDebtReportingStatus',
                    'LatestHouseholdSurvey','SourceOfMostRecentIncomeAndExpenditureData',
                    'VitalRegistrationComplete','LatestAgriculturalCensus']

In [35]:
df_country = df_country.drop(*not_req_country)

In [36]:
df_country.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- TableName: string (nullable = true)
 |-- LongName: string (nullable = true)
 |-- Alpha2Code: string (nullable = true)
 |-- CurrencyUnit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- Wb2Code: string (nullable = true)
 |-- NationalAccountsBaseYear: string (nullable = true)
 |-- SnaPriceValuation: string (nullable = true)
 |-- SystemOfNationalAccounts: string (nullable = true)
 |-- BalanceOfPaymentsManualInUse: string (nullable = true)
 |-- SystemOfTrade: string (nullable = true)
 |-- GovernmentAccountingConcept: string (nullable = true)
 |-- ImfDataDisseminationStandard: string (nullable = true)
 |-- LatestPopulationCensus: string (nullable = true)
 |-- LatestIndustrialData: float (nullable = true)
 |-- LatestTradeData: float (nullable = true)
 |-- LatestWaterWithdrawalData: float (nullable = true)



#### Datatype assigning step

In [37]:
def cast_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k,v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

In [38]:
int_cols_country = ['NationalAccountsBaseYear','LatestPopulationCensus','LatestIndustrialData'
                   ,'LatestTradeData','LatestWaterWithdrawalData']

In [39]:
df_country = cast_type(df_country, dict(zip(int_cols_country, len(int_cols_country)*[IntegerType()])))

In [40]:
df_country.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- TableName: string (nullable = true)
 |-- LongName: string (nullable = true)
 |-- Alpha2Code: string (nullable = true)
 |-- CurrencyUnit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- Wb2Code: string (nullable = true)
 |-- NationalAccountsBaseYear: integer (nullable = true)
 |-- SnaPriceValuation: string (nullable = true)
 |-- SystemOfNationalAccounts: string (nullable = true)
 |-- BalanceOfPaymentsManualInUse: string (nullable = true)
 |-- SystemOfTrade: string (nullable = true)
 |-- GovernmentAccountingConcept: string (nullable = true)
 |-- ImfDataDisseminationStandard: string (nullable = true)
 |-- LatestPopulationCensus: integer (nullable = true)
 |-- LatestIndustrialData: integer (nullable = true)
 |-- LatestTradeData: integer (nullable = true)
 |-- LatestWaterWithdrawalData: integer (nullable = true)



In [41]:
df_country=df_country.withColumnRenamed("CountryName","MediumName")

### Check for null after removing the columns not required

In [42]:
def countNull(df):
    nulls_l=[]
    for col in df.columns:
        nulls_d={}
        n_null=df.select(col).filter(F.col(col).isNull()).count()
        if n_null > 0:
            nulls_d["Column"]=col
            nulls_d["Nulls"]=n_null
            nulls_l.append(nulls_d)
    if len(nulls_l)>0:
        display(pd.DataFrame(nulls_l))
    else: print("No Nulls")

In [43]:
countNull(df_country)

Unnamed: 0,Column,Nulls
0,NationalAccountsBaseYear,109
1,LatestPopulationCensus,39


In [44]:
countNull(df_cnotes)

No Nulls


In [45]:
countNull(df_fnotes)

No Nulls


In [46]:
countNull(df_indicator)

No Nulls


In [47]:
countNull(df_series)

No Nulls


In [48]:
countNull(df_snotes)

No Nulls


### checking the nulls in detail in  the country data

In [49]:
df_country.withColumn('numNulls', sum(df_country[col].isNull().cast('int') for col in df_country.columns))\
.orderBy(F.desc("numNulls")).groupBy("numNulls").count().toPandas()

Unnamed: 0,numNulls,count
0,2,37
1,1,74
2,0,136


In [50]:
df_country.withColumn('numNulls', sum(df_country[col].isNull().cast('int') for col in df_country.columns)).orderBy(F.desc("numNulls")).limit(10).toPandas()

Unnamed: 0,CountryCode,ShortName,TableName,LongName,Alpha2Code,CurrencyUnit,Region,IncomeGroup,Wb2Code,NationalAccountsBaseYear,...,SystemOfNationalAccounts,BalanceOfPaymentsManualInUse,SystemOfTrade,GovernmentAccountingConcept,ImfDataDisseminationStandard,LatestPopulationCensus,LatestIndustrialData,LatestTradeData,LatestWaterWithdrawalData,numNulls
0,EAS,East Asia & Pacific (all income levels),East Asia & Pacific (all income levels),East Asia & Pacific (all income levels),Z4,,,,Z4,,...,,,,,,,0,0,0,2
1,HIC,High income,High income,High income,XD,,,,XD,,...,,,,,,,0,0,0,2
2,EAP,East Asia & Pacific (developing only),East Asia & Pacific,East Asia & Pacific (developing only),4E,,,,4E,,...,,,,,,,0,0,0,2
3,CEB,Central Europe and the Baltics,Central Europe and the Baltics,Central Europe and the Baltics,B8,,,,B8,,...,,,,,,,0,0,0,2
4,EMU,Euro area,Euro area,Euro area,XC,,,,XC,,...,,,,,,,0,0,0,2
5,CSS,Caribbean small states,Caribbean small states,Caribbean small states,S3,,,,S3,,...,,,,,,,0,0,0,2
6,ECS,Europe & Central Asia (all income levels),Europe & Central Asia (all income levels),Europe & Central Asia (all income levels),Z7,,,,Z7,,...,,,,,,,0,0,0,2
7,EUU,European Union,European Union,European Union,EU,,,,EU,,...,,,,,,,0,0,0,2
8,FCS,Fragile and conflict affected situations,Fragile and conflict affected situations,Fragile situations,F1,,,,F1,,...,,,,,,,0,0,0,2
9,ARB,Arab World,Arab World,Arab World,1A,,,,1A,,...,,,,,,,0,0,0,2


In [51]:
df_country=df_country.filter(df_country.CurrencyUnit.isNotNull())

In [52]:
df_country=df_country.filter(df_country.CountryCode.isNotNull())

In [53]:
df_country.count()

247

In [54]:
df_country.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- TableName: string (nullable = true)
 |-- LongName: string (nullable = true)
 |-- Alpha2Code: string (nullable = true)
 |-- CurrencyUnit: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- IncomeGroup: string (nullable = true)
 |-- Wb2Code: string (nullable = true)
 |-- NationalAccountsBaseYear: integer (nullable = true)
 |-- SnaPriceValuation: string (nullable = true)
 |-- SystemOfNationalAccounts: string (nullable = true)
 |-- BalanceOfPaymentsManualInUse: string (nullable = true)
 |-- SystemOfTrade: string (nullable = true)
 |-- GovernmentAccountingConcept: string (nullable = true)
 |-- ImfDataDisseminationStandard: string (nullable = true)
 |-- LatestPopulationCensus: integer (nullable = true)
 |-- LatestIndustrialData: integer (nullable = true)
 |-- LatestTradeData: integer (nullable = true)
 |-- LatestWaterWithdrawalData: integer (nullable = true)



### Building the database
1. creating tables 
2. showing the example to insert it 

In [55]:
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS capstone_proj")
cur.execute("CREATE DATABASE capstone_proj WITH ENCODING 'utf8' TEMPLATE template0")
conn.close()
cur.close()

In [56]:
conn = psycopg2.connect("host=127.0.0.1 dbname=capstone_proj user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()

In [57]:
cur.execute(sql.country_create_table)
cur.execute(sql.getStruct,("country",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,country_code,character varying,NO
1,short_name,character varying,NO
2,medium_name,character varying,NO
3,long_name,character varying,YES
4,alpha2_code,character varying,YES
5,currency_unit,character varying,NO
6,region,character varying,YES
7,income_group,character varying,YES
8,wb2_code,character varying,NO
9,national_account_base,integer,YES


In [58]:
cur.execute(sql.cnotes_create_table)
cur.execute(sql.getStruct,("country_notes",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,country_code,character varying,NO
1,series_code,character varying,NO
2,description,character varying,YES


In [59]:
cur.execute(sql.fnotes_create_table)
cur.execute(sql.getStruct,("foot_notes",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,country_code,character varying,NO
1,series_code,character varying,NO
2,year,character varying,NO
3,description,character varying,YES


In [60]:
cur.execute(sql.indicator_create_table)
cur.execute(sql.getStruct,("indicator",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,country_name,character varying,NO
1,country_code,character varying,NO
2,indicator_name,character varying,NO
3,indicator_code,character varying,NO
4,year,integer,YES
5,value,double precision,YES


In [61]:
cur.execute(sql.series_create_table)
cur.execute(sql.getStruct,("series",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,series_code,character varying,NO
1,topic,character varying,YES
2,indicator_name,character varying,NO
3,long_definition,character varying,YES
4,periodicity,character varying,YES
5,source,character varying,YES
6,license_type,character varying,YES


In [62]:
cur.execute(sql.snotes_create_table)
cur.execute(sql.getStruct,("series_notes",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)

Unnamed: 0,column_name,data_type,is_nullable
0,series_code,character varying,NO
1,year,character varying,NO
2,description,character varying,YES


In [63]:
conn.close()
cur.close()

In [69]:
conn = psycopg2.connect("host=127.0.0.1 dbname=capstone_proj user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()

In [73]:
# for example inserting only series_note data to the sql
cols = ",".join([str(i) for i in df_snotes.columns])

In [78]:
for i,row in df_snotes.toPandas().iterrows():
    sql = "INSERT INTO series_notes (series_code,year,description) VALUES (" + "%s,"*(len(row)-1) + "%s)"
    cur.execute(sql, tuple(row))

In [85]:

try: 
    cur.execute("SELECT * FROM series_notes LIMIT 5;")
    
    
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()


('SP.ADO.TFRT', 'YR1960', 'Interpolated using data for 1957 and 1962.')
('SP.DYN.AMRT.FE', 'YR1960', 'Interpolated using data for 1957 and 1962, if the data source is United Nations World Population Prospects.')
('SP.DYN.AMRT.MA', 'YR1960', 'Interpolated using data for 1957 and 1962, if the data source is United Nations World Population Prospects.')
('SP.DYN.TO65.FE.ZS', 'YR1960', 'Interpolated using data for 1957 and 1962.')
('SP.DYN.TO65.MA.ZS', 'YR1960', 'Interpolated using data for 1957 and 1962.')


### Step 3: Showing above defined the Data Mode

##### Conceptual Data Model 

We have total of six tables
- *Country*_
       country_code VARCHAR NOT NULL PRIMARY KEY,
       short_name VARCHAR NOT NULL,
       medium_name VARCHAR NOT NULL,
       long_name VARCHAR,
       alpha2_code VARCHAR,
       currency_unit VARCHAR NOT NULL,
       region VARCHAR,
       income_group VARCHAR,
       wb2_code VARCHAR NOT NULL,
       national_account_Base INT,
       Sna_price_valuation VARCHAR,
       system_national_accounts VARCHAR,
       balance_pay_manual VARCHAR,
       system_trade VARCHAR,
       gov_account_concept VARCHAR,
       imf_data VARCHAR,
       latest_pop_census INT,
       latest_industrial_data INT,
       latest_trade_data INT,
       latest_water_with_Data INT

- *CountryNotes*_ 
       country_code VARCHAR NOT NULL,
       series_code VARCHAR NOT NULL,
       description VARCHAR,
       PRIMARY KEY (country_code,series_code)
       
Dimension table        
- *Footnotes*_
       country_code VARCHAR NOT NULL,
       series_code VARCHAR NOT NULL,
       year VARCHAR NOT NULL,
       description VARCHAR,
       PRIMARY KEY (country_code,year)

The fact table contain the indicators
- *Indicators*_
       country_name  VARCHAR NOT NULL,
       country_code VARCHAR NOT NULL,
       indicator_name VARCHAR NOT NULL,
       indicator_code VARCHAR NOT NULL,
       year INT,
       value FLOAT,
       PRIMARY KEY (indicator_code,country_code)
       


The fact table Series join the two table Footnotes and seriesnotes with series_code 
unique_id 
- *Series*_
       series_code VARCHAR NOT NULL PRIMARY KEY,
       topic VARCHAR,
       indicator_name VARCHAR NOT NULL
       long_definition VARCHAR,
       periodicity VARCHAR,
       source VARCHAR,
       license_type VARCHAR

Dimension table series_notes 
- *SeriesNotes*_
       series_code VARCHAR NOT NULL,
       year VARCHAR NOT NULL,
       description VARCHAR,
       PRIMARY KEY (year,series_code)
       


I didn't split them up coz these data are connected based on the unique and match the columns present in the data. There is a lot of information in each table but at most point they are about very different specific topics in each column. 

##### Mapping Out Data Pipelines

1. Cleaning already done above for all the tables

2. Create tables that are mention above

### Step 4: Run Pipelines to Model the Data¶


In [1]:
# here below data is parquet localy can be done to s3 bucket 
# change output_path to "s3a://bucket-name/folder"

In [89]:
output_path="result/"

In [90]:
df_country.write.mode("append").partitionBy("CountryCode").parquet(os.path.join(output_path,'country/country.parquet'),'overwrite')

In [91]:
df_cnotes.write.mode("append").partitionBy("CountryCode","SeriesCode").parquet(os.path.join(output_path,'country_notes/country_notes.parquet'),'overwrite')

In [94]:
df_fnotes.write.mode("append").partitionBy("CountryCode","Year").parquet(os.path.join(output_path,'foot_notes/foot_notes.parquet'),'overwrite')

In [None]:
df_indicator.write.mode("append").partitionBy("IndicatorCode","CountryCode").parquet(os.path.join(output_path,'indicator/indicator.parquet'),'overwrite')

In [None]:
df_series.write.mode("append").partitionBy("SeriesCode").parquet(os.path.join(output_path,'series/series.parquet'),'overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:

In [98]:
def quality_check(df, description):
    '''
    Arguments
    df: Spark dataframe, description of Spark datafram
    
    Return
    Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_country, "country table")
quality_check(df_cnotes, "country_notes table")
quality_check(df_fnotes, "foot_notes table")
quality_check(df_indicator, "indicator table")
quality_check(df_series, "series table")
quality_check(df_snotes, "series notes table")

Data quality check passed for country table with 247 records
Data quality check passed for country_notes table with 4857 records
Data quality check passed for foot_notes table with 5000 records
Data quality check passed for indicator table with 5000 records
Data quality check passed for series table with 1345 records
Data quality check passed for series notes table with 369 records


0

#### 4.3 Data dictionary 
Table is generated from the df-country dataframe
- *Country*_
       country_code= unique char(3) value,
       short_name = unique short name of country,
       medium_name = unique medium name of the country,
       long_name = unique long name of the country,
       alpha2_code = unique char(2) value,
       currency_unit = currency unit of the country,
       region = area of the region,
       income_group =  income group middle/low/high ,
       wb2_code = unique char(2) value,
       national_account_Base = coantin the year ,
       Sna_price_valuation = price valuation,
       system_national_accounts = history of account,
       balance_pay_manual = balance detail,
       system_trade = trade description,
       gov_account_concept = account concept of the gov,
       imf_data = imf data value,
       latest_pop_census = population census,
       latest_industrial_data = industry census,
       latest_trade_data = trade census,
       latest_water_with_Data = water census

- *CountryNotes*_ 
       country_code unique char(3) value,
       series_code = Serial code,
       description = description of the country,
       PRIMARY KEY (country_code,series_code)
       
Table is generated from the df-fnotes dataframe
- *Footnotes*_
       country_code =  unique char(3) value,
       series_code = Serial code,
       year = year,
       description = description of the country ,
       PRIMARY KEY (country_code,year)

Table is generated from the df-indicator dataframe
- *Indicators*_
       country_name = country name,
       country_code  = unique char(3) value,
       indicator_name = indicator name in the country,
       indicator_code = indicator code of the indicator,
       year = year,
       value = value of the indicator,
       PRIMARY KEY (indicator_code,country_code)
       


Table is generated from the df-series dataframe
 
- *Series*_
       series_code = Serial code,
       topic = topic of series,
       indicator_name indicator name in the country,
       long_definition = long def,
       periodicity = period of the series,
       source = source of the data,
       license_type = type of license in the country

Table is generated from the df-snotes dataframe
- *SeriesNotes*_
       series_code VARCHAR NOT NULL,
       year = year,
       description = description,
       PRIMARY KEY (year,series_code)

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
 
 
 
#### Techonlogies used
- <strong>Spark</strong> - fast data transformation of the large data in distributed enviroment
- <strong>python</strong> - for easy functionality of the python with spark using pyspark
- <strong>Postgres</strong> - building a relational database

#### Updating cycle
- country = popuation, industrial, and other resources data change every year/monthly
- CountryNotes = mostly remain constant
- footnotes = data change every year by unesco 
- indicator = update the various indicator present every year/monthly
- series = mostly remain constant 
- seriesnotes = constant

#### Scenarios
* <strong>The data was increased by 100x.</strong> 
  if data increase than the create more node using spark cluster
* <strong>The data populates a dashboard that must be updated on a daily basis by 7am every day.</strong>
  build a data pipeline that run on a specific schedule
* <strong>The database needed to be accessed by 100+ people.</strong>
  using external service of the handling the server so that the data is conserve and live at all the time and can reduce latency