# Project Title
### Data Engineering Capstone Project

#### Project Summary
This is my capstone project for the Udacity Data Engineering Nanodegree. Students could either complete the project with data provided by Udacity or define their own scope and data. In both cases the same outline had to be followed:
1) Gather data
2) Explore and Assess the data
3) Define the Data model
4) Run ETL to Model the Data
5) Write up the project

I decided to define my own scope and data. I chose to combine immigration data from the Netherlands with data from the yearly World Happiness Report and to build a data model that can answer questions such as:
- Is there a link between the happiness score and the number of immigrants with that country of birth?
- What are the demographics of people immigrating into the Netherlands? 
- Is there a trend in the immigration into the Netherlands? If so, in what: the volume of people, country of origin, gender or age?

The final data model is a star schema with the immigration data as the fact table and the happiness data as the dimension table. Because the immigration data is provided as coded entries, metadata tables were required to translate the codes into English. The country of birth in the immigration data is given with the Dutch names of the countries. To enable comparison with the Happiness Report data, a table translating the country names was required.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import json
import csv
import os
import glob

### Step 1: Scope the Project and Gather Data

#### Scope
In this project I will construct analytics tables that can be used by a data analyst to answer questions about immigration into the Netherlands. Examples are:
- Is there a link between the happiness score and the number of immigrants with that country of birth?
- What are the demographics of people immigrating into the Netherlands? 
- Is there a trend in the immigration into the Netherlands? If so, in what: the volume of people, country of origin, gender or age?

Rather than using the data provided in the Udacity project, I decided to use data from the Netherlands because I live there. It is also an opportunity to work with data from Dutch sources, which I might use again in a possible future career in data engineering/analytics.

The end solution is a star schema with two tables: a fact table with the immigration data and a dimension table with the World Happiness Report data. Although the result is very simple, some ETL is required because:
- The immigration data contained all the information in codes. The codes had to be transformed into English with the provided meta-data.
- The Dutch immigration data contained the country names in Dutch. They had to be converted into English to enable comparisons with the Happiness Report data. This was done through a translation table.
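
The two transformations listed above can be sketched with plain lookup dictionaries. This is a minimal illustration, not the ETL used later in the notebook: the gender codes and the two country codes come from the CBS metadata, the English gender labels are my own wording, and the names `GENDER`, `COUNTRY_NL`, `NL_TO_EN` and `decode` are hypothetical.

```python
# Lookup tables (assumed names). Codes and Dutch titles come from the CBS metadata.
GENDER = {"T001038": "Total male and female", "3000": "Male", "4000": "Female"}
COUNTRY_NL = {"G008790": "Zweden", "G008791": "Zwitserland"}
# Translation table from Dutch to English country names (illustrative subset).
NL_TO_EN = {"Zweden": "Sweden", "Zwitserland": "Switzerland"}


def decode(record):
    """Return a readable copy of a coded immigration record."""
    country_nl = COUNTRY_NL.get(record["Geboorteland"])
    return {
        "Gender": GENDER.get(record["Geslacht"]),
        "Country_birth": NL_TO_EN.get(country_nl),
        "Period": record["Perioden"],
        "ImmigrationVolume": record["Immigratie_1"],
    }


# Hypothetical record: the field names match the data, the values are made up.
example = {"Geslacht": "4000", "Geboorteland": "G008790",
           "Perioden": "2019JJ00", "Immigratie_1": 12}
decoded = decode(example)
print(decoded)
```

Codes that are missing from a lookup table come back as `None`, which makes gaps in the metadata or the translation table easy to spot afterwards.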

The tool I used was Spark, and the final tables will be written to a bucket in S3. Spark was selected because there are more than 1 million entries and Spark is suitable for processing big data. However, I did not manage to fully leverage Spark, because I could not read the json-files with it due to their nested nature. Advice is welcome. Writing the final tables to an S3 bucket makes them publicly accessible for querying.

#### Data gathering
Two data sets plus corresponding metadata were gathered:
1. The immigration data was gathered through the data portal "Dataportaal" of the Centraal Bureau voor de Statistiek (CBS), which is the Central Bureau of Statistics of the Netherlands. The webpage was https://opendata.cbs.nl/portal.html?_la=nl&_catalog=CBS&tableId=03742&_theme=66. The data was downloaded through "Onbewerkte dataset" (raw dataset) after selecting the years of interest. The file format selected was json. The data covers 2018 up to and including 2020, with each year in a separate json-file. The data included: ID number, gender, age on 31 December, marital status, country of birth and period/year.
2. Metadata of the immigration dataset was gathered from the same website. It was manually transformed into tables containing the keys needed to convert the codes used in the immigration data into English. As an example, the gender is T001038, 3000 or 4000, which translates into "total of both males and females", male or female, respectively.
3. World Happiness Report data for the years 2018 up to and including 2020 was gathered from Kaggle (https://www.kaggle.com/datasets), from the dataset at https://www.kaggle.com/unsdsn/world-happiness. Each year was in a separate csv-file. The data contained at least: country name, happiness score and parameters that can affect happiness, e.g. life expectancy.

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
spark

### Step 2: Explore and Assess the Data

#### Loading and Exploring the data

##### Immigration data

The data of each year is stored in a separate file. First, load one file to assess the structure of the file and the type of data in it.

In [4]:
# Path to the immigration data
file= 'Immigration_data/03742_TypedDataSet_2019.json'

In [5]:
# Import the json-file with immigration data
immigration_json= []
# Use a context manager so the file is closed after reading
with open(file, 'r') as f:
    for line in f:
        immigration_json.append(json.loads(line))

Assess the structure of the file.

In [6]:
#immigration_json

In [7]:
type(immigration_json)

list

In [8]:
len(immigration_json)

1

In [9]:
type(immigration_json[0])

dict

In [10]:
immigration_json[0].keys()

dict_keys(['value'])

In [11]:
immigration_dict= immigration_json[0].get('value')

In [12]:
imm_keys= immigration_dict.keys()

In [13]:
# Look at an example
immigration_dict.get('24')

{'ID': 24,
 'Geslacht': 'T001038',
 'LeeftijdOp31December': '10000',
 'BurgerlijkeStaat': 'T001019',
 'Geboorteland': 'T001175',
 'Perioden': '2019JJ00',
 'Immigratie_1': 269064}

In [14]:
immigration_dict.get('24').keys()

dict_keys(['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1'])
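
The structure found so far (a single top-level key 'value' holding one dictionary per ID) can be flattened into a list of records in a few lines. The sketch below is one possible way to do this, assuming every yearly file has this same shape; the function name `flatten_cbs_json` is hypothetical, and the two sample records are copied from the 2019 file as displayed in this notebook.

```python
import json


def flatten_cbs_json(text):
    """Turn {"value": {"<id>": {record}, ...}} into a list of flat records."""
    nested = json.loads(text)
    return list(nested["value"].values())


# Tiny stand-in for the content of one yearly file.
sample = json.dumps({"value": {
    "24": {"ID": 24, "Geslacht": "T001038", "LeeftijdOp31December": "10000",
           "BurgerlijkeStaat": "T001019", "Geboorteland": "T001175",
           "Perioden": "2019JJ00", "Immigratie_1": 269064},
    "2600024": {"ID": 2600024, "Geslacht": "T001038", "LeeftijdOp31December": "17500",
                "BurgerlijkeStaat": "T001019", "Geboorteland": "G008586",
                "Perioden": "2019JJ00", "Immigratie_1": 0},
}})

records = flatten_cbs_json(sample)
print(len(records), records[0]["Immigratie_1"])
```

A list of flat dictionaries like `records` is a shape that `pd.DataFrame(records)` accepts directly. It could likewise be handed to Spark, for example via `spark.createDataFrame`, instead of letting Spark parse the nested file itself.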

In [15]:
# Create an empty dataframe to store the immigration data in
df_immigration= pd.DataFrame(columns=['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1'])
df_immigration

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1


Test loading the data with a few entries.

In [16]:
# Append a small test slice of the immigration data to that dataframe
a= 100000  # start index of the test slice
# To load everything, loop over imm_keys instead of the slice
for key in list(imm_keys)[a: a+5]:
    print(key)
    df_immigration = df_immigration.append(immigration_dict.get(key), ignore_index=True)

2600024
2600050
2600076
2600102
2600128


In [17]:
df_immigration

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1
0,2600024,T001038,17500,T001019,G008586,2019JJ00,0
1,2600050,T001038,17500,T001019,G008587,2019JJ00,0
2,2600076,T001038,17500,T001019,G008588,2019JJ00,0
3,2600102,T001038,17500,T001019,G008589,2019JJ00,0
4,2600128,T001038,17500,T001019,G008590,2019JJ00,0


In [18]:
df_immigration.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 7 columns):
ID                      5 non-null object
Geslacht                5 non-null object
LeeftijdOp31December    5 non-null object
BurgerlijkeStaat        5 non-null object
Geboorteland            5 non-null object
Perioden                5 non-null object
Immigratie_1            5 non-null object
dtypes: object(7)
memory usage: 360.0+ bytes


In [19]:
# Translate the column names into English
df_immigration.rename(columns= {'Geslacht' : 'Gender', 'LeeftijdOp31December': 'Age', 'BurgerlijkeStaat': 'Marital_status', 'Geboorteland': 'Country_birth', 
                                'Perioden': 'Period', 'Immigratie_1' : 'ImmigrationVolume'}, inplace= True)

In [20]:
df_immigration.columns

Index(['ID', 'Gender', 'Age', 'Marital_status', 'Country_birth', 'Period',
       'ImmigrationVolume'],
      dtype='object')

In [21]:
df_immigration['ImmigrationVolume'].unique()

array([0], dtype=object)

In [22]:
df_immigration.isna().sum()

ID                   0
Gender               0
Age                  0
Marital_status       0
Country_birth        0
Period               0
ImmigrationVolume    0
dtype: int64

Observations:
- There are rows with zero people immigrating.
- There are rows with None in number of people immigrating.

Load the data again, keeping only the entries with more than 0 people.

In [23]:
# Create an empty dataframe to store the immigration data in
df_immigration= pd.DataFrame(columns=['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1'])
df_immigration

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1


In [24]:
# Append only the entries of the test slice that pass the filters
a= 100000  # start index of the test slice
# To load everything, loop over imm_keys instead of the slice
for key in list(imm_keys)[a: a+5]:
    print(key)
    entry = immigration_dict.get(key)
    volume = entry.get('Immigratie_1')
    # Only append if the immigration volume (Immigratie_1) is present and greater than zero,
    # and the age code is below 19906
    if volume is not None and volume > 0 and int(entry.get('LeeftijdOp31December')) < 19906:
        df_immigration = df_immigration.append(entry, ignore_index=True)

2600024
2600050
2600076
2600102
2600128


In [25]:
df_immigration

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1
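
The filter above can also be written as a small predicate, so that the rows are first collected in a list and the dataframe is built only once. This is a sketch of an alternative, not the code used in this notebook: the name `keep_entry` is hypothetical, the threshold 19906 is taken from the cell above, and the last two sample entries are made up to exercise each condition.

```python
def keep_entry(entry, max_age_code=19906):
    """Apply the three conditions: volume present, volume positive, age code below the threshold."""
    volume = entry.get("Immigratie_1")
    if volume is None or volume <= 0:
        return False
    return int(entry.get("LeeftijdOp31December")) < max_age_code


entries = [
    {"ID": 24, "LeeftijdOp31December": "10000", "Immigratie_1": 269064},  # from the 2019 file
    {"ID": 2600024, "LeeftijdOp31December": "17500", "Immigratie_1": 0},  # from the 2019 file
    {"ID": 1, "LeeftijdOp31December": "10100", "Immigratie_1": None},     # hypothetical
    {"ID": 2, "LeeftijdOp31December": "22000", "Immigratie_1": 5},        # hypothetical age code
]

rows = [e for e in entries if keep_entry(e)]
kept_ids = [e["ID"] for e in rows]
print(kept_ids)
```

Building the dataframe once with `pd.DataFrame(rows)` avoids calling `DataFrame.append` for every row, which copies the frame each time and is no longer available in pandas 2.0 and later.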


##### Immigration metadata

The immigration metadata is all contained in a single csv-file. Different blocks of rows describe different attributes: age, gender, marital status, country of birth and the period over which the immigration took place. Each attribute will be stored in a separate table.

In [26]:
# Path to metadata
metadata_path= 'input_other/03742_metadata.csv'

In [27]:
df_meta_gender= pd.read_csv(metadata_path, sep=';', skiprows= 17, nrows= 3).iloc[:, 0:2]
df_meta_gender

Unnamed: 0,Key,Title
0,T001038,Totaal mannen en vrouwen
1,3000,Mannen
2,4000,Vrouwen


In [28]:
df_meta_gender.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 2 columns):
Key      3 non-null object
Title    3 non-null object
dtypes: object(2)
memory usage: 128.0+ bytes


In [29]:
df_meta_age= pd.read_csv(metadata_path, sep=';', skiprows= 22, nrows= 128).iloc[:,0:2]
df_meta_age

Unnamed: 0,Key,Title
0,10000,Totaal
1,10010,0 jaar
2,10100,1 jaar
3,10200,2 jaar
4,10300,3 jaar
5,10400,4 jaar
6,10500,5 jaar
7,10600,6 jaar
8,10700,7 jaar
9,10800,8 jaar


In [30]:
df_meta_age.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 128 entries, 0 to 127
Data columns (total 2 columns):
Key      128 non-null int64
Title    128 non-null object
dtypes: int64(1), object(1)
memory usage: 2.1+ KB


Observations:
- There are categories per year but also aggregates over 5 years and the total

In [31]:
df_meta_marital= pd.read_csv(metadata_path, sep=';', skiprows= 152, nrows= 5)
df_meta_marital

Unnamed: 0,Key,Title,Description
0,T001019,Totaal burgerlijke staat,
1,1010,Ongehuwd,Burgerlijke staat die aangeeft dat een persoon...
2,1020,Gehuwd,Wettig gehuwd plus partnerschap.\n\nTrendbreuk...
3,1050,Verweduwd,Verweduwd na wettig huwelijk plus verweduwd na...
4,1080,Gescheiden,Gescheiden na wettig huwelijk plus gescheiden ...


In [32]:
df_meta_marital.iloc[1,0]

'1010   '

In [33]:
df_meta_marital.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 3 columns):
Key            5 non-null object
Title          5 non-null object
Description    4 non-null object
dtypes: object(3)
memory usage: 200.0+ bytes


In [34]:
df_meta_birthcountry= pd.read_csv(metadata_path, sep=';', skiprows= 159, nrows= 263)
df_meta_birthcountry.head()

Unnamed: 0,Key,Title,Description
0,T001175,Totaal,
1,G000001,Westerse geboortelanden (incl Nederland),Westerse geboortelanden (inclusief Nederland) ...
2,G000002,Niet-westerse geboortelanden,"Landen in Afrika, Latijns-Amerika en Azië (exc..."
3,G000003,Geboorteland onbekend,
4,G008519,Afrika,


In [35]:
df_meta_birthcountry.tail()

Unnamed: 0,Key,Title,Description
258,G008787,"Zuid-Afrika, land",
259,G008789,Zuid-Korea,"Zuid-Korea, Korea."
260,G008786,Zuid-Soedan,Nieuw land ontstaan na afsplitsing van Soedan ...
261,G008790,Zweden,
262,G008791,Zwitserland,


In [36]:
countries_Dutch= np.sort(df_meta_birthcountry['Title'].unique())
countries_Dutch

array(['(voormalig) Joegoslavië', '(voormalig) Tsjecho-Slowakije',
       '(voormalige) Nederlandse Antillen, Aruba',
       '(voormalige) Sovjet-Unie', 'Afghanistan', 'Afrika', 'Albanië',
       'Algerije', 'Amerika', 'Amerikaans Samoa',
       'Amerikaanse Maagdeneilanden', 'Andorra', 'Angola', 'Anguilla',
       'Antarctica', 'Antigua en Barbuda', 'Argentinië', 'Armenië',
       'Aruba', 'Australië', 'Azerbeidzjan', 'Azië', 'Bahamas', 'Bahrein',
       'Bangladesh', 'Barbados', 'België', 'Belize', 'Benin', 'Bermuda',
       'Bhutan', 'Bolivia', 'Bosnië-Herzegovina', 'Botswana', 'Brazilië',
       'Brits Territorium in de Indische Oceaan', 'Britse Maagdeneilanden',
       'Brunei', 'Bulgarije', 'Burkina Faso', 'Burundi', 'Cambodja',
       'Canada', 'Caribisch Nederland', 'Caymaneilanden',
       'Centraal-Afrikaanse Republiek', 'Chili', 'China', 'Colombia',
       'Comoren', 'Congo', 'Congo (Democratische Republiek)',
       'Cookeilanden', 'Costa Rica', 'Cuba', 'Curaçao', 'Cyprus',

In [37]:
countries_Dutch.shape[0]

263

In [38]:
df_meta_period= pd.read_csv(metadata_path, sep=';', skiprows= 424, nrows= 26)
df_meta_period

Unnamed: 0,Key,Title,Description,Status
0,1995JJ00,1995,,Definitief
1,1996JJ00,1996,,Definitief
2,1997JJ00,1997,,Definitief
3,1998JJ00,1998,,Definitief
4,1999JJ00,1999,,Definitief
5,2000JJ00,2000,,Definitief
6,2001JJ00,2001,,Definitief
7,2002JJ00,2002,,Definitief
8,2003JJ00,2003,,Definitief
9,2004JJ00,2004,,Definitief


In [39]:
df_meta_period.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 26 entries, 0 to 25
Data columns (total 4 columns):
Key            26 non-null object
Title          26 non-null int64
Description    0 non-null float64
Status         26 non-null object
dtypes: float64(1), int64(1), object(2)
memory usage: 912.0+ bytes
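
The period keys in the table above pair a code such as 1995JJ00 with the year 1995, so the year appears to be recoverable from the key itself. A minimal sketch, assuming that every yearly key follows the pattern of four digits followed by JJ00 as the rows shown do; the names `PERIOD_PATTERN` and `period_to_year` are hypothetical.

```python
import re

# Yearly period keys as seen in the metadata: four digits followed by 'JJ00'.
PERIOD_PATTERN = re.compile(r"^(\d{4})JJ00$")


def period_to_year(code):
    """Return the year for a yearly period key such as '2019JJ00', else None."""
    match = PERIOD_PATTERN.match(code.strip())
    return int(match.group(1)) if match else None


print(period_to_year("2019JJ00"))
print(period_to_year("1995JJ00"))
print(period_to_year("2019KW01"))  # hypothetical code that is not a yearly key
```

Returning `None` for anything that does not match keeps unexpected period codes visible instead of silently turning them into a wrong year.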


##### World Happiness Data

##### Inspect a single file to define cleaning

In [40]:
happiness2020= pd.read_csv('WorldHappinessReport/2020.csv')

In [41]:
happiness2020.sort_values('Ladder score', ascending= False).head()

Unnamed: 0,Country name,Regional indicator,Ladder score,Standard error of ladder score,upperwhisker,lowerwhisker,Logged GDP per capita,Social support,Healthy life expectancy,Freedom to make life choices,Generosity,Perceptions of corruption,Ladder score in Dystopia,Explained by: Log GDP per capita,Explained by: Social support,Explained by: Healthy life expectancy,Explained by: Freedom to make life choices,Explained by: Generosity,Explained by: Perceptions of corruption,Dystopia + residual
0,Finland,Western Europe,7.8087,0.031156,7.869766,7.747634,10.639267,0.95433,71.900825,0.949172,-0.059482,0.195445,1.972317,1.28519,1.499526,0.961271,0.662317,0.15967,0.477857,2.762835
1,Denmark,Western Europe,7.6456,0.033492,7.711245,7.579955,10.774001,0.955991,72.402504,0.951444,0.066202,0.168489,1.972317,1.326949,1.503449,0.979333,0.66504,0.242793,0.49526,2.432741
2,Switzerland,Western Europe,7.5599,0.035014,7.628528,7.491272,10.979933,0.942847,74.102448,0.921337,0.105911,0.303728,1.972317,1.390774,1.472403,1.040533,0.628954,0.269056,0.407946,2.350267
3,Iceland,Western Europe,7.5045,0.059616,7.621347,7.387653,10.772559,0.97467,73.0,0.948892,0.246944,0.71171,1.972317,1.326502,1.547567,1.000843,0.661981,0.36233,0.144541,2.460688
4,Norway,Western Europe,7.488,0.034837,7.556281,7.419719,11.087804,0.952487,73.200783,0.95575,0.134533,0.263218,1.972317,1.424207,1.495173,1.008072,0.670201,0.287985,0.434101,2.168266


In [42]:
happiness2020 = happiness2020.sort_values('Ladder score', ascending= False)
happiness2020 = happiness2020.reset_index()
happiness2020 = happiness2020.rename(columns={"index":"Overall rank"})
happiness2020.head()

Unnamed: 0,Overall rank,Country name,Regional indicator,Ladder score,Standard error of ladder score,upperwhisker,lowerwhisker,Logged GDP per capita,Social support,Healthy life expectancy,...,Generosity,Perceptions of corruption,Ladder score in Dystopia,Explained by: Log GDP per capita,Explained by: Social support,Explained by: Healthy life expectancy,Explained by: Freedom to make life choices,Explained by: Generosity,Explained by: Perceptions of corruption,Dystopia + residual
0,0,Finland,Western Europe,7.8087,0.031156,7.869766,7.747634,10.639267,0.95433,71.900825,...,-0.059482,0.195445,1.972317,1.28519,1.499526,0.961271,0.662317,0.15967,0.477857,2.762835
1,1,Denmark,Western Europe,7.6456,0.033492,7.711245,7.579955,10.774001,0.955991,72.402504,...,0.066202,0.168489,1.972317,1.326949,1.503449,0.979333,0.66504,0.242793,0.49526,2.432741
2,2,Switzerland,Western Europe,7.5599,0.035014,7.628528,7.491272,10.979933,0.942847,74.102448,...,0.105911,0.303728,1.972317,1.390774,1.472403,1.040533,0.628954,0.269056,0.407946,2.350267
3,3,Iceland,Western Europe,7.5045,0.059616,7.621347,7.387653,10.772559,0.97467,73.0,...,0.246944,0.71171,1.972317,1.326502,1.547567,1.000843,0.661981,0.36233,0.144541,2.460688
4,4,Norway,Western Europe,7.488,0.034837,7.556281,7.419719,11.087804,0.952487,73.200783,...,0.134533,0.263218,1.972317,1.424207,1.495173,1.008072,0.670201,0.287985,0.434101,2.168266


In [43]:
happiness2020['Explained by: Generosity'].describe()

count    153.000000
mean       0.189375
std        0.100401
min        0.000000
25%        0.115006
50%        0.176745
75%        0.255510
max        0.569814
Name: Explained by: Generosity, dtype: float64

In [44]:
happiness2019= pd.read_csv('WorldHappinessReport/2019.csv')
happiness2019['Generosity'].describe()

count    156.000000
mean       0.184846
std        0.095254
min        0.000000
25%        0.108750
50%        0.177500
75%        0.248250
max        0.566000
Name: Generosity, dtype: float64

In [45]:
happiness2018= pd.read_csv('WorldHappinessReport/2018.csv')
happiness2018['Healthy life expectancy'].describe()

count    156.000000
mean       0.597346
std        0.247579
min        0.000000
25%        0.422250
50%        0.644000
75%        0.777250
max        1.030000
Name: Healthy life expectancy, dtype: float64

In [46]:
happiness2020.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 153 entries, 0 to 152
Data columns (total 21 columns):
Overall rank                                  153 non-null int64
Country name                                  153 non-null object
Regional indicator                            153 non-null object
Ladder score                                  153 non-null float64
Standard error of ladder score                153 non-null float64
upperwhisker                                  153 non-null float64
lowerwhisker                                  153 non-null float64
Logged GDP per capita                         153 non-null float64
Social support                                153 non-null float64
Healthy life expectancy                       153 non-null float64
Freedom to make life choices                  153 non-null float64
Generosity                                    153 non-null float64
Perceptions of corruption                     153 non-null float64
Ladder score in Dystopia    

Observations:
- The data types are sensible.
- There are 153 entries. Check whether there are any duplicates by counting the number of unique country names.

In [47]:
happiness2020['Country name'].unique().shape[0]

153

In [48]:
happiness2020['Country name'].unique()

array(['Finland', 'Denmark', 'Switzerland', 'Iceland', 'Norway',
       'Netherlands', 'Sweden', 'New Zealand', 'Austria', 'Luxembourg',
       'Canada', 'Australia', 'United Kingdom', 'Israel', 'Costa Rica',
       'Ireland', 'Germany', 'United States', 'Czech Republic', 'Belgium',
       'United Arab Emirates', 'Malta', 'France', 'Mexico',
       'Taiwan Province of China', 'Uruguay', 'Saudi Arabia', 'Spain',
       'Guatemala', 'Italy', 'Singapore', 'Brazil', 'Slovenia',
       'El Salvador', 'Kosovo', 'Panama', 'Slovakia', 'Uzbekistan',
       'Chile', 'Bahrain', 'Lithuania', 'Trinidad and Tobago', 'Poland',
       'Colombia', 'Cyprus', 'Nicaragua', 'Romania', 'Kuwait', 'Mauritius',
       'Kazakhstan', 'Estonia', 'Philippines', 'Hungary', 'Thailand',
       'Argentina', 'Honduras', 'Latvia', 'Ecuador', 'Portugal', 'Jamaica',
       'South Korea', 'Japan', 'Peru', 'Serbia', 'Bolivia', 'Pakistan',
       'Paraguay', 'Dominican Republic', 'Bosnia and Herzegovina',
       'Moldova', '

In [49]:
happiness2019= pd.read_csv('WorldHappinessReport/2019.csv')

In [50]:
happiness2019['Country or region'].unique()

array(['Finland', 'Denmark', 'Norway', 'Iceland', 'Netherlands',
       'Switzerland', 'Sweden', 'New Zealand', 'Canada', 'Austria',
       'Australia', 'Costa Rica', 'Israel', 'Luxembourg', 'United Kingdom',
       'Ireland', 'Germany', 'Belgium', 'United States', 'Czech Republic',
       'United Arab Emirates', 'Malta', 'Mexico', 'France', 'Taiwan',
       'Chile', 'Guatemala', 'Saudi Arabia', 'Qatar', 'Spain', 'Panama',
       'Brazil', 'Uruguay', 'Singapore', 'El Salvador', 'Italy', 'Bahrain',
       'Slovakia', 'Trinidad & Tobago', 'Poland', 'Uzbekistan',
       'Lithuania', 'Colombia', 'Slovenia', 'Nicaragua', 'Kosovo',
       'Argentina', 'Romania', 'Cyprus', 'Ecuador', 'Kuwait', 'Thailand',
       'Latvia', 'South Korea', 'Estonia', 'Jamaica', 'Mauritius', 'Japan',
       'Honduras', 'Kazakhstan', 'Bolivia', 'Hungary', 'Paraguay',
       'Northern Cyprus', 'Peru', 'Portugal', 'Pakistan', 'Russia',
       'Philippines', 'Serbia', 'Moldova', 'Libya', 'Montenegro',
       'Tajikis

##### Creating list of filepaths to process original happiness report csv data files

In [51]:
# Check the current working directory
print(os.getcwd())

# Folder with the World Happiness Report csv-files
happiness_filepath = os.getcwd() + '/WorldHappinessReport/'

# Collect the path of every file in that folder
file_path_list = glob.glob(os.path.join(happiness_filepath, '*'))
print(file_path_list)

/home/workspace
['/home/workspace/WorldHappinessReport/2020.csv', '/home/workspace/WorldHappinessReport/2018.csv', '/home/workspace/WorldHappinessReport/2019.csv']


##### Processing the files to create a single happiness dataframe that is used from here on

In [52]:
# Create empty dataframe for all the data
df_happiness= pd.DataFrame()
# List of columns for the output dataframe
cols= ['Country', 'Happiness_Score', 'GDP_Score', 'Health_Score', 'Freedom_Score', \
       'Generosity_Score', 'Corruption_Score', 'Year']

for f in file_path_list:
    year= f[-8:-4]
    print('year= ', year)
    df_happiness_year= pd.read_csv(f)
    df_happiness_year['Year']= year
    print('number of rows ', df_happiness_year.shape[0])
    #print(df_happiness_year.columns)
    if year== '2020':
        df_happiness_year.rename(columns= {'Country name': 'Country', 'Ladder score': 'Happiness_Score', \
                                           'Explained by: Log GDP per capita': 'GDP_Score', \
                                           'Explained by: Healthy life expectancy': 'Health_Score', \
                                           'Explained by: Freedom to make life choices': 'Freedom_Score', \
                                           'Explained by: Generosity': 'Generosity_Score', \
                                           'Explained by: Perceptions of corruption': 'Corruption_Score'}, inplace= True)
    elif year== '2019' or year== '2018':
        df_happiness_year.rename(columns= {'Country or region': 'Country', 'Score': 'Happiness_Score', \
                                           'GDP per capita': 'GDP_Score', \
                                           'Healthy life expectancy': 'Health_Score', \
                                           'Freedom to make life choices': 'Freedom_Score', \
                                           'Generosity': 'Generosity_Score', \
                                           'Perceptions of corruption': 'Corruption_Score'}, inplace= True)
    #print('countries that are duplicated= ', df_happiness_year['Country'].duplicated().sum())
    print(df_happiness_year.columns)
    # Keep only the columns needed for the output dataframe
    df_happiness_year= df_happiness_year[cols]
    print(df_happiness_year.columns)
    df_happiness= df_happiness.append(df_happiness_year)

year=  2020
number of rows  153
Index(['Country', 'Regional indicator', 'Happiness_Score',
       'Standard error of ladder score', 'upperwhisker', 'lowerwhisker',
       'Logged GDP per capita', 'Social support', 'Healthy life expectancy',
       'Freedom to make life choices', 'Generosity',
       'Perceptions of corruption', 'Ladder score in Dystopia', 'GDP_Score',
       'Explained by: Social support', 'Health_Score', 'Freedom_Score',
       'Generosity_Score', 'Corruption_Score', 'Dystopia + residual', 'Year'],
      dtype='object')
Index(['Country', 'Happiness_Score', 'GDP_Score', 'Health_Score',
       'Freedom_Score', 'Generosity_Score', 'Corruption_Score', 'Year'],
      dtype='object')
year=  2018
number of rows  156
Index(['Overall rank', 'Country', 'Happiness_Score', 'GDP_Score',
       'Social support', 'Health_Score', 'Freedom_Score', 'Generosity_Score',
       'Corruption_Score', 'Year'],
      dtype='object')
Index(['Country', 'Happiness_Score', 'GDP_Score', 'Health_Sco

In [53]:
df_happiness.describe()

Unnamed: 0,Happiness_Score,GDP_Score,Health_Score,Freedom_Score,Generosity_Score,Corruption_Score
count,465.0,465.0,465.0,465.0,465.0,464.0
mean,5.418399,0.888583,0.671684,0.436714,0.185048,0.117702
std,1.113319,0.387283,0.253337,0.152315,0.097898,0.101842
min,2.5669,0.0,0.0,0.0,0.0,0.0
25%,4.5711,0.592,0.494102,0.349,0.111,0.05119
50%,5.43,0.947,0.718,0.455,0.176,0.087676
75%,6.192,1.201,0.861,0.555,0.252,0.144906
max,7.8087,2.096,1.141,0.724,0.598,0.533162


In [54]:
df_happiness.sample(10)

Unnamed: 0,Country,Happiness_Score,GDP_Score,Health_Score,Freedom_Score,Generosity_Score,Corruption_Score,Year
16,Luxembourg,6.91,1.576,0.896,0.632,0.196,0.321,2018
103,Gabon,4.799,1.057,0.571,0.295,0.043,0.055,2019
92,China,5.191,1.029,0.893,0.521,0.058,0.1,2019
32,Slovenia,6.3634,1.208652,0.932548,0.6467,0.145701,0.076516,2020
60,South Korea,5.8724,1.245301,1.022543,0.259356,0.170304,0.094596,2020
53,Japan,5.915,1.294,0.988,0.553,0.079,0.15,2018
66,Pakistan,5.653,0.677,0.535,0.313,0.22,0.098,2019
109,Laos,4.623,0.72,0.441,0.626,0.23,0.174,2018
67,Turkmenistan,5.636,1.016,0.517,0.417,0.199,0.037,2018
13,Ireland,6.977,1.448,0.876,0.614,0.307,0.306,2018


In [55]:
# Check for NA
df_happiness.isna().sum()

Country             0
Happiness_Score     0
GDP_Score           0
Health_Score        0
Freedom_Score       0
Generosity_Score    0
Corruption_Score    1
Year                0
dtype: int64

There is one missing value, in 'Corruption_Score'; all other columns are complete.

In [56]:
countries= np.sort(df_happiness['Country'].unique())
countries

array(['Afghanistan', 'Albania', 'Algeria', 'Angola', 'Argentina',
       'Armenia', 'Australia', 'Austria', 'Azerbaijan', 'Bahrain',
       'Bangladesh', 'Belarus', 'Belgium', 'Belize', 'Benin', 'Bhutan',
       'Bolivia', 'Bosnia and Herzegovina', 'Botswana', 'Brazil',
       'Bulgaria', 'Burkina Faso', 'Burundi', 'Cambodia', 'Cameroon',
       'Canada', 'Central African Republic', 'Chad', 'Chile', 'China',
       'Colombia', 'Comoros', 'Congo (Brazzaville)', 'Congo (Kinshasa)',
       'Costa Rica', 'Croatia', 'Cyprus', 'Czech Republic', 'Denmark',
       'Dominican Republic', 'Ecuador', 'Egypt', 'El Salvador', 'Estonia',
       'Ethiopia', 'Finland', 'France', 'Gabon', 'Gambia', 'Georgia',
       'Germany', 'Ghana', 'Greece', 'Guatemala', 'Guinea', 'Haiti',
       'Honduras', 'Hong Kong', 'Hong Kong S.A.R. of China', 'Hungary',
       'Iceland', 'India', 'Indonesia', 'Iran', 'Iraq', 'Ireland',
       'Israel', 'Italy', 'Ivory Coast', 'Jamaica', 'Japan', 'Jordan',
       'Kazakhstan'

Observations:
- There is 'Congo (Brazzaville)' and 'Congo (Kinshasa)'
- There is 'Hong Kong' and 'Hong Kong S.A.R. of China'
- There is 'Taiwan' and 'Taiwan Province of China'
- There is 'Trinidad & Tobago' and 'Trinidad and Tobago'
- There is 'North Cyprus' and 'Northern Cyprus' and 'Cyprus'
- There is 'Macedonia' and 'North Macedonia'

#### Create table with country translation

In the immigration data, the country names are in Dutch. In the happiness data, the country names are in English. They need to be matched up. This will be done through a separate translation table.

In [57]:
# All country names (Dutch) in the immigration data
countries_Dutch= np.sort(df_meta_birthcountry['Title'].unique())
countries_Dutch

array(['(voormalig) Joegoslavië', '(voormalig) Tsjecho-Slowakije',
       '(voormalige) Nederlandse Antillen, Aruba',
       '(voormalige) Sovjet-Unie', 'Afghanistan', 'Afrika', 'Albanië',
       'Algerije', 'Amerika', 'Amerikaans Samoa',
       'Amerikaanse Maagdeneilanden', 'Andorra', 'Angola', 'Anguilla',
       'Antarctica', 'Antigua en Barbuda', 'Argentinië', 'Armenië',
       'Aruba', 'Australië', 'Azerbeidzjan', 'Azië', 'Bahamas', 'Bahrein',
       'Bangladesh', 'Barbados', 'België', 'Belize', 'Benin', 'Bermuda',
       'Bhutan', 'Bolivia', 'Bosnië-Herzegovina', 'Botswana', 'Brazilië',
       'Brits Territorium in de Indische Oceaan', 'Britse Maagdeneilanden',
       'Brunei', 'Bulgarije', 'Burkina Faso', 'Burundi', 'Cambodja',
       'Canada', 'Caribisch Nederland', 'Caymaneilanden',
       'Centraal-Afrikaanse Republiek', 'Chili', 'China', 'Colombia',
       'Comoren', 'Congo', 'Congo (Democratische Republiek)',
       'Cookeilanden', 'Costa Rica', 'Cuba', 'Curaçao', 'Cyprus',

In [58]:
countries_Dutch.shape[0]

263

In [59]:
# All country names in the happiness data
countries

array(['Afghanistan', 'Albania', 'Algeria', 'Angola', 'Argentina',
       'Armenia', 'Australia', 'Austria', 'Azerbaijan', 'Bahrain',
       'Bangladesh', 'Belarus', 'Belgium', 'Belize', 'Benin', 'Bhutan',
       'Bolivia', 'Bosnia and Herzegovina', 'Botswana', 'Brazil',
       'Bulgaria', 'Burkina Faso', 'Burundi', 'Cambodia', 'Cameroon',
       'Canada', 'Central African Republic', 'Chad', 'Chile', 'China',
       'Colombia', 'Comoros', 'Congo (Brazzaville)', 'Congo (Kinshasa)',
       'Costa Rica', 'Croatia', 'Cyprus', 'Czech Republic', 'Denmark',
       'Dominican Republic', 'Ecuador', 'Egypt', 'El Salvador', 'Estonia',
       'Ethiopia', 'Finland', 'France', 'Gabon', 'Gambia', 'Georgia',
       'Germany', 'Ghana', 'Greece', 'Guatemala', 'Guinea', 'Haiti',
       'Honduras', 'Hong Kong', 'Hong Kong S.A.R. of China', 'Hungary',
       'Iceland', 'India', 'Indonesia', 'Iran', 'Iraq', 'Ireland',
       'Israel', 'Italy', 'Ivory Coast', 'Jamaica', 'Japan', 'Jordan',
       'Kazakhstan'

In [60]:
countries.shape[0]

165

In [61]:
type(countries)

numpy.ndarray

In [62]:
# Export countries in Dutch to make the translation table
pd.DataFrame(countries_Dutch).to_csv("output/countries_Dutch.csv", index= False)

In [63]:
# Export countries in English to make the translation table
pd.DataFrame(countries).to_csv("output/countries.csv", index= False)

Observation:
- There is no Dutch equivalent of 'Palestinian Territories'
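
A gap like this can also be detected programmatically once a translation table exists. The sketch below is only an illustration: the translation table, its column names `Land` and `Country`, and the sample rows are assumptions and are not taken from the project files.

```python
import pandas as pd

# Hypothetical translation table (Dutch name -> English name); illustrative rows only
df_translation = pd.DataFrame({
    'Land': ['België', 'Brazilië', 'Bulgarije'],
    'Country': ['Belgium', 'Brazil', 'Bulgaria'],
})

# A few English names as they could appear in the happiness data (assumed sample)
df_happiness_countries = pd.DataFrame({
    'Country': ['Belgium', 'Brazil', 'Palestinian Territories'],
})

# A left merge with indicator=True flags the rows without a match in the translation table
merged = df_happiness_countries.merge(
    df_translation, on='Country', how='left', indicator=True
)
untranslated = merged.loc[merged['_merge'] == 'left_only', 'Country'].tolist()
print(untranslated)
```

Names that end up in `untranslated` either need a manual translation or have no counterpart in the Dutch list.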

#### Cleaning Steps

#### Cleaning steps - Immigration data
- Don't load entries with no value or 0 people in Immigratie_1. An entry is only appended when immigration_dict.get(key).get('Immigratie_1') is not None and is greater than 0
- For age, only load entries that reflect the total or the number per single age-year, i.e. not the ones aggregated over a 5-year age range
- Don't load totals for gender (Geslacht), age (Leeftijd), marital status (BurgerlijkeStaat) or country of birth (Geboorteland)
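
The rules above can be collected in one small predicate, which keeps the loading loop readable. This is a minimal sketch, not the project's actual code: the function name `keep_entry` and the constant names are hypothetical, while the codes and the age threshold are copied from the filter condition used in this notebook.

```python
# Codes that mark totals instead of a single category (copied from the notebook's filter)
TOTAL_CODES = {
    'Geslacht': 'T001038',
    'LeeftijdOp31December': '10000',
    'BurgerlijkeStaat': 'T001019',
    'Geboorteland': 'T001175',
}
# Age codes from this value upward are excluded (threshold copied from the notebook's filter)
AGE_CODE_LIMIT = 19906


def keep_entry(entry):
    """Return True when an immigration entry passes all cleaning rules."""
    volume = entry.get('Immigratie_1')
    if volume is None or volume <= 0:
        return False
    if int(entry.get('LeeftijdOp31December')) >= AGE_CODE_LIMIT:
        return False
    # Reject the entry if any field holds a total code
    return all(entry.get(field) != code for field, code in TOTAL_CODES.items())


# Illustrative entries; only the first one should pass
base = {'Geslacht': '3000', 'LeeftijdOp31December': '12200',
        'BurgerlijkeStaat': '1020', 'Geboorteland': 'G007937'}
entries = [
    {**base, 'Immigratie_1': 2},
    {**base, 'Immigratie_1': None},
    {**base, 'Geslacht': 'T001038', 'Immigratie_1': 5},
]
kept = [entry for entry in entries if keep_entry(entry)]
print(len(kept))
```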

#### Cleaning steps - Immigration metadata
#### Cleaning - df_meta_gender:
- Rename column 'Title' into 'Geslacht'
- Add column 'Gender' with translation of 'Geslacht'

#### Cleaning - df_meta_age:
- Rename column 'Title' into 'Age'
- Replace 'Totaal' with 'Total'
- Replace 'jaar' with 'years'
- Replace 'of ouder' with 'or older'
- Replace 'tot' with 'to'

#### Cleaning - df_meta_marital:
- Rename column 'Title' into 'BurgerlijkeStaat'
- Add column 'MaritalStatus' with translation of 'BurgerlijkeStaat'
- Remove trailing whitespace in the 'Key' column

#### Cleaning steps - Happiness data
- Rename 'Congo (Brazzaville)' to 'Congo' 
- Rename 'Congo (Kinshasa)' to 'Congo'
- Rename 'Taiwan Province of China' to 'Taiwan'
- Rename 'Trinidad & Tobago' to 'Trinidad and Tobago'
- Rename 'Northern Cyprus' and 'North Cyprus' to 'Cyprus'
- Rename 'North Macedonia' to 'Macedonia'
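
The renames above can be applied with a single mapping. The sketch below assumes the happiness data has a column named `Country`; that column name and the sample rows are illustrative assumptions.

```python
import pandas as pd

# Mapping from the names in the happiness data to the names used for matching
COUNTRY_RENAMES = {
    'Congo (Brazzaville)': 'Congo',
    'Congo (Kinshasa)': 'Congo',
    'Taiwan Province of China': 'Taiwan',
    'Trinidad & Tobago': 'Trinidad and Tobago',
    'Northern Cyprus': 'Cyprus',
    'North Cyprus': 'Cyprus',
    'North Macedonia': 'Macedonia',
}

# Hypothetical sample of the happiness data
df_happiness = pd.DataFrame(
    {'Country': ['Congo (Kinshasa)', 'North Macedonia', 'Belgium']}
)

# Series.replace with a dict swaps exact matches and leaves all other values unchanged
df_happiness['Country'] = df_happiness['Country'].replace(COUNTRY_RENAMES)
print(df_happiness['Country'].tolist())
```

Because several source names map to the same target name, a country such as 'Congo' or 'Cyprus' can occur more than once per year after this step.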

In [64]:
# checking current working directory
print(os.getcwd())

# Folder that holds the immigration data files
immigration_filepath = os.getcwd() + '/Immigration_data/'

# Walk the folder and collect the file paths of its top level only
for root, dirs, files in os.walk(immigration_filepath):
    # glob lists every file in the current root; break stops before subfolders are visited
    file_path_list = glob.glob(os.path.join(root, '*'))
    print(file_path_list)
    break

/home/workspace
['/home/workspace/Immigration_data/03742_TypedDataSet_2020.json', '/home/workspace/Immigration_data/03742_TypedDataSet_2018.json', '/home/workspace/Immigration_data/03742_TypedDataSet_2019.json']


In [65]:
columns = ['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1']
dict_df_immigration = {}
for f in file_path_list:
    # The year is the four characters before the '.json' extension
    year = f[-9:-5]
    print('year= ', year)

    # Import the json-file with immigration data
    with open(f, 'r') as json_file:
        immigration_json = [json.loads(line) for line in json_file]
    immigration_dict = immigration_json[0].get('value')
    imm_keys = list(immigration_dict.keys())

    # Only a slice of the keys is processed here; iterate over all of imm_keys for a full run
    rows = []
    for key in imm_keys[199100:200000]:
        entry = immigration_dict.get(key)
        # Skip missing or zero volumes, age codes from 19906 upward and all totals
        if entry.get('Immigratie_1') is not None and entry.get('Immigratie_1') > 0 \
            and int(entry.get('LeeftijdOp31December')) < 19906 \
            and entry.get('Geslacht') != 'T001038' and entry.get('LeeftijdOp31December') != '10000' \
            and entry.get('BurgerlijkeStaat') != 'T001019' and entry.get('Geboorteland') != 'T001175':
            print(key)
            rows.append(entry)

    # Build the dataframe once per year; DataFrame.append was removed in pandas 2.0
    dict_df_immigration[year] = pd.DataFrame(rows, columns=columns)

year=  2020
5176625
5176651
5176729
5177613
5178757
5178991
5179225
5179355
5179407
5180213
5180655
5180759
5181123
5181305
5181383
5181435
5181461
5181773
5182059
5182111
5182241
5182631
5182917
year=  2018
5176623
5176649
5176727
5176753
5177351
5177403
5178755
5179405
5180211
5180653
5180679
5180705
5180861
5181017
5181121
5181303
5181433
5181719
5181849
5182005
5182057
5182109
5182161
5182239
5182577
5182629
5182915
5183097
5190117
5190221
5195915
year=  2019
5176624
5176650
5176728
5176754
5177352
5177404
5178106
5178236
5179172
5179406
5180134
5180212
5180654
5180862
5181018
5181122
5181252
5181304
5181434
5181850
5182058
5182110
5182188
5182240
5182604
5182630
5182838
5183098


In [66]:
df_immigration_clean= pd.DataFrame(columns=['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1'])
df_immigration_clean

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1


In [67]:
# Combine the yearly dataframes; pd.concat replaces DataFrame.append, which was removed in pandas 2.0
df_immigration_clean = pd.concat([dict_df_immigration[yr] for yr in ['2018', '2019', '2020']])

In [68]:
df_immigration_clean.shape[0]

82

In [69]:
dict_df_immigration['2020']

Unnamed: 0,ID,Geslacht,LeeftijdOp31December,BurgerlijkeStaat,Geboorteland,Perioden,Immigratie_1
0,5176625,3000,12200,1020,G007937,2020JJ00,2
1,5176651,3000,12200,1020,G007935,2020JJ00,4
2,5176729,3000,12200,1020,G008533,2020JJ00,4
3,5177613,3000,12200,1020,G008571,2020JJ00,1
4,5178757,3000,12200,1020,G008615,2020JJ00,2
5,5178991,3000,12200,1020,G008626,2020JJ00,1
6,5179225,3000,12200,1020,G008635,2020JJ00,1
7,5179355,3000,12200,1020,G008640,2020JJ00,1
8,5179407,3000,12200,1020,G007069,2020JJ00,2
9,5180213,3000,12200,1020,G008673,2020JJ00,3


In [70]:
# Collect the cleaned immigration entries of all years and build one dataframe from them
columns = ['ID', 'Geslacht', 'LeeftijdOp31December', 'BurgerlijkeStaat', 'Geboorteland', 'Perioden', 'Immigratie_1']
rows = []

for f in file_path_list:
    year = f[-9:-5]
    print('year= ', year)

    # Import the json-file with immigration data
    with open(f, 'r') as json_file:
        immigration_json = [json.loads(line) for line in json_file]
    immigration_dict = immigration_json[0].get('value')
    imm_keys = list(immigration_dict.keys())

    # Only a slice of the keys is processed here; iterate over all of imm_keys for a full run
    for key in imm_keys[199500:200000]:
        entry = immigration_dict.get(key)
        # Skip missing or zero volumes, age codes from 19906 upward and all totals
        if entry.get('Immigratie_1') is not None and entry.get('Immigratie_1') > 0 \
            and int(entry.get('LeeftijdOp31December')) < 19906 \
            and entry.get('Geslacht') != 'T001038' and entry.get('LeeftijdOp31December') != '10000' \
            and entry.get('BurgerlijkeStaat') != 'T001019' and entry.get('Geboorteland') != 'T001175':
            rows.append(entry)

# DataFrame.append was removed in pandas 2.0, so the dataframe is built in one step
df_immigration_clean = pd.DataFrame(rows, columns=columns)

year=  2020
year=  2018
year=  2019


In [71]:
df_immigration_clean[df_immigration_clean.BurgerlijkeStaat == 'T001019'].shape[0]

0

In [72]:
df_immigration_clean.shape[0]

3

In [73]:
df_immigration_clean.to_csv('output/immigration_cleaned.csv', index= False)

In [74]:
df_meta_gender_clean= df_meta_gender.copy()
df_meta_gender_clean.rename(columns= {'Title' : 'Geslacht'}, inplace= True)
df_meta_gender_clean['Gender']= ['Total men and women', 'Men', 'Women']

In [75]:
df_meta_gender_clean

Unnamed: 0,Key,Geslacht,Gender
0,T001038,Totaal mannen en vrouwen,Total men and women
1,3000,Mannen,Men
2,4000,Vrouwen,Women


In [76]:
df_meta_age_clean= df_meta_age.copy()
df_meta_age_clean.rename(columns= {'Title' : 'Age'}, inplace= True)
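# Translate the Dutch age labels; assigning the result back avoids an in-place replace on a column selection
df_meta_age_clean['Age'] = df_meta_age_clean['Age'].replace({'Totaal': 'Total'}, regex=True)
df_meta_age_clean['Age'] = df_meta_age_clean['Age'].replace({'jaar': 'years'}, regex=True)
df_meta_age_clean['Age'] = df_meta_age_clean['Age'].replace({'of ouder': 'or older'}, regex=True)
df_meta_age_clean['Age'] = df_meta_age_clean['Age'].replace({'tot': 'to'}, regex=True)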

In [77]:
df_meta_age_clean

Unnamed: 0,Key,Age
0,10000,Total
1,10010,0 years
2,10100,1 years
3,10200,2 years
4,10300,3 years
5,10400,4 years
6,10500,5 years
7,10600,6 years
8,10700,7 years
9,10800,8 years


In [78]:
df_meta_marital_clean= df_meta_marital.copy()
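df_meta_marital_clean.rename(columns= {'Title' : 'BurgerlijkeStaat'}, inplace= True)
# Remove trailing whitespace in the 'Key' column, as listed in the cleaning steps (assumes the keys are strings)
df_meta_marital_clean['Key']= df_meta_marital_clean['Key'].str.strip()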
df_meta_marital_clean['MaritalStatus']= ['Total', 'Unmarried', 'Married', 'Widowed', 'Divorced']

In [79]:
df_meta_marital_clean

Unnamed: 0,Key,BurgerlijkeStaat,Description,MaritalStatus
0,T001019,Totaal burgerlijke staat,,Total
1,1010,Ongehuwd,Burgerlijke staat die aangeeft dat een persoon...,Unmarried
2,1020,Gehuwd,Wettig gehuwd plus partnerschap.\n\nTrendbreuk...,Married
3,1050,Verweduwd,Verweduwd na wettig huwelijk plus verweduwd na...,Widowed
4,1080,Gescheiden,Gescheiden na wettig huwelijk plus gescheiden ...,Divorced


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The final schema is a star schema with one fact table (Immigration) and a single dimension table (Happiness), as shown in the ERD below. The two tables are connected by the country.

![ERD of the final data model](ERD_final.JPG "Final ERD")

#### 3.2 Mapping Out Data Pipelines
The initial ERD, showing the tables as they are loaded before any transformation, is given below:

![ERD of the initial tables](ERD_before.JPG "Initial ERD")

To arrive at the final ERD the following steps are required:
- Load/stage all data
- Clean loaded/staged data
- Create the fact table Immigration from the tables Immigration, MetaGender, MetaAge, MetaMarital, MetaCountry and LandCountry
- The dimension table Happiness is equal to the cleaned table Happiness
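
The step that creates the fact table boils down to joining the coded immigration rows with the metadata tables on their keys. The sketch below shows this with pandas for two of the metadata tables only. It is an illustration under assumptions and not the code in etl.py: the sample rows are made up from the notebook outputs, and the selection of output columns is hypothetical.

```python
import pandas as pd

# Small illustrative inputs based on the outputs shown in this notebook
df_immigration = pd.DataFrame({
    'ID': [5176625, 5176651],
    'Geslacht': ['3000', '3000'],
    'BurgerlijkeStaat': ['1020', '1020'],
    'Immigratie_1': [2, 4],
})
df_meta_gender = pd.DataFrame({'Key': ['3000', '4000'], 'Gender': ['Men', 'Women']})
df_meta_marital = pd.DataFrame({'Key': ['1010', '1020'],
                                'MaritalStatus': ['Unmarried', 'Married']})

# Replace each code by its English label with a left join on the metadata key
fact = (
    df_immigration
    .merge(df_meta_gender, left_on='Geslacht', right_on='Key', how='left')
    .drop(columns='Key')
    .merge(df_meta_marital, left_on='BurgerlijkeStaat', right_on='Key', how='left')
    .drop(columns='Key')
    .rename(columns={'Immigratie_1': 'ImmigrationVolume'})
)

# Keep only the translated columns (hypothetical column selection)
fact = fact[['ID', 'Gender', 'MaritalStatus', 'ImmigrationVolume']]
print(fact)
```

The same pattern would extend to the age and country metadata, with the country translation table supplying the English country name that links the fact table to the Happiness dimension.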

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Run `etl.py` from here with `!python etl.py`, or run it in a launcher.

#### 4.2 Data Quality Checks

The data quality checks that will be performed are included in the etl.py script. The checks are:
- That there are no empty entries in the fact immigration table
- That there are no entries with a zero ImmigrationVolume in the fact immigration table
- That the total number of entries in the dimension happiness table equals the sum of the entries in the World Happiness Report input tables per year
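
The three checks can be expressed compactly. The sketch below uses pandas and is only an illustration of the idea: the function names are hypothetical and the actual checks in etl.py may be implemented differently. Only the column name `ImmigrationVolume` is taken from the description above.

```python
import pandas as pd


def check_fact_immigration(df_fact):
    """Raise ValueError if the fact table has empty entries or zero volumes."""
    if df_fact.isnull().values.any():
        raise ValueError('fact immigration table contains empty entries')
    if (df_fact['ImmigrationVolume'] == 0).any():
        raise ValueError('fact immigration table contains a zero ImmigrationVolume')


def check_happiness_count(df_dim, yearly_inputs):
    """Raise ValueError if the dimension row count differs from the summed input rows."""
    expected = sum(len(df) for df in yearly_inputs)
    if len(df_dim) != expected:
        raise ValueError(f'expected {expected} happiness rows, found {len(df_dim)}')


# Illustrative data: a valid fact table and two yearly happiness inputs
df_fact_ok = pd.DataFrame({'Country': ['Belgium', 'Brazil'], 'ImmigrationVolume': [2, 4]})
happiness_2018 = pd.DataFrame({'Country': ['Belgium'], 'Year': [2018]})
happiness_2019 = pd.DataFrame({'Country': ['Belgium'], 'Year': [2019]})
df_dim = pd.concat([happiness_2018, happiness_2019], ignore_index=True)

check_fact_immigration(df_fact_ok)
check_happiness_count(df_dim, [happiness_2018, happiness_2019])
print('all quality checks passed')
```

Raising an exception instead of printing a message makes the pipeline stop as soon as a check fails.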

#### 4.3 Data dictionary 
The data dictionary can be found in the readme.md file

### Step 5: Complete Project Write Up

* It is proposed to update the model yearly, because the immigration data is a yearly aggregate and the World Happiness Report is published yearly.
* What if the amount of data is 100x as much? Then you would probably want to run this on a cluster such as Amazon EMR, or at least have Spark read the JSON files.
* What if you would need to run this on a daily basis at 7 am? Then you would probably want to rewrite the code such that it can run on Airflow, where it can be scheduled.
* What if the database needs to be accessed by 100+ people at the same time? As long as the S3 bucket is publicly accessible, the current solution would still be suitable.