# ETL Pipeline for US Immigration Data
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that produces a star schema with a fact table describing immigration into the US. The dimension tables cover time (immigration date) and several location dimensions (arrival state, arrival airport, country of origin), and are enriched with additional data of interest.

Based on the data aggregated in the star schema, one can run analytical queries to gain insight into, e.g.:

* temporal distribution of immigration events
* demographic details of the arrival state and connections to immigration characteristics
* connections between temperatures within origin countries and immigration details
* which airports and airlines are used by immigrants

The project exemplifies the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

---
**NOTE**: within this notebook, the project is only sketched; a fully structured implementation of the ETL process can be found in the `etl` folder, which contains the following files:

| File       | Description |
| -----------| ------------|
| etl.py     | main module that can be executed from shell to run the pipeline |
| config.py  | configuration of the etl process via pydantic |
| etl_extract.py | data extraction functions and classes used within main module |
| etl_load.py | data load functions and classes used within main module |
| requirements.txt | package requirements to execute script |
| config.json | default configuration in JSON format |

The script can be run from the shell via `python etl.py`. Run `python etl.py -h` for an overview of the available command-line arguments.

---

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np

from pathlib import Path
from datetime import datetime
from typing import Sequence
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, to_date

### Helper Functions

In [2]:
def get_nan_percent(df: pd.DataFrame, cols: Sequence[str]) -> pd.DataFrame:
    """Return the percentage of missing (NaN/None) values for each of the given columns."""
    # isna().mean() is the fraction of missing entries in a column
    percentages = [df[col_name].isna().mean() * 100.0 for col_name in cols]
    return pd.DataFrame({"Column": list(cols), "Percent_NaN": percentages})

___

### Step 1: Scope the Project and Gather Data

#### Scope 
The first step is to describe the provided data and load it into this notebook in order to perform initial checks and to gain insights.

For smaller files, it will be sufficient to use standard **pandas** functions, while **pyspark** will be employed for processing larger files.

The following steps will be carried out:

1. load data and describe formats
2. perform exploratory data analysis in order to evaluate possible issues or find valuable insights
3. clean data to achieve a high-quality state for further processing
4. outline the ETL process and define the target schema
5. create actual fact and dimension tables
6. possibly upload/dump the resulting schema into target system(s)



#### Describe and Gather Data 
In the following, the source data will be described and loaded into objects within the notebook for further analysis.

---

#### 1. Weather Data

**Description**: contains monthly average temperatures for a variety of cities over time, along with geographic coordinates (latitude, longitude).

Source: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data (Kaggle)

Table structure:

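| Column                        | Type     | Comment                                              |
| ----------------------------- | -------- | ---------------------------------------------------- |
| dt                            | datetime | record date (first day of the month)                 |
| AverageTemperature            | float    | average temperature in °C                            |
| AverageTemperatureUncertainty | float    | uncertainty of the average temperature in °C         |
| City                          | string   | city name                                            |
| Country                       | string   | country name                                         |
| Latitude                      | string   | city latitude with hemisphere suffix (e.g. 57.05N)   |
| Longitude                     | string   | city longitude with hemisphere suffix (e.g. 10.33E)  |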


In [4]:
# Load data into pandas dataframe
weather_data = Path("data/GlobalLandTemperaturesByCity_sample.csv")
df_wtr = pd.read_csv(weather_data)
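
`Latitude` and `Longitude` are strings with a hemisphere suffix (e.g. `57.05N`, `10.33E`), so they cannot be used for numeric comparisons directly. A minimal sketch of converting them to signed decimal degrees, assuming every value ends in one of `N`, `S`, `E`, `W`; the helper `parse_coordinate` and the second example row are hypothetical and not part of the `etl` package:

```python
import pandas as pd


def parse_coordinate(value: str) -> float:
    """Convert a hemisphere-suffixed coordinate such as '57.05N' or '10.33W' to signed degrees."""
    value = value.strip()
    hemisphere = value[-1].upper()
    if hemisphere not in {"N", "S", "E", "W"}:
        raise ValueError(f"unexpected hemisphere suffix in {value!r}")
    degrees = float(value[:-1])
    # By convention, southern latitudes and western longitudes are negative
    return -degrees if hemisphere in {"S", "W"} else degrees


# First row as in the weather sample; second row is made up to exercise S/W
coords = pd.DataFrame({"Latitude": ["57.05N", "34.56S"], "Longitude": ["10.33E", "58.66W"]})
coords["lat_deg"] = coords["Latitude"].map(parse_coordinate)
coords["lon_deg"] = coords["Longitude"].map(parse_coordinate)
```

Applying the helper with `Series.map` keeps the original string columns intact, so the conversion can be checked side by side before replacing them.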

---

#### 2. Airport Data

**Description**: contains identification codes and geographic information about airports.

Source: https://datahub.io/core/airport-codes#data (DataHub)

Table structure:

| Column       | Type   | Comment                                               |
| ------------ | ------ | ----------------------------------------------------- |
| ident        | string | airport identifier code                               |
| type         | string | airport type (e.g. heliport, small_airport, closed)   |
| name         | string | airport name                                          |
| elevation_ft | float  | elevation in feet                                     |
| continent    | string | continent code                                        |
| iso_country  | string | country ISO code                                      |
| iso_region   | string | region ISO code (e.g. US-PA)                          |
| municipality | string | name of municipality                                  |
| gps_code     | string | GPS code                                              |
| iata_code    | string | IATA code                                             |
| local_code   | string | local airport code                                    |
| coordinates  | string | comma-separated pair of longitude, latitude           |

In [5]:
# Load data into pandas dataframe
airport_data = Path("data/airport-codes_csv.csv")
df_apt = pd.read_csv(airport_data)
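
Judging from the sample rows (e.g. `-74.93360137939453, 40.07080078125` for Bensalem, Pennsylvania), `coordinates` stores the longitude first and the latitude second. A minimal pandas sketch that splits the string into two numeric columns under that assumption; the column names `longitude` and `latitude` are hypothetical:

```python
import pandas as pd

# Two rows copied from the airport sample (ident and coordinates only)
apt = pd.DataFrame({
    "ident": ["00A", "00AA"],
    "coordinates": ["-74.93360137939453, 40.07080078125", "-101.473911, 38.704022"],
})

# Split "longitude, latitude" into two parts, strip whitespace, convert to float
parts = apt["coordinates"].str.split(",", expand=True)
apt["longitude"] = parts[0].str.strip().astype(float)
apt["latitude"] = parts[1].str.strip().astype(float)
```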

---

#### 3. Immigration Data

**Description**: contains information about individual immigration processes. Each row describes the arrival of a single individual in the US. All data in the set is from the year 2016 and is partitioned by month.
    
Source: https://travel.trade.gov/research/reports/i94/historical/2016.html (US National Travel and Tourism Office)
    
Table structure:

| Column   | Type   | Comment                                                          |
| -------- | ------ | ---------------------------------------------------------------- |
| cicid    | float  | record ID                                                        |
| i94yr    | float  | year                                                             |
| i94mon   | float  | month                                                            |
| i94cit   | float  | birth country ID                                                 |
| i94res   | float  | residence country ID                                             |
| i94port  | string | arrival port in US                                               |
| arrdate  | float  | arrival date in US (SAS date: days since 1960-01-01)             |
| i94mode  | float  | transportation mode (air: 1, sea: 2, land: 3, else: 9)           |
| i94addr  | string | arrival state                                                    |
| depdate  | float  | departure date (SAS date)                                        |
| i94bir   | float  | age in years                                                     |
| i94visa  | float  | visa category code (business: 1, pleasure: 2, student: 3)        |
| count    | float  | auxiliary field used for summary statistics                      |
| dtadfile | string | auxiliary date field (YYYYMMDD)                                  |
| visapost | string | Department of State post where the visa was issued               |
| occup    | string | occupation in US                                                 |
| entdepa  | string | arrival flag                                                     |
| entdepd  | string | departure flag                                                   |
| entdepu  | string | update flag                                                      |
| matflag  | string | match flag (arrival and departure records matched)               |
| biryear  | float  | birth year                                                       |
| dtaddto  | string | date until which the stay is admitted (MMDDYYYY)                 |
| gender   | string | gender                                                           |
| insnum   | string | INS number                                                       |
| airline  | string | airline used for arrival                                         |
| admnum   | float  | admission number                                                 |
| fltno    | string | flight number                                                    |
| visatype | string | type of visa                                                     |

In [20]:
# Initialize spark session 
spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()

# Load data into spark dataframe
# dfsp_imgn_apr = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
dfsp_imgn_apr = spark.read.option("header", True).csv("data/immigration_data_sample.csv")
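
`arrdate` and `depdate` are numeric SAS dates, i.e. day counts since 1960-01-01; the first sample row (arrdate `20566.0`, dtadfile `20160422`) is consistent with this. A minimal sketch of the conversion, written in pandas for brevity although the notebook holds this data in a Spark dataframe; the `*_dt` column names are hypothetical:

```python
import pandas as pd

# Values from the first sample row: arrdate=20566.0, depdate=20573.0, dtaddto="07202016"
imgn = pd.DataFrame({"arrdate": [20566.0], "depdate": [20573.0], "dtaddto": ["07202016"]})

# SAS dates count days since 1960-01-01; missing values become NaT
for c in ["arrdate", "depdate"]:
    imgn[c + "_dt"] = pd.to_datetime(imgn[c], unit="D", origin="1960-01-01")

# dtaddto is an MMDDYYYY string; unparseable values become NaT instead of raising
imgn["dtaddto_dt"] = pd.to_datetime(imgn["dtaddto"], format="%m%d%Y", errors="coerce")
```

With these values the arrival date resolves to 2016-04-22, matching the `dtadfile` value of the same row.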

---

#### 4. City demographic data

**Description**: contains demographic data about US cities; each city/state entry is split into one row per race.

Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/ (OpenDataSoft)

Table structure:

| Column                 | Type   | Comment                                         |
| ---------------------- | ------ | ----------------------------------------------- |
| City                   | string | city name                                       |
| State                  | string | state name                                      |
| Median Age             | float  | median age of residents                         |
| Male Population        | float  | number of male residents                        |
| Female Population      | float  | number of female residents                      |
| Total Population       | int    | total number of residents                       |
| Number of Veterans     | float  | number of veterans                              |
| Foreign-born           | float  | number of foreign-born residents                |
| Average Household Size | float  | average number of people per household          |
| State Code             | string | 2-character US state code                       |
| Race                   | string | race category that `Count` refers to            |
| Count                  | int    | number of residents of the race given in `Race` |

In [7]:
# Load data into pandas dataframe
city_data = Path("data/us-cities-demographics.csv")
df_cities = pd.read_csv(city_data, sep=";")
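
Because every city/state appears once per race, the `Race`/`Count` pair can be pivoted into one column per race to obtain a single row per city. A minimal pandas sketch on made-up numbers (the real table has five race categories):

```python
import pandas as pd

# Made-up long-format rows: one row per (City, State Code, Race)
demo = pd.DataFrame({
    "City": ["Quincy", "Quincy", "Hoover"],
    "State Code": ["MA", "MA", "AL"],
    "Race": ["White", "Asian", "Asian"],
    "Count": [100, 40, 20],
})

# One row per city, one column per race; missing city/race combinations become 0
wide = demo.pivot_table(index=["City", "State Code"], columns="Race",
                        values="Count", aggfunc="sum", fill_value=0).reset_index()
```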

---

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

##### 1. Weather Data

In [8]:
# Show first 5 rows
df_wtr.head()

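
|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|----|--------------------|-------------------------------|------|---------|----------|-----------|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | NaN | NaN | Århus | Denmark | 57.05N | 10.33E |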


In [9]:
# Get general data info
df_wtr.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100295 entries, 0 to 100294
Data columns (total 7 columns):
 #   Column                         Non-Null Count   Dtype  
---  ------                         --------------   -----  
 0   dt                             100295 non-null  object 
 1   AverageTemperature             96020 non-null   float64
 2   AverageTemperatureUncertainty  96020 non-null   float64
 3   City                           100295 non-null  object 
 4   Country                        100295 non-null  object 
 5   Latitude                       100295 non-null  object 
 6   Longitude                      100295 non-null  object 
dtypes: float64(2), object(5)
memory usage: 5.4+ MB


In [10]:
# Check for unique city/country fields just out of interest
len(df_wtr["City"].unique()), len(df_wtr["Country"].unique())

(42, 26)

In [11]:
# Check for duplicate rows with respect to (dt, City, Country) - none found in this sample!
len(df_wtr[df_wtr.duplicated(subset=["dt", "City", "Country"])])

0

In [12]:
# Check for NaNs/NULLs - about 4.3% of rows lack a temperature; such rows are useless and will be dropped
print(get_nan_percent(df_wtr, list(df_wtr.keys())))

                          Column  Percent_NaN
0                             dt     0.000000
1             AverageTemperature     4.262426
2  AverageTemperatureUncertainty     4.262426
3                           City     0.000000
4                        Country     0.000000
5                       Latitude     0.000000
6                      Longitude     0.000000


---

##### 2. Airport Data

In [13]:
# Show first 5 rows
df_apt.head()

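
|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|-------|------|------|--------------|-----------|-------------|------------|--------------|----------|-----------|------------|-------------|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | NaN | US | US-PA | Bensalem | 00A | NaN | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | NaN | US | US-KS | Leoti | 00AA | NaN | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | NaN | US | US-AK | Anchor Point | 00AK | NaN | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | NaN | US | US-AL | Harvest | 00AL | NaN | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | NaN | US | US-AR | Newport | NaN | NaN | NaN | -91.254898, 35.6087 |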


In [14]:
# General dataframe info
df_apt.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [15]:
# Check for NaNs/NULLs - continent, iata_code and local_code have over 40% missing values, the rest is mostly OK!
print(get_nan_percent(df_apt, list(df_apt.keys())))

          Column  Percent_NaN
0          ident     0.000000
1           type     0.000000
2           name     0.000000
3   elevation_ft    12.720835
4      continent    50.329551
5    iso_country     0.448479
6     iso_region     0.000000
7   municipality    10.305946
8       gps_code    25.501589
9      iata_code    83.315479
10    local_code    47.914662
11   coordinates     0.000000
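
A likely cause of the high share of missing `continent` values: the dataset uses the two-letter code `NA` for North America, and `pd.read_csv` treats the string `NA` as missing by default. The first rows above are US airports with a missing continent, which fits this explanation; a country code `NA` (Namibia) in `iso_country` may be affected in the same way. A minimal sketch on a made-up excerpt, showing how `keep_default_na=False` combined with `na_values=[""]` keeps `NA` as a literal code while still treating empty fields as missing:

```python
import io

import pandas as pd

# Made-up excerpt: one airport with continent "NA", one with an empty continent field
csv_text = "ident,continent,iso_country\n00A,NA,US\nZZZZ,,FR\n"

# Default parsing: "NA" is one of pandas' default missing-value markers
df_default = pd.read_csv(io.StringIO(csv_text))

# Only empty fields count as missing; "NA" survives as a literal code
df_literal = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])
```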


---

##### 3. Immigration Data

In [21]:
# Show first 5 rows
dfsp_imgn_apr.show(5)

+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|      x|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|2027561|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|56582674633.0|00782|      WT|
|2171295|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1

In [22]:
# Print the data schema
dfsp_imgn_apr.printSchema()

root
 |-- x: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)

In [23]:
# Split the column names into 3 groups for better readability within the notebook; show the summaries group by group
col_parts = np.array_split(dfsp_imgn_apr.columns, 3)

In [24]:
dfsp_imgn_apr.select([col(c) for c in col_parts[0]]).summary().show()

+-------+-----------------+------------------+------+------+------------------+------------------+-------+-----------------+------------------+-------+
|summary|                x|             cicid| i94yr|i94mon|            i94cit|            i94res|i94port|          arrdate|           i94mode|i94addr|
+-------+-----------------+------------------+------+------+------------------+------------------+-------+-----------------+------------------+-------+
|  count|             1000|              1000|  1000|  1000|              1000|              1000|   1000|             1000|              1000|    941|
|   mean|       1542097.12|       3040461.409|2016.0|   4.0|           302.928|           298.262|   null|         20559.68|             1.078|   null|
| stddev|915287.9043923795|1799817.7827726966|   0.0|   0.0|206.48528516334758|202.12038988683958|   null|8.995026987758733|0.4859548869516101|   null|
|    min|          1006205|         1000074.0|2016.0|   4.0|             103.0|         

In [25]:
dfsp_imgn_apr.select([col(c) for c in col_parts[1]]).summary().show()

+-------+------------------+-----------------+------------------+-----+-----------------+--------+-----+-------+-------+-------+
|summary|           depdate|           i94bir|           i94visa|count|         dtadfile|visapost|occup|entdepa|entdepd|entdepu|
+-------+------------------+-----------------+------------------+-----+-----------------+--------+-----+-------+-------+-------+
|  count|               951|             1000|              1000| 1000|             1000|     382|    4|   1000|    954|      0|
|   mean| 20575.03785488959|           42.382|             1.859|  1.0|   2.0160424879E7|    null| null|   null|   null|   null|
| stddev|24.211233522000143|17.90342449389526|0.3863525181337226|  0.0|49.51656540286629|    null| null|   null|   null|   null|
|    min|           20547.0|              1.0|               1.0|  1.0|         20160401|     ABD|  OTH|      A|      D|   null|
|    25%|           20561.0|             30.0|               2.0|  1.0|      2.0160408E7|    null

In [26]:
dfsp_imgn_apr.select([col(c) for c in col_parts[2]]).summary().show()

+-------+-------+-----------------+------------------+------+------------------+-------+--------------------+------------------+--------+
|summary|matflag|          biryear|           dtaddto|gender|            insnum|airline|              admnum|             fltno|visatype|
+-------+-------+-----------------+------------------+------+------------------+-------+--------------------+------------------+--------+
|  count|    954|             1000|              1000|   859|                35|    967|                1000|               992|    1000|
|   mean|   null|         1973.618| 8258277.404255319|  null|3826.8571428571427|    2.0|  6.9372367950789E10|1337.2554291623578|    null|
| stddev|   null|17.90342449389525|1622586.3557888167|  null| 221.7425829858661|    0.0|2.338134181802248E10| 6149.954574383991|    null|
|    min|      M|           1923.0|          04082018|     F|              3468|    *GA|                 0.0|             00001|      B1|
|    25%|   null|           1961.0

In [27]:
n_rows = dfsp_imgn_apr.count()
print(n_rows)

1000


In [28]:
# Check for duplicate identifiers - cicid is a unique row identifier!
dfsp_imgn_apr.select("cicid").distinct().count()

1000

In [29]:
# Check for NaNs/NULLs - quite OK in this selection!
dfsp_imgn_apr.select([(count(when(isnan(c) | col(c).isNull(), c)) / n_rows * 100).alias(c) for c in col_parts[0]]).show()

+---+-----+-----+------+------+------+-------+-------+-------+------------------+
|  x|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|           i94addr|
+---+-----+-----+------+------+------+-------+-------+-------+------------------+
|0.0|  0.0|  0.0|   0.0|   0.0|   0.0|    0.0|    0.0|    0.0|5.8999999999999995|
+---+-----+-----+------+------+------+-------+-------+-------+------------------+



In [30]:
# Check for NaNs/NULLs - visapost, occup, entdepu have very high amounts of invalid entries!
dfsp_imgn_apr.select([(count(when(isnan(c) | col(c).isNull(), c)) / n_rows * 100).alias(c) for c in col_parts[1]]).show()

+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+
|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|
+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+
|    4.9|   0.0|    0.0|  0.0|     0.0|    61.8| 99.6|    0.0|    4.6|  100.0|
+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+



In [31]:
# Check for NaNs/NULLs - insnum has very high amounts of invalid entries!
dfsp_imgn_apr.select([(count(when(isnan(c) | col(c).isNull(), c)) / n_rows * 100).alias(c) for c in col_parts[2]]).show()

+-------+-------+-------+------------------+------+------------------+------+-----+--------+
|matflag|biryear|dtaddto|            gender|insnum|           airline|admnum|fltno|visatype|
+-------+-------+-------+------------------+------+------------------+------+-----+--------+
|    4.6|    0.0|    0.0|14.099999999999998|  96.5|3.3000000000000003|   0.0|  0.8|     0.0|
+-------+-------+-------+------------------+------+------------------+------+-----+--------+



---

##### 4. City Demographic Data

In [32]:
# Show first 5 rows
df_cities.head()

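
|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|------|-------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|------------|------|-------|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |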


In [33]:
# Print general info
df_cities.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [34]:
# Check for relevant partitions - city+state+race correspond to unique rows
len(df_cities[["City", "State", "Race"]].drop_duplicates())

2891

In [35]:
df_cities["Race"].unique(), len(df_cities["Count"].unique())

(array(['Hispanic or Latino', 'White', 'Asian',
        'Black or African-American', 'American Indian and Alaska Native'],
       dtype=object),
 2785)

In [36]:
df_cities["Median Age"].max(), df_cities["Median Age"].min()

(70.5, 22.9)

In [37]:
df_cities[df_cities["Median Age"] == df_cities["Median Age"].max()]

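
|      | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|------|------|-------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|------------|------|-------|
| 333  | The Villages | Florida | 70.5 | NaN | NaN | 72590 | 15231.0 | 4034.0 | NaN | FL | Hispanic or Latino | 1066 |
| 449  | The Villages | Florida | 70.5 | NaN | NaN | 72590 | 15231.0 | 4034.0 | NaN | FL | Black or African-American | 331 |
| 1437 | The Villages | Florida | 70.5 | NaN | NaN | 72590 | 15231.0 | 4034.0 | NaN | FL | White | 72211 |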


In [38]:
# Check for NaNs/NULLs - everything is relatively OK!
print(get_nan_percent(df_cities, list(df_cities.keys())))

                    Column  Percent_NaN
0                     City     0.000000
1                    State     0.000000
2               Median Age     0.000000
3          Male Population     0.103770
4        Female Population     0.103770
5         Total Population     0.000000
6       Number of Veterans     0.449671
7             Foreign-born     0.449671
8   Average Household Size     0.553442
9               State Code     0.000000
10                    Race     0.000000
11                   Count     0.000000


In [39]:
df_cities["State Code"].unique()

array(['MD', 'MA', 'AL', 'CA', 'NJ', 'IL', 'AZ', 'MO', 'NC', 'PA', 'KS',
       'FL', 'TX', 'VA', 'NV', 'CO', 'MI', 'CT', 'MN', 'UT', 'AR', 'TN',
       'OK', 'WA', 'NY', 'GA', 'NE', 'KY', 'SC', 'LA', 'NM', 'IA', 'RI',
       'PR', 'DC', 'WI', 'OR', 'NH', 'ND', 'DE', 'OH', 'ID', 'IN', 'AK',
       'MS', 'HI', 'SD', 'ME', 'MT'], dtype=object)

In [40]:
df_cities["Race"].unique()

array(['Hispanic or Latino', 'White', 'Asian',
       'Black or African-American', 'American Indian and Alaska Native'],
      dtype=object)

---

#### Cleaning Steps
The cleaning steps below are sketched for illustration; the scripts in the `etl` folder perform them automatically.

##### 1. Weather Data

Check and fix NaN values

In [41]:
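# Drop rows without a temperature value; .copy() returns an independent frame,
# which avoids a SettingWithCopyWarning when columns are assigned later
df_wtr = df_wtr.dropna(subset=["AverageTemperature"]).copy()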

In [42]:
len(df_wtr)

96020

Check and possibly fix other invalid entries

In [43]:
# Check for invalid temps - none found!
df_wtr["AverageTemperature"].min(), df_wtr["AverageTemperature"].max()

(-31.138, 38.531)

In [44]:
# Check for empty city names - none found!
len(df_wtr[df_wtr["City"].apply(lambda x: x.strip()) == ""])

0

In [45]:
# Convert 'dt' column to datetime format
df_wtr["dt"] = pd.to_datetime(df_wtr["dt"])



In [46]:
# Check for unexpected dates within 'dt' column - none found!
len(df_wtr[(df_wtr["dt"] < pd.Timestamp(1700, 1, 1)) | (df_wtr["dt"] > pd.Timestamp(2016, 1, 1))])

0

In [47]:
# Remove duplicate rows with respect to (dt, City, Country), keeping the first occurrence
df_wtr = df_wtr.drop_duplicates(subset=["dt", "City", "Country"], keep="first")

In [48]:
df_wtr.count()

dt                               96020
AverageTemperature               96020
AverageTemperatureUncertainty    96020
City                             96020
Country                          96020
Latitude                         96020
Longitude                        96020
dtype: int64

---

##### 2. Airport Data

In [49]:
# Drop airports without a country code or an IATA code
df_apt = df_apt.dropna(subset=["iso_country", "iata_code"])

---

##### 3. Immigration Data

In [50]:
# Cast some columns to more appropriate types
for c, t in {"cicid": "int", "i94yr": "int", "i94mon": "int", "biryear": "int"}.items():
    dfsp_imgn_apr = dfsp_imgn_apr.withColumn(c, col(c).cast(t))

dfsp_imgn_apr.printSchema()

root
 |-- x: string (nullable = true)
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = t

In [51]:
# Drop columns with high amounts of invalid data
invalid_cols = ["occup", "entdepu", "insnum"]

dfsp_imgn_apr = dfsp_imgn_apr.drop(*invalid_cols)
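
Instead of hard-coding the column list, the columns could be selected by a missing-value threshold. With the percentages reported during exploration (occup 99.6%, entdepu 100%, insnum 96.5%, visapost 61.8%), a threshold of 90% would select exactly the three columns dropped above. A pandas sketch with a hypothetical helper `high_missing_columns` and a made-up frame; for the Spark dataframe, the percentages would come from the `count(when(...))` aggregation used earlier instead:

```python
import numpy as np
import pandas as pd


def high_missing_columns(df: pd.DataFrame, threshold_pct: float = 90.0) -> list:
    """Return the names of columns whose share of missing values exceeds threshold_pct."""
    missing_pct = df.isna().mean() * 100.0
    return missing_pct[missing_pct > threshold_pct].index.tolist()


# Made-up frame with 10 rows: 'occup' fully missing, 'visapost' 60% missing, 'gender' complete
toy = pd.DataFrame({
    "occup": [np.nan] * 10,
    "visapost": [np.nan] * 6 + ["ABD"] * 4,
    "gender": ["F", "M"] * 5,
})

to_drop = high_missing_columns(toy)
toy = toy.drop(columns=to_drop)
```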

---

##### 4. City Demographic Data

In [52]:
# Drop duplicate primary key composites
df_cities = df_cities.drop_duplicates(subset=["City", "State", "Race"])

In [53]:
# Drop rows missing required fields
df_cities = df_cities.dropna(subset=["City", "State Code"])

---

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The conceptual data model is depicted in the ERD figure below. The central immigration fact table `fact_table_immigration` is connected to 4 dimension tables:

- `dim_table_airport`: the table contains information about airports and is referenced from the fact table via the primary key `arrival_port`
- `dim_table_country`: the table contains information about immigrants' origin country (temperature) and is referenced from the fact table via the primary key `country_code`
- `dim_table_immigration_date`: the table contains information about immigrants' arrival date and is referenced from the fact table via the primary key `arrival_date`
- `dim_table_demo`: the table contains aggregated demographic information about the immigrants' arrival state and is referenced from the fact table via the primary key `state_code`


![Conceptual data model](erd_immigration_pg.png)
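
Since the city-level population figures repeat on every race row, aggregating them to state level for `dim_table_demo` must first collapse the per-race rows; otherwise each city is counted once per race. A hedged pandas sketch of such an aggregation, followed by a typical star-schema query that joins it to the fact table on `state_code`. All data is made up, and the output column names (`arrivals`, `foreign_born_pct`) are hypothetical and may differ from the `etl` implementation:

```python
import pandas as pd

# Made-up demographic rows: population figures repeat for each race of a city
demo = pd.DataFrame({
    "City": ["A", "A", "B"],
    "State Code": ["MD", "MD", "MD"],
    "Race": ["White", "Asian", "White"],
    "Total Population": [1000, 1000, 500],
    "Foreign-born": [300, 300, 50],
})

# Keep one row per city before summing, so cities are not counted once per race
per_city = demo.drop_duplicates(subset=["City", "State Code"])
dim_demo = (per_city.groupby("State Code", as_index=False)[["Total Population", "Foreign-born"]]
            .sum()
            .rename(columns={"State Code": "state_code"}))

# Made-up fact rows keyed by state_code
fact = pd.DataFrame({"cicid": [1, 2, 3], "state_code": ["MD", "MD", "HI"]})

# Example query: arrivals per state next to the state's foreign-born share
arrivals = fact.groupby("state_code").size().reset_index(name="arrivals")
report = arrivals.merge(dim_demo, on="state_code", how="inner")
report["foreign_born_pct"] = 100.0 * report["Foreign-born"] / report["Total Population"]
```

The inner join drops states without demographic data (here `HI`), which is the same effect a foreign-key constraint on `state_code` would enforce.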


#### 3.2 Mapping Out Data Pipelines

The data pipeline is structured as follows:

- load all datasets into spark dataframes: 
  - immigration events: `immigration_data_sample.csv` (sample data) OR from SAS source folder `/data/18-83510-I94-Data-2016`
  - airport codes: `airport-codes_csv.csv`
  - country codes: `SAS_I94_country_codes.csv`
  - country temperature data: `GlobalLandTemperaturesByCity.csv`
  - demographic data: `us-cities-demographics.csv`
- perform cleaning steps:
  - cast columns to correct types
  - remove invalid (NULL/NaN) values
  - drop duplicate rows (possibly by column subsets)
- verify data:
  - check that each table contains at least a minimum number of rows
  - check that no fully duplicated rows remain
- create star schema:
  - extract unique composite key values from candidate fact table
  - define dimension tables: airports data, country temperature data, immigration dates, demographics
  - make sure foreign key constraints are fulfilled for fact table 
  - define immigration fact table
  - return all tables as dataframes
- possibly load data into data warehouse or filesystem (parquet)
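
The verification checks (minimum row count, no remaining duplicate rows) and the foreign-key requirement from the list above can be sketched in pandas as follows. The function names, thresholds and tables are hypothetical; on Spark dataframes, the same checks could be expressed with `DataFrame.count()`, `DataFrame.dropDuplicates()` and a join on the key:

```python
import pandas as pd


def verify_table(df: pd.DataFrame, name: str, min_rows: int = 1) -> None:
    """Raise ValueError if a table is too small or still contains fully duplicated rows."""
    if len(df) < min_rows:
        raise ValueError(f"{name}: expected at least {min_rows} rows, got {len(df)}")
    n_dupes = int(df.duplicated().sum())
    if n_dupes > 0:
        raise ValueError(f"{name}: {n_dupes} duplicated rows remain")


def enforce_foreign_key(fact: pd.DataFrame, dim: pd.DataFrame, key: str) -> pd.DataFrame:
    """Keep only fact rows whose key value exists in the dimension table."""
    return fact[fact[key].isin(dim[key])]


# Made-up tables; "XXX" has no matching airport row
dim_airport = pd.DataFrame({"arrival_port": ["HHW", "MCA"]})
fact = pd.DataFrame({"cicid": [1, 2, 3], "arrival_port": ["HHW", "MCA", "XXX"]})

verify_table(dim_airport, "dim_table_airport", min_rows=2)
fact = enforce_foreign_key(fact, dim_airport, "arrival_port")
verify_table(fact, "fact_table_immigration")
```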

---

### Step 4: Run Pipelines to Model the Data 



**NOTE**: all code can be found within the `etl` package folder; check the `requirements.txt` file for the required packages!

---

In [54]:
# IMPORTANT: run 'pip install pydantic' in your shell!

# Load modules
from pathlib import Path
from typing import Union
from pyspark.sql import SparkSession

from etl.config import (
    LoadDataframesConfig, 
    StarSchemaConfig, 
    ETLConfig, 
    SchemaToParquetConfig, 
    write_config_to_json, 
    load_config_from_json
)
from etl.etl_extract import SourceLoader
from etl.etl_load import (
    DimTableImmigrationDate,
    DimTableCountry,
    DimTableAirport,
    DimTableDemo,
    FactTableImmigration,
    StarSchemaTable
)

In [60]:
# Load the ETL configuration from JSON (settings may be edited here)
etl_config = load_config_from_json("config_notebook.json")

#### 4.1 Load, Clean and Verify Data

Using the provided classes and methods,
- load the data as specified in the config,
- clean and verify it,
- load it into a spark dataframe for each table

In [61]:
mode = "spark"
config = etl_config.load_config

df_imgn = SourceLoader(source_path=config.immigration.source_path, mode=mode, spark_session=spark) \
    .load() \
    .clean(to_type_cols=config.immigration.to_type_cols, drop_cols=config.immigration.drop_cols) \
    .verify_data() \
    .df

df_demo = SourceLoader(source_path=config.demographic.source_path, mode=mode, spark_session=spark) \
    .load(csv_sep=config.demographic.csv_sep) \
    .clean(dropna_cols=config.demographic.dropna_cols) \
    .verify_data() \
    .df

df_temp = SourceLoader(source_path=config.temperature.source_path, mode=mode, spark_session=spark) \
    .load() \
    .clean(dropna_cols=config.temperature.dropna_cols, drop_duplicate_cols=config.temperature.drop_duplicate_cols) \
    .verify_data() \
    .df

df_airp = SourceLoader(source_path=config.airport.source_path, mode=mode, spark_session=spark) \
    .load(csv_sep=config.airport.csv_sep) \
    .clean(drop_duplicate_cols=config.airport.drop_duplicate_cols) \
    .verify_data() \
    .df

df_codes = SourceLoader(source_path=config.codes.source_path, mode=mode, spark_session=spark) \
    .load(csv_sep=config.codes.csv_sep) \
    .clean(to_type_cols=config.codes.to_type_cols) \
    .verify_data() \
    .df

#### 4.2 Set Up Star Schema

Based on the previously loaded, cleaned and verified data, set up the objects that represent the tables of the resulting star schema.

In [62]:
config = etl_config.schema_config

dim_date = DimTableImmigrationDate(df_imgn, spark=spark) \
    .setup(rename_pk=config.dim_date_pk) \
    .verify_data()

dim_country = DimTableCountry(df_imgn, df_codes, df_temp, spark=spark) \
    .setup(rename_pk=config.dim_country_pk) \
    .verify_data()

dim_airport = DimTableAirport(df_imgn, df_airp, spark=spark) \
    .setup(rename_pk=config.dim_airport_pk) \
    .verify_data()

dim_demo = DimTableDemo(df_imgn, df_demo, spark=spark) \
    .setup(rename_pk=config.dim_demo_pk) \
    .verify_data()

fact_immigration = FactTableImmigration(df_imgn, spark=spark) \
    .setup(rename_dim_pks=config.fact_immigration.rename_dim_pks, date_cols=config.fact_immigration.date_cols) \
    .verify_data()
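For the fact table, verification mainly means foreign-key integrity: every key value in the fact table should exist in the corresponding dimension table. That check amounts to an anti-join. With Spark dataframes it could be expressed with `join(..., how="left_anti")`; the plain-Python sketch below uses a hypothetical helper `find_orphans` and toy key lists, and is not the package's actual implementation.

```python
from typing import Hashable, Iterable, List, Set


def find_orphans(fact_keys: Iterable[Hashable], dim_keys: Iterable[Hashable]) -> List[Hashable]:
    """Return fact key values without a matching dimension row (an anti-join).

    NULL keys are ignored here, i.e. nullable foreign keys are tolerated.
    """
    valid: Set[Hashable] = set(dim_keys)
    return sorted({k for k in fact_keys if k is not None and k not in valid})


fact_ports = ["HHW", "MCA", "DAL", "HHW", "ZZZ"]  # toy values
dim_ports = ["HHW", "MCA", "DAL", "SNA"]
orphans = find_orphans(fact_ports, dim_ports)
print(orphans)  # ['ZZZ']
if orphans:
    print(f"{len(orphans)} foreign key value(s) without a dimension row")
```

Whether orphans should raise an error or be filtered out of the fact table is a design choice; the pipeline overview only states that the constraints must be fulfilled.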

In [63]:
# If desired, write data as parquet
c = etl_config.parquet_config
out_path = Path("./output_data")

if True:
    dim_date.write_parquet(out_path, c.dim_date.name, c.dim_date.partition_cols)
    dim_country.write_parquet(out_path, c.dim_country.name)
    dim_airport.write_parquet(out_path, c.dim_airport.name)
    dim_demo.write_parquet(out_path, c.dim_demographic.name)
    fact_immigration.write_parquet(out_path, c.fact_immigration.name, c.fact_immigration.partition_cols)
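When partition columns are passed, Spark's `DataFrameWriter.partitionBy` writes one sub-directory per distinct value combination, using Hive-style `column=value` names; queries filtering on those columns can then skip whole directories. The helper below is hypothetical (not part of the `etl` package) and only reproduces that naming scheme; the table name `dim_date` and the partition columns `year` and `month` are assumptions, since the real values come from `config_notebook.json`.

```python
from pathlib import PurePosixPath
from typing import Dict, Sequence


def partition_dir(base: str, table: str, row: Dict[str, object],
                  partition_cols: Sequence[str]) -> PurePosixPath:
    """Directory a row would land in under Hive-style partitioning.

    Spark additionally escapes special characters in values; that is omitted here.
    """
    path = PurePosixPath(base) / table
    for col in partition_cols:
        path /= f"{col}={row[col]}"
    return path


row = {"arrival_date": "2016-04-22", "year": 2016, "month": 4}
print(partition_dir("output_data", "dim_date", row, ["year", "month"]))
# output_data/dim_date/year=2016/month=4
```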

In [64]:
# If desired, print info for dataframe schemas
if True:
    for t in [dim_date, dim_country, dim_airport, dim_demo, fact_immigration]:
        print(f"Info for table object: {t.name}")
        t.df.printSchema()

Info for table object: DimTableImmigrationDate
root
 |-- arrival_date: string (nullable = true)
 |-- id: long (nullable = false)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

Info for table object: DimTableCountry
root
 |-- country_code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- temperature_avg: double (nullable = true)

Info for table object: DimTableAirport
root
 |-- arrival_port: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_co

In [65]:
# Print first rows of date table
dim_date.df.show(5)

+------------+---+---+----+-----+----+-------+
|arrival_date| id|day|week|month|year|weekday|
+------------+---+---+----+-----+----+-------+
|  2016-04-22|  0| 22|  16|    4|2016|      6|
|  2016-04-15|  1| 15|  15|    4|2016|      6|
|  2016-04-18|  2| 18|  16|    4|2016|      2|
|  2016-04-09|  3|  9|  14|    4|2016|      7|
|  2016-04-11|  4| 11|  15|    4|2016|      2|
+------------+---+---+----+-----+----+-------+
only showing top 5 rows
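The derived columns in this output are consistent with Spark's `weekofyear` (ISO 8601 week number) and `dayofweek` (1 = Sunday, ..., 7 = Saturday). Assuming those are the functions used inside `DimTableImmigrationDate` (the module is not shown here), the same attributes can be reproduced in plain Python to spot-check individual rows; `date_attributes` is a hypothetical helper.

```python
from datetime import date


def date_attributes(iso_date: str) -> dict:
    """Derive the date-dimension attributes for one 'YYYY-MM-DD' string."""
    d = date.fromisoformat(iso_date)
    return {
        "arrival_date": iso_date,
        "day": d.day,
        "week": d.isocalendar()[1],         # ISO 8601 week number, like weekofyear
        "month": d.month,
        "year": d.year,
        "weekday": d.isoweekday() % 7 + 1,  # Sunday=1 ... Saturday=7, like dayofweek
    }


for s in ["2016-04-22", "2016-04-15", "2016-04-18", "2016-04-09", "2016-04-11"]:
    a = date_attributes(s)
    print(a["arrival_date"], a["day"], a["week"], a["month"], a["year"], a["weekday"])
# 2016-04-22 22 16 4 2016 6
# 2016-04-15 15 15 4 2016 6
# 2016-04-18 18 16 4 2016 2
# 2016-04-09 9 14 4 2016 7
# 2016-04-11 11 15 4 2016 2
```

Note that the weekday numbering differs from Python's own `isoweekday()` (Monday = 1), hence the conversion.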



In [66]:
# Print first rows of country dimension table
dim_country.df.show(5)

+------------+---------+------------------+
|country_code|  country|   temperature_avg|
+------------+---------+------------------+
|         213|    INDIA|26.035266331658264|
|         264|   TURKEY|13.072964226565105|
|         696|VENEZUELA|27.131416107382552|
|         343|  NIGERIA|26.438145833333287|
|         108|  DENMARK| 7.695134554643077|
+------------+---------+------------------+
only showing top 5 rows
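`DimTableCountry` combines the I94 country codes with the city-level temperature data. One plausible way to arrive at `temperature_avg`, sketched here as an assumption rather than the module's exact logic, is to average `AverageTemperature` per upper-cased country name and left-join the result onto the code table. `country_temperature_dim` is hypothetical and the records are toy values.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def country_temperature_dim(codes: Dict[int, str], temps: List[dict]) -> List[dict]:
    """Join I94 country codes with the mean temperature per country (hypothetical)."""
    sums: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for rec in temps:
        if rec["AverageTemperature"] is None:  # skip missing measurements
            continue
        key = rec["Country"].upper()  # I94 country names are upper case
        sums[key] += rec["AverageTemperature"]
        counts[key] += 1
    rows = []
    for code, name in codes.items():
        # Left-join semantics: countries without temperature data get None
        avg: Optional[float] = sums[name] / counts[name] if counts[name] else None
        rows.append({"country_code": code, "country": name, "temperature_avg": avg})
    return rows


codes = {108: "DENMARK", 264: "TURKEY", 999: "ATLANTIS"}  # 999/ATLANTIS is made up
temps = [
    {"Country": "Denmark", "AverageTemperature": 7.0},
    {"Country": "Denmark", "AverageTemperature": 9.0},
    {"Country": "Turkey", "AverageTemperature": 13.5},
    {"Country": "Turkey", "AverageTemperature": None},
]
for row in country_temperature_dim(codes, temps):
    print(row)
# {'country_code': 108, 'country': 'DENMARK', 'temperature_avg': 8.0}
# {'country_code': 264, 'country': 'TURKEY', 'temperature_avg': 13.5}
# {'country_code': 999, 'country': 'ATLANTIS', 'temperature_avg': None}
```

Matching on names is fragile (spelling variants differ between the two sources), so unmatched countries are worth counting during verification.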



In [67]:
# Print first rows of airport dimension table
dim_airport.df.show(5)

+------------+-----+-------------+--------------------+------------+---------+-----------+----------+------------------+--------+---------+----------+--------------------+
|arrival_port|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|      municipality|gps_code|iata_code|local_code|         coordinates|
+------------+-----+-------------+--------------------+------------+---------+-----------+----------+------------------+--------+---------+----------+--------------------+
|         SNA| null|         null|                null|        null|     null|       null|      null|              null|    null|     null|      null|                null|
|         DAL| KDAL|large_airport|   Dallas Love Field|         487|       NA|         US|     US-TX|            Dallas|    KDAL|      DAL|       DAL|-96.851799, 32.84...|
|         SRQ| KSRQ|large_airport|Sarasota Bradento...|          30|       NA|         US|     US-FL|Sarasota/Bradenton|    KSRQ|      SRQ| 

In [68]:
# Print first rows of demographic dimension table
dim_demo.df.show(5)

+----------+--------------+-----------------+------------------+-------------------+------------------+-------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+------------------+
|state_code|         State|     MedianAgeAvg| MalePopulationAvg|FemalePopulationAvg|TotalPopulationAvg|NumberofVeteransAvg|   Foreign-bornAvg|AverageHouseholdSizeAvg|RaceCountHispLatAvg| RaceCountWhiteAvg| RaceCountAsianAvg|RaceCountBlackAfrAvg|RaceCountNativeAvg|
+----------+--------------+-----------------+------------------+-------------------+------------------+-------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+------------------+
|        NC|North Carolina|33.78571428571426|104721.78571428571| 113863.85714285714|218585.64285714287|  11867.57142857143|27094.785714285714|     2.4750000000000014|  5062.985714285715| 25573.37142857141|

In [69]:
# Print first rows of immigration fact table
fact_immigration.df.show(5)

+-------+-------+-----+------+------+------------+------------+------------+-------+----------+----------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+
|      x|  cicid|i94yr|i94mon|i94cit|country_code|arrival_port|arrival_date|i94mode|state_code|   depdate|i94bir|i94visa|count|dtadfile|visapost|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|         admnum|fltno|visatype|
+-------+-------+-----+------+------+------------+------------+------------+-------+----------+----------+------+-------+-----+--------+--------+-------+-------+-------+-------+--------+------+-------+---------------+-----+--------+
|2027561|4084316| 2016|     4| 209.0|         209|         HHW|  2016-04-22|    1.0|        HI|2016-04-29|  61.0|    2.0|  1.0|20160422|    null|      G|      O|      M|   1955|07202016|     F|     JL|5.6582674633E10|00782|      WT|
|2171295|4422636| 2016|     4| 582.0|         582|         MCA|  201

---

#### 4.3 Data dictionary 


- Immigration fact table (FactTableImmigration)

| Column               | Type     | Comment      |
| -------------------- | -------- | ------------ |
| cicid                | float   |      record ID        |
| i94yr                | float   |      year        |
| i94mon               | float    |         month     |
| i94cit               | float   |      birth country ID        |
| country_code         | integer   |      immigrant residence country ID (3-digit I94 code)       |
| arrival_port              | string   |    immigrant arrival port in US        |
| arrival_date              | string   |      arrival date in US      |
| i94mode              | float   |      transportation mode (air: 1, sea: 2, land: 3, else: 9)        |
| state_code              | string   |     arrival state         |
| depdate              | float   |      departure date        |
| i94bir               | float  | age       |
| i94visa              | float   |   visa category (business: 1, pleasure: 2, student: 3)           |
| count                | float   |   auxiliary field           |
| dtadfile             | string    |  auxiliary date field            |
| visapost             | string   |   Department of State office where the visa was issued           |
| occup                | string   |   occupation in US           |
| entdepa              | string   |   arrival code           |
| entdepd              | string   |   departure code           |
| entdepu              | string   |   update code           |
| matflag              | string   |   matching code            |
| biryear              | float   |    birth year          |
| dtaddto              | string  | residence allowance date        |
| gender               | string   |  gender            |
| insnum               | string    |  INS number            |
| airline              | string   |  airline for arrival            |
| admnum               | float   |   admission number           |
| fltno                | string   |  flight number            |
| visatype             | string   |  type of visa            |
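The `i94mode` and `i94visa` columns hold float-encoded category codes. Using the code meanings from the I94 label descriptions that ship with the dataset (the mode codes are the ones listed in the table above), a simple lookup can decode them for reporting. `decode`, `I94MODE` and `I94VISA` are hypothetical helpers, not part of the `etl` package.

```python
from typing import Dict, Optional

# Code tables from the I94 label descriptions
I94MODE: Dict[int, str] = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA: Dict[int, str] = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code: Optional[float], mapping: Dict[int, str]) -> Optional[str]:
    """Map a float-encoded I94 code (e.g. 1.0) to its label; None if missing or unknown."""
    if code is None or code != code:  # code != code is True only for NaN
        return None
    return mapping.get(int(code))


print(decode(1.0, I94MODE), decode(2.0, I94VISA))  # Air Pleasure
```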

- Immigration date dimension table (DimTableImmigrationDate)

| Column               | Type     | Comment      |
| -------------------- | -------- | ------------ |
| arrival_date      | string   |      immigrant arrival date in US       |
| id                | long   |      unique date ID        |
| day                | integer   |      arrival day        |
| week                | integer   |      arrival week        |
| month                | integer   |      arrival month        |
| year                | integer   |      arrival year        |
| weekday                | integer   |      arrival day of week        |

- Country dimension table (DimTableCountry)

| Column               | Type     | Comment      |
| -------------------- | -------- | ------------ |
| country_code         | integer  | 3-digit I94 country code   |
| country              | string   | name of country       |
| temperature_avg      | double   | average temperature within country (°C)      |

- Airport dimension table (DimTableAirport)

| Column               | Type     | Comment      |
| -------------------- | -------- | ------------ |
| arrival_port         | string   | I94 arrival port code (3 characters) |
| ident                | string   | airport identifier code            |
| type                 | string   | airport type            |
| name                 | string   | name of airport            |
| elevation_ft         | integer  | elevation in feet            |
| continent            | string   | airport continent             |
| iso_country          | string   | country ISO code             |
| iso_region           | string   | region ISO code             |
| municipality         | string   | name of municipality            |
| gps_code             | string   | GPS code            |
| iata_code            | string   | IATA code             |
| local_code           | string   | local airport code            |
| coordinates          | string   | longitude, latitude (comma-separated)       |
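The airport dimension keeps every arrival port found in the immigration data even when there is no match in the airport codes file, which is why `SNA` appears with all-null attributes in the `dim_airport.df.show(5)` output. That behaviour corresponds to a left join. The sketch below shows it in plain Python; `left_join_ports` is hypothetical, and matching on `iata_code` is an assumption, since the join key used in `DimTableAirport` is not shown in this notebook.

```python
from typing import Dict, List


def left_join_ports(ports: List[str], airports: List[dict], key: str = "iata_code") -> List[dict]:
    """Attach airport attributes to each arrival port; unmatched ports get None values."""
    # Index airports by the (assumed) join key; later duplicates overwrite earlier ones
    by_code: Dict[str, dict] = {a[key]: a for a in airports if a.get(key)}
    attr_cols = sorted({c for a in airports for c in a})
    rows = []
    for port in ports:
        match = by_code.get(port, {})
        row = {"arrival_port": port}
        row.update({c: match.get(c) for c in attr_cols})
        rows.append(row)
    return rows


airports = [{"ident": "KDAL", "iata_code": "DAL", "municipality": "Dallas"}]
for row in left_join_ports(["DAL", "XYZ"], airports):
    print(row)
# {'arrival_port': 'DAL', 'iata_code': 'DAL', 'ident': 'KDAL', 'municipality': 'Dallas'}
# {'arrival_port': 'XYZ', 'iata_code': None, 'ident': None, 'municipality': None}
```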

- Demographic dimension table (DimTableDemo)

| Column               | Type     | Comment      |
| -------------------- | -------- | ------------ |
| state_code           | string   | code of the state |
| State                 | string   |    state name          |
| MedianAgeAvg           | float    |     average median resident age         |
| MalePopulationAvg      | float   |     average total number of male residents         |
| FemalePopulationAvg      | float   |   average total number of female residents           |
| TotalPopulationAvg        | float   |     average total population         |
| NumberofVeteransAvg       | float   |  average number of veterans           |
| Foreign-bornAvg             | float   |  average number of foreign-born residents            |
| AverageHouseholdSizeAvg    | float   |   average household size (people per household)           |
| RaceCountHispLatAvg                | float   |   average count of Hispanic or Latino residents     |
| RaceCountWhiteAvg                | float   |   average count of White residents     |
| RaceCountAsianAvg                | float   |   average count of Asian residents      |
| RaceCountBlackAfrAvg                | float   |   average count of Black or African-American residents     |
| RaceCountNativeAvg                | float   |   average count of American Indian and Alaska Native residents      |
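The column names suggest that city-level columns of `us-cities-demographics.csv` (e.g. `Median Age`, `Total Population`) are averaged per `State Code` and renamed by dropping spaces and appending `Avg`. A plain-Python sketch of such a group-by-average follows, under that assumption and with made-up numbers; `average_by_state` is hypothetical. Keep in mind that the source file lists each city once per race category, so a real implementation has to deduplicate or pivot those rows first (the `RaceCount...` columns hint at such a pivot).

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def average_by_state(rows: List[dict], value_cols: Sequence[str]) -> Dict[str, Dict[str, float]]:
    """Group city rows by 'State Code' and average the given numeric columns."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for r in rows:
        groups[r["State Code"]].append(r)
    return {
        state: {f"{c.replace(' ', '')}Avg": sum(r[c] for r in rs) / len(rs) for c in value_cols}
        for state, rs in groups.items()
    }


cities = [  # made-up values, one row per city
    {"State Code": "NC", "Median Age": 30.0, "Total Population": 200000},
    {"State Code": "NC", "Median Age": 36.0, "Total Population": 100000},
    {"State Code": "HI", "Median Age": 41.0, "Total Population": 350000},
]
print(average_by_state(cities, ["Median Age", "Total Population"]))
# {'NC': {'MedianAgeAvg': 33.0, 'TotalPopulationAvg': 150000.0}, 'HI': {'MedianAgeAvg': 41.0, 'TotalPopulationAvg': 350000.0}}
```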

---

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

__5.1 Choice of tools and technologies__

- Apache Spark via the pyspark package:
  - rich API for straightforward data processing
  - interoperability with many different data formats
  - fast, distributed data processing that scales to big data
- Pydantic package:
  - transparent configuration-as-code
  - easy (de)serialization in useful formats, e.g. JSON
  - automatic type validation of configuration values
- Pandas package:
  - rich API for data processing
  - particularly well suited here for exploratory data analysis on smaller samples
  - interoperability with many different data formats

**5.2 Data Update Frequency**:
    
The update frequency is primarily dictated by the fact table, i.e. the immigration event data. As this data is updated monthly, the ETL pipeline should be run once a month, each time on the newly available source of immigration events.
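A monthly run has to pick up the right source file. In the Udacity workspace, the SAS files in `/data/18-83510-I94-Data-2016` follow a pattern like `i94_apr16_sub.sas7bdat`; assuming that pattern holds, a small helper (hypothetical, not part of `etl.py`) can derive the expected file for a given month:

```python
from datetime import date
from pathlib import PurePosixPath

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_source_file(folder: str, run_month: date) -> PurePosixPath:
    """Expected SAS file for a month, assuming the 'i94_<mon><yy>_sub.sas7bdat' pattern."""
    # Fixed abbreviations avoid locale-dependent strftime('%b') output
    name = f"i94_{MONTHS[run_month.month - 1]}{run_month.year % 100:02d}_sub.sas7bdat"
    return PurePosixPath(folder) / name


print(monthly_source_file("/data/18-83510-I94-Data-2016", date(2016, 4, 1)))
# /data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```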

**5.3 Scenarios**

- Data increase by a factor of 100:
  - in principle, the present Spark-based approach could still be used
  - better performance could be achieved by scaling vertically (more powerful hardware) or horizontally (more compute nodes, i.e. a Spark cluster)
  - another option would be a serverless, managed Spark service (e.g. AWS Glue or Amazon EMR Serverless)
  - tuning the Spark configuration (e.g. executor memory and the number of shuffle partitions) would probably be very beneficial
  - local data storage could become a bottleneck, so the data should be written chunk-wise to cloud object storage (e.g. an Amazon S3 bucket or an Azure Blob Storage container)
- Daily updated dashboard:
  - as the most basic approach, the script could run as a cron job scheduled early enough to finish before 7am
  - a more robust approach would be a workflow manager such as Apache Airflow or Dagster, which adds scheduling, retries, dependency handling and alerting on failed or late runs
- Database access by 100+ people:
  - instead of serving parquet files from a local filesystem, the star schema could be loaded into a dedicated, scalable cloud data warehouse such as Amazon Redshift, Azure Synapse Analytics, or Google BigQuery, which handles many concurrent queries and provides access control
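The chunk-wise approach for the 100x scenario boils down to never holding the full dataset in memory at once. A minimal generator-based sketch in plain Python follows; `chunked` is a hypothetical helper, and in practice each chunk would be uploaded to cloud storage (or the work left to Spark's own partitioning) instead of merely being counted.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` records from any iterable."""
    if size < 1:
        raise ValueError("size must be positive")
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Only one chunk is materialized at a time, even for a lazily generated stream
sizes = [len(c) for c in chunked(range(10), 4)]
print(sizes)  # [4, 4, 2]
```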