# Data Engineering Capstone Project

## Is there any correlation between the increase in the net migration rate and the average temperature change? 


To help data analysts answer that question, this project is broken down into the following steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Import relevant libraries and functions
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Loading Spark session
spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

## Step 1: Scope the Project and Gather Data

### Scope 
This project aims to help data analysts determine whether there is any correlation between the increase in the net migration rate and the average temperature change per city per year. Does the temperature increase when the corresponding city receives more immigrants than usual? We know that immigrants tend to travel back and forth to visit their families in their home countries, which could lead to busier airports and more air pollution. Hence, there are several variables to investigate: net migration rate, airport traffic, air pollution and temperature change, all of which will be analysed per city per year.
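The scope hinges on the net migration rate, which none of the datasets provides directly. A common demographic definition is immigrants minus emigrants per 1,000 residents over a period. The sketch below assumes that definition. `net_migration_rate` is a hypothetical helper, not part of the pipeline.

```python
def net_migration_rate(immigrants: int, emigrants: int, population: int) -> float:
    """Net migration per 1,000 residents (assumed definition, not taken from the datasets)."""
    if population <= 0:
        raise ValueError("population must be positive")
    # Multiply before dividing so that round numbers stay exact
    return (immigrants - emigrants) * 1000 / population


# Hypothetical city: 1,500 arrivals, 900 departures, 100,000 residents
print(net_migration_rate(1500, 900, 100_000))  # 6.0
```

In this project, the arrivals and departures per city would come from the immigration data, and the population would come from the demographics dataset.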

The end product is a source-of-truth database containing average temperature changes, demographics and immigration data per city per year, plus a set of analytical tables for further investigation. ETL scripts orchestrated by DAGs move the data from the original datasets into a single source-of-truth dataset. Further DAG steps build the set of analytical tables in a similar fashion.

The data sources used in this project are briefly described as follows:

### Datasets description

##### Airport Code Table:
This is a simple table provided by Datahub containing airport codes and their corresponding cities. It includes the airport type, name, elevation in feet, country/region/municipality and coordinates.

In [3]:
df_airports = pd.read_csv('airport-codes_csv.csv')
df_airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
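The coordinates column stores both values in a single "lon, lat" string. For example, Total Rf Heliport in Bensalem, Pennsylvania has "-74.93360137939453, 40.07080078125", so longitude comes first. Numeric columns may be needed later, for instance to check the "0, 0" placeholders. In that case, a small parser like the hypothetical `parse_coordinates` below could split them.

```python
def parse_coordinates(coordinates: str) -> tuple[float, float]:
    """Split an airport 'lon, lat' string into (longitude, latitude) floats."""
    # float() tolerates the space that follows the comma
    longitude, latitude = (float(part) for part in coordinates.split(","))
    return longitude, latitude


print(parse_coordinates("-101.473911, 38.704022"))  # (-101.473911, 38.704022)
```

With pandas, it could be applied to the whole column, e.g. `df_airports['coordinates'].apply(parse_coordinates)`.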


##### U.S. City Demographic Data:
This data was provided by OpenSoft. It includes the male, female and total population, the overall median age, and the number of veterans and foreign-born residents per city. It also gives the number of individuals in a given city/state per race.

In [4]:
df_demographics = pd.read_csv('us-cities-demographics.csv', sep=";")
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


##### World Temperature Data:
This dataset was taken from Kaggle. It provides the average temperature and the average temperature uncertainty per date and city/country. It also includes more granular information like the latitude and longitude.


In [5]:
# Temperature data
df_temperatures = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temperatures.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
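Latitude and Longitude are stored as strings with a hemisphere suffix, e.g. 57.05N and 10.33E for Århus. They would first need to become signed numbers if they are ever compared with the airport coordinates. The helper below is hypothetical and assumes the usual convention that southern latitudes and western longitudes are negative.

```python
def to_signed_degrees(value: str) -> float:
    """Convert '57.05N' / '10.33E' style strings into signed decimal degrees."""
    hemisphere = value[-1].upper()
    if hemisphere not in {"N", "S", "E", "W"}:
        raise ValueError(f"unexpected hemisphere in {value!r}")
    degrees = float(value[:-1])
    # Southern latitudes and western longitudes are negative by convention
    return -degrees if hemisphere in {"S", "W"} else degrees


print(to_signed_degrees("57.05N"), to_signed_degrees("10.33E"))  # 57.05 10.33
```

With pandas, it could be mapped over a column, e.g. `df_temperatures['Longitude'].map(to_signed_degrees)`.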


##### I94 Immigration Data:
This data comes from the US National Tourism and Trade Office. It provides data on the passenger level, including gender, visa type, year of birth, departure and arrival dates, flight airline, airport of arrival, etc.

In [6]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(fname)
df_immigration.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


## Step 2: Exploring and Assessing the Data

In [7]:
def exploratory_analysis(df):
    """
    Profile each column of a given DataFrame: the type of its first value and its
    percentage of unique, missing and zero values.
    For numerical columns, also report the maximum, minimum, median and mean values
    to help analyse the column's distribution. Missing values are counted as zero in
    these statistics, which can pull Min, Median and Mean towards 0.
    """
    eda_df = {}
    n_rows = len(df)
    for column in df.columns:
        series = df[column]
        non_null = series.dropna()
        eda_column = {}
        # Positional access, so this also works after rows have been filtered out
        eda_column['type'] = type(series.iloc[0])
        eda_column['% unique'] = round(len(series.unique()) / n_rows * 100, 2)
        eda_column['% missing'] = round(series.isna().sum() / n_rows * 100, 2)
        eda_column['% zeros'] = round((series == 0).sum() / n_rows * 100, 2)
        # Skip statistics for text columns and for columns with no values at all
        if len(non_null) > 0 and not isinstance(non_null.iloc[0], str):
            filled = series.fillna(0)
            eda_column['Max'] = filled.max()
            eda_column['Min'] = filled.min()
            eda_column['Median'] = filled.median()
            eda_column['Mean'] = filled.mean()
        else:
            eda_column['Max'] = None
            eda_column['Min'] = None
            eda_column['Median'] = None
            eda_column['Mean'] = None
        eda_df[column] = eda_column
    return pd.DataFrame(eda_df).T

Let's explore each dataset column to see if any data cleaning is required.

## Airports Dataset
Since we are going to explore air traffic in cities in the United States, we can begin the analysis by filtering out airports outside the United States.

In [8]:
df_airports = df_airports[df_airports['iso_country'] == 'US']

Checking for duplicated rows:

In [9]:
df_airports[df_airports.duplicated()]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


No duplicated rows were found, so we proceed with the analysis.

In [10]:
exploratory_analysis(df_airports)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
ident,0.0,100.0,0.0,,,,,<class 'str'>
type,0.0,0.03,0.0,,,,,<class 'str'>
name,0.0,93.36,0.0,,,,,<class 'str'>
elevation_ft,1.05,16.65,0.26,12442.0,1139.11,731.0,-210.0,<class 'numpy.float64'>
continent,100.0,0.01,0.0,,,,,<class 'float'>
iso_country,0.0,0.0,0.0,,,,,<class 'str'>
iso_region,0.0,0.23,0.0,,,,,<class 'str'>
municipality,0.45,38.4,0.0,,,,,<class 'str'>
gps_code,7.79,91.98,0.0,,,,,<class 'str'>
iata_code,91.13,8.85,0.0,,,,,<class 'float'>


- elevation_ft is the only truly numerical column. iata_code and continent appear as floats only because their first values are missing (NaN is a float).
  - Since we are analysing only the US, the continent column is disposable anyway. It is missing in 100% of the US rows, most likely because pandas reads the continent code "NA" (North America) as a missing value by default.
  - iata_code is missing in 91.13% of the rows, so it's also best to drop it.
- The unique id (ident column) has no missing values, which is good. The coordinates seem to be unique as well: 99.64% of the rows have unique coordinates. We might need to remove the few non-unique ones, though.
- type: only 0.03% unique values, i.e. a handful of categories. The share of rows in each category is checked below.
- name: 93.36% unique values. Requires further investigation.
- elevation_ft: the distribution of values seems acceptable. We might investigate the 1.05% of rows that have no elevation information.
- iso_region: requires counting values under each category.
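The few non-unique coordinates can be listed with pandas' `Series.duplicated(keep=False)`. It flags every row whose value occurs more than once, not only the later repeats. The sketch uses a made-up DataFrame. On the real data, the same filter would be applied to `df_airports`, and placeholder "0, 0" coordinates would show up in the result too.

```python
import pandas as pd

# Toy frame standing in for df_airports (values are made up for illustration)
airports = pd.DataFrame({
    "ident": ["A1", "A2", "A3", "A4"],
    "coordinates": ["-74.9, 40.0", "0, 0", "0, 0", "-86.7, 34.8"],
})

# keep=False marks all members of a duplicated group, including the first one
shared = airports[airports["coordinates"].duplicated(keep=False)]
print(shared["ident"].tolist())  # ['A2', 'A3']
```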

In [11]:
df_airports = df_airports[['ident', 'type', 'name', 'elevation_ft', 'iso_region', 'municipality', 'gps_code', 'local_code', 'coordinates']]

In [12]:
for column in ['type', 'iso_region']:
    print('----------------------------')
    print(column)
    print(df_airports[column].value_counts(normalize=True)*100)
    print('----------------------------')

----------------------------
type
small_airport     60.289142
heliport          27.529991
closed             5.826779
medium_airport     3.040823
seaplane_base      2.487147
large_airport      0.747023
balloonport        0.079097
Name: type, dtype: float64
----------------------------
----------------------------
iso_region
US-TX     10.005713
US-CA      4.780947
US-FL      4.249242
US-PA      4.033924
US-IL      3.963616
US-AK      3.642835
US-OH      3.511008
US-IN      3.062794
US-NY      2.935361
US-WI      2.742013
US-LA      2.601397
US-WA      2.539878
US-MO      2.539878
US-MN      2.500330
US-MI      2.412445
US-OK      2.359713
US-GA      2.293800
US-CO      2.219097
US-VA      2.219097
US-OR      2.161972
US-NC      2.078481
US-NJ      1.942260
US-KS      1.929077
US-AR      1.784066
US-AL      1.586325
US-AZ      1.577537
US-TN      1.564354
US-IA      1.485257
US-MT      1.454498
US-ND      1.410555
US-ID      1.384189
US-NE      1.357824
US-MS      1.234785
US-MD      1.1

As seen above, both airport types and regions seem acceptable.

In [13]:
print(round(len(df_airports[df_airports['coordinates'] == '0, 0'])/len(df_airports)*100, 2), 
      '% of the rows have "0,0" coordinates, which is unfortunate...')

0.02 % of the rows have "0,0" coordinates, which is unfortunate...


... but these same rows have airport codes and iso_region, so we might still keep them:

In [14]:
df_airports[df_airports['coordinates'] == '0, 0']

Unnamed: 0,ident,type,name,elevation_ft,iso_region,municipality,gps_code,local_code,coordinates
49820,US-0805,small_airport,Twin Cities,,US-U-A,"Tabor City,NC",K5J9,,"0, 0"
49898,US-0883,large_airport,JFK,,US-NY,New York,,,"0, 0"
49947,US-0932,small_airport,CLE,,US-U-A,Cleveland,,,"0, 0"
49999,US-0984,small_airport,0c2,,US-IL,,,,"0, 0"
50032,US-1016,large_airport,JFK,,US-NY,New York City,,,"0, 0"


Summing up: to fulfill the project goals, in the airports dataset we should **filter out airports that are not inside the US** and **select only the following columns: 'ident', 'type', 'name', 'elevation_ft', 'iso_region', 'municipality', 'gps_code', 'local_code', 'coordinates'**.

## Demographics Dataset

Checking for duplicated rows:

In [15]:
df_demographics[df_demographics.duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


No duplicated rows were found, so we proceed with the analysis:

In [16]:
exploratory_analysis(df_demographics)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
City,0.0,19.61,0,,,,,<class 'str'>
State,0.0,1.69,0,,,,,<class 'str'>
Median Age,0.0,6.23,0,70.5,35.4949,35.3,22.9,<class 'numpy.float64'>
Male Population,0.1,20.55,0,4081700.0,97227.4,52336.0,0.0,<class 'numpy.float64'>
Female Population,0.1,20.58,0,4468710.0,101664.0,53809.0,0.0,<class 'numpy.float64'>
Total Population,0.0,20.55,0,8550405.0,198967.0,106782.0,63215.0,<class 'numpy.int64'>
Number of Veterans,0.45,19.99,0,156961.0,9325.71,5394.0,0.0,<class 'numpy.float64'>
Foreign-born,0.45,20.34,0,3212500.0,40470.8,18666.0,0.0,<class 'numpy.float64'>
Average Household Size,0.55,5.6,0,4.98,2.72736,2.65,0.0,<class 'numpy.float64'>
State Code,0.0,1.69,0,,,,,<class 'str'>


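All numerical variables (median age, population, number of veterans, number of foreign-born residents and average household size) show plausible distributions. The 0.0 minimums reflect missing values, which the profiling function counts as zero.

Average Household Size and Number of Veterans have a few missing values (0.55% and 0.45% of rows, respectively). Those columns won't be useful for our analysis, so we can simply drop them. The Foreign-born column, which is also missing 0.45% of its values, requires further investigation.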

In [17]:
df_demographics = df_demographics[['City', 'State', 'Median Age', 'Male Population', 'Female Population',
                                   'Total Population', 'Foreign-born', 'State Code', 'Race', 'Count']]

As also seen above, the City column has only 19.61% unique values, so city names repeat across rows. Let's investigate it:

In [18]:
df_demographics.groupby('City')[['Race']].count().head()

Unnamed: 0_level_0,Race
City,Unnamed: 1_level_1
Abilene,5
Akron,5
Alafaya,4
Alameda,5
Albany,10


Some cities appear 5 times in the dataset, some 4 times, and some even 10 times. As seen below, there are 5 different races in the dataset, so each city probably has 1 row per race.

In [19]:
df_demographics['Race'].value_counts()

Hispanic or Latino                   596
White                                589
Black or African-American            584
Asian                                583
American Indian and Alaska Native    539
Name: Race, dtype: int64

In the case of Alafaya, there are no statistics for the American Indian and Alaska Native population. Albany appears 10 times because there are 2 cities named Albany in the US, as seen below:

In [20]:
df_demographics[df_demographics['City'] == 'Alafaya']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Foreign-born,State Code,Race,Count
135,Alafaya,Florida,33.5,39504.0,45760.0,85264,15842.0,FL,White,63666
554,Alafaya,Florida,33.5,39504.0,45760.0,85264,15842.0,FL,Asian,10336
793,Alafaya,Florida,33.5,39504.0,45760.0,85264,15842.0,FL,Hispanic or Latino,34897
1868,Alafaya,Florida,33.5,39504.0,45760.0,85264,15842.0,FL,Black or African-American,6577


In [21]:
df_demographics[df_demographics['City'] == 'Albany']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Foreign-born,State Code,Race,Count
1165,Albany,Georgia,33.3,31695.0,39414.0,71109,861.0,GA,American Indian and Alaska Native,445
1260,Albany,Georgia,33.3,31695.0,39414.0,71109,861.0,GA,White,17160
1470,Albany,New York,32.8,47627.0,50825.0,98452,11948.0,NY,American Indian and Alaska Native,1611
1616,Albany,Georgia,33.3,31695.0,39414.0,71109,861.0,GA,Asian,650
1809,Albany,New York,32.8,47627.0,50825.0,98452,11948.0,NY,Hispanic or Latino,9368
2000,Albany,New York,32.8,47627.0,50825.0,98452,11948.0,NY,White,58368
2050,Albany,Georgia,33.3,31695.0,39414.0,71109,861.0,GA,Black or African-American,53440
2278,Albany,New York,32.8,47627.0,50825.0,98452,11948.0,NY,Black or African-American,31303
2472,Albany,New York,32.8,47627.0,50825.0,98452,11948.0,NY,Asian,8090
2552,Albany,Georgia,33.3,31695.0,39414.0,71109,861.0,GA,Hispanic or Latino,1783
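Since the same city name can belong to different states, grouping by City alone merges unrelated places. Here, the 10 Albany rows are really 5 for Albany, Georgia and 5 for Albany, New York. This suggests pairing the city with its state whenever a per-city key is needed. The sketch below counts races per (City, State) pair on a few made-up rows.

```python
import pandas as pd

# Made-up subset shaped like us-cities-demographics.csv
demographics = pd.DataFrame({
    "City": ["Albany", "Albany", "Albany", "Alafaya"],
    "State": ["Georgia", "Georgia", "New York", "Florida"],
    "Race": ["White", "Asian", "White", "White"],
})

# Grouping by City alone would report 3 rows for "Albany"
races_per_city = demographics.groupby(["City", "State"])["Race"].count()
print(races_per_city)
```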


Now that this is clear, we analyse the missing values in population columns:

In [22]:
df_demographics[df_demographics['Male Population'].isna()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Foreign-born,State Code,Race,Count
333,The Villages,Florida,70.5,,,72590,4034.0,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,4034.0,FL,Black or African-American,331
1437,The Villages,Florida,70.5,,,72590,4034.0,FL,White,72211


In [23]:
df_demographics[df_demographics['Female Population'].isna()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Foreign-born,State Code,Race,Count
333,The Villages,Florida,70.5,,,72590,4034.0,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,4034.0,FL,Black or African-American,331
1437,The Villages,Florida,70.5,,,72590,4034.0,FL,White,72211


These rows have no information on the male and female population, but we can keep them because they have data on the total population. That column is useful per se when calculating the foreign-born percentage. The rows missing data in the Foreign-born column, however, should be removed:

In [24]:
df_demographics = df_demographics[~df_demographics['Foreign-born'].isna()]

In [25]:
exploratory_analysis(df_demographics)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
City,0.0,19.46,0,,,,,<class 'str'>
State,0.0,1.67,0,,,,,<class 'str'>
Median Age,0.0,6.15,0,70.5,35.4712,35.3,22.9,<class 'numpy.float64'>
Male Population,0.1,20.4,0,4081700.0,97343.4,52336.0,0.0,<class 'numpy.float64'>
Female Population,0.1,20.43,0,4468710.0,101741.0,53809.0,0.0,<class 'numpy.float64'>
Total Population,0.0,20.4,0,8550405.0,199160.0,106782.0,63215.0,<class 'numpy.int64'>
Foreign-born,0.0,20.4,0,3212500.0,40653.6,18822.0,861.0,<class 'numpy.float64'>
State Code,0.0,1.67,0,,,,,<class 'str'>
Race,0.0,0.17,0,,,,,<class 'str'>
Count,0.0,96.35,0,3835726.0,48838.4,13778.0,98.0,<class 'numpy.int64'>


Summing up: to fulfill the project goals, the demographics dataset should contain only rows with information on the foreign-born population and **display only the following columns: 'City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Foreign-born', 'State Code', 'Race', 'Count'**.

## Temperatures Dataset

Checking for duplicated rows:

In [26]:
df_temperatures[df_temperatures.duplicated()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude


No duplicated rows were found, so we proceed with the analysis.

In [27]:
df_temperatures.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [28]:
exploratory_analysis(df_temperatures)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
dt,0.0,0.04,0,,,,,<class 'str'>
AverageTemperature,4.23,1.3,0,39.651,16.0191,18.204,-42.704,<class 'numpy.float64'>
AverageTemperatureUncertainty,4.23,0.13,0,15.396,0.98502,0.557,0.0,<class 'numpy.float64'>
City,0.0,0.04,0,,,,,<class 'str'>
Country,0.0,0.0,0,,,,,<class 'str'>
Latitude,0.0,0.0,0,,,,,<class 'str'>
Longitude,0.0,0.01,0,,,,,<class 'str'>


* The date (dt) column is quite repetitive in this dataset (0.04% unique values). This makes sense, as the same monthly dates repeat for every city.
* 4.23% of the rows have missing AverageTemperature and AverageTemperatureUncertainty values, so it's best to drop those rows.
* Once again, we're going to filter out all rows that are not related to cities in the US.

The cell below selects US cities and drops rows with missing values at the same time.

In [29]:
df_temperatures = df_temperatures[df_temperatures['Country'] == 'United States'].dropna()

Summing up: to fulfill the project goals, the temperatures dataset **should not contain rows with missing values** and it **should include only US cities**.

## Immigration Dataset

PySpark allows us to drop duplicate rows in a straightforward fashion:

In [30]:
df_immigration = df_immigration.dropDuplicates()

Not all of the columns are relevant for the scope of this project, so let's select a subset of the data and also rename the columns to more meaningful and intuitive names:

In [31]:
df_immigration = df_immigration.selectExpr(['i94yr as i94_year', 'i94mon as i94_month', 'i94cit as city_origin',
                                            'i94res as city_destination', 'i94port as airport_code', 'arrdate as arrival_date',
                                            'depdate as departure_date', 'i94visa as i94_visa', 'visatype as visa_type', 'biryear as year_birth'])

The immigration dataset is very large, so we can benefit from taking a small sample before converting it to pandas. As seen below, we take a 20% sample. Spark's `sample` returns approximately, not exactly, that fraction.

In [32]:
df_immigration_sample = df_immigration.sample(False, 0.2, 42).toPandas()

In [33]:
exploratory_analysis(df_immigration_sample)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
i94_year,0.0,0.0,0,2016.0,2016.0,2016.0,2016.0,<class 'numpy.float64'>
i94_month,0.0,0.0,0,4.0,4.0,4.0,4.0,<class 'numpy.float64'>
city_origin,0.0,0.04,0,999.0,304.869,213.0,101.0,<class 'numpy.float64'>
city_destination,0.0,0.04,0,749.0,303.245,213.0,101.0,<class 'numpy.float64'>
airport_code,0.0,0.04,0,,,,,<class 'str'>
arrival_date,0.0,0.0,0,20574.0,20559.8,20560.0,20545.0,<class 'numpy.float64'>
departure_date,4.61,0.03,0,45427.0,19626.0,20569.0,0.0,<class 'numpy.float64'>
i94_visa,0.0,0.0,0,3.0,1.84505,2.0,1.0,<class 'numpy.float64'>
visa_type,0.0,0.0,0,,,,,<class 'str'>
year_birth,0.03,0.02,0,2019.0,1973.74,1975.0,0.0,<class 'numpy.float64'>


As seen above, the departure date is missing in 4.61% of the sampled rows. Since this is an important column, it's best to remove those rows.

In [34]:
df_immigration = df_immigration.dropna(subset=['departure_date'])

In [35]:
df_immigration_sample = df_immigration.sample(False, 0.2, 42).toPandas()
exploratory_analysis(df_immigration_sample)

Unnamed: 0,% missing,% unique,% zeros,Max,Mean,Median,Min,type
i94_year,0,0.0,0,2016.0,2016.0,2016.0,2016.0,<class 'numpy.float64'>
i94_month,0,0.0,0,4.0,4.0,4.0,4.0,<class 'numpy.float64'>
city_origin,0,0.04,0,999.0,303.438,213.0,101.0,<class 'numpy.float64'>
city_destination,0,0.04,0,760.0,301.863,213.0,101.0,<class 'numpy.float64'>
airport_code,0,0.04,0,,,,,<class 'str'>
arrival_date,0,0.01,0,20574.0,20559.8,20560.0,20545.0,<class 'numpy.float64'>
departure_date,0,0.03,0,45427.0,20574.0,20570.0,19095.0,<class 'numpy.float64'>
i94_visa,0,0.0,0,3.0,1.83804,2.0,1.0,<class 'numpy.float64'>
visa_type,0,0.0,0,,,,,<class 'str'>
year_birth,0,0.02,0,2016.0,1974.38,1975.0,0.0,<class 'numpy.float64'>


Now our sample is free of missing values and duplicated rows. Cities, airports and dates naturally repeat in the immigration dataset, so the low percentages of unique values reported for these columns are acceptable.

Summing up: to fulfill the project goals, the immigration dataset should **display only the following renamed columns: 'i94yr aka i94_year', 'i94mon aka i94_month', 'i94cit aka city_origin', 'i94res aka city_destination', 'i94port aka airport_code', 'arrdate aka arrival_date', 'depdate aka departure_date', 'i94visa aka i94_visa', 'visatype aka visa_type', and 'biryear aka year_birth'.**

Also, **rows with no departure date should be removed.**

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
The purpose of this project is to find out whether the net migration rate influences the average temperature per city per year. A rising number of immigrants in a given year would not affect that year's average temperature instantly. Hence, we need several years of temperature statistics at our disposal.

The first transformation derives monthly average temperatures per city from the original temperatures dataset, whose dt column holds the first day of each month. The year and month are extracted from dt, and the temperatures are averaged per city. The resulting dim_weather table is going to be partitioned by year/month.
* **dim_weather**: *year, month*, city, average_monthly_temperature
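A pandas sketch of that transformation on made-up rows, assuming the Kaggle column names shown earlier. In the pipeline itself this would presumably run in Spark or Redshift, but pandas keeps the example small. If several places share a city name, they would be averaged together unless the coordinates (or a state) are added to the grouping keys.

```python
import pandas as pd

# Made-up rows shaped like GlobalLandTemperaturesByCity.csv
temperatures = pd.DataFrame({
    "dt": ["2012-08-01", "2013-08-01", "2013-09-01"],
    "AverageTemperature": [24.0, 25.0, 20.0],
    "City": ["Chicago", "Chicago", "Chicago"],
})

dates = pd.to_datetime(temperatures["dt"])
dim_weather = (
    temperatures.assign(year=dates.dt.year, month=dates.dt.month)
    # One row per year/month/city with the mean temperature of that month
    .groupby(["year", "month", "City"], as_index=False)["AverageTemperature"]
    .mean()
    .rename(columns={"City": "city", "AverageTemperature": "average_monthly_temperature"})
)
print(dim_weather)
```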

Secondly, we are going to build a table with demographics data indexed by city. For the purpose of this project, we don't need the race and male/female population data, only the foreign-born and total population. Median age might also be useful, so we'll keep that one.
* **dim_cities**: *city*, state_code, total_population, median_age, foreign_born
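The demographics file has one row per city and race, while dim_cities needs one row per city. Dropping the race columns and de-duplicating collapses those rows, because the city-level figures repeat on every race row. The sketch below uses the Alafaya values shown earlier. The `pct_foreign_born` column is a hypothetical extra, illustrating the percentage the data dictionary mentions.

```python
import pandas as pd

# Two race rows for the same city, as in us-cities-demographics.csv
demographics = pd.DataFrame({
    "City": ["Alafaya", "Alafaya"],
    "State Code": ["FL", "FL"],
    "Total Population": [85264, 85264],
    "Median Age": [33.5, 33.5],
    "Foreign-born": [15842.0, 15842.0],
    "Race": ["White", "Asian"],
})

dim_cities = (
    demographics[["City", "State Code", "Total Population", "Median Age", "Foreign-born"]]
    .drop_duplicates()  # race rows repeat the same city-level figures
    .rename(columns={"City": "city", "State Code": "state_code",
                     "Total Population": "total_population",
                     "Median Age": "median_age", "Foreign-born": "foreign_born"})
    .reset_index(drop=True)
)
# Hypothetical derived column: share of foreign-born residents, in percent
dim_cities["pct_foreign_born"] = (dim_cities["foreign_born"] / dim_cities["total_population"] * 100).round(2)
print(dim_cities)
```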

Then, we will have a fact table with one row per arrival. It contains information on the source and destination cities, the airport of arrival and the passenger's visa type, with the arrival date split into year/month/day.
* **fact_migration**: *year_arrival, month_arrival, day_arrival*, city_origin_id, city_destination_id, airport_code, visa_type
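Splitting the arrival date requires decoding arrdate first. This is a SAS numeric date, i.e. the number of days since 1960-01-01. The sample's values, 20545 to 20574, fall in April 2016, which matches i94mon = 4. A minimal sketch in plain Python follows. The helper name is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day


def sas_to_date(days: float) -> date:
    """Convert a SAS numeric date (such as arrdate) into a Python date."""
    return SAS_EPOCH + timedelta(days=int(days))


arrival = sas_to_date(20545.0)
print(arrival, arrival.year, arrival.month, arrival.day)  # 2016-04-01 2016 4 1
```

On a whole pandas column, `pd.to_datetime(series, unit='D', origin='1960-01-01')` performs the same conversion. In Spark, the SQL function `date_add` could do it.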

Finally, we can build the desired calculations table based on the previous tables. This one will aggregate the number of emigrants and immigrants as well as the average temperature per city, partitioned by year/month.
* **calc_stats_migration**: *year, month*, city_id, amount_immigrants, amount_emmigrants, average_temperature
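A pandas sketch of that aggregation on made-up rows. In the actual pipeline this step presumably runs as SQL in Redshift. Immigrants are counted per destination and emigrants per origin. The counts are then joined with the monthly temperature. This assumes city identifiers match across fact_migration and dim_weather. The column names follow the schema above, including its `amount_emmigrants` spelling.

```python
import pandas as pd

# Hypothetical toy rows; ids and temperatures are made up
fact_migration = pd.DataFrame({
    "year_arrival": [2016, 2016, 2016],
    "month_arrival": [4, 4, 4],
    "city_origin_id": ["B", "B", "A"],
    "city_destination_id": ["A", "A", "B"],
})
dim_weather = pd.DataFrame({
    "year": [2016, 2016], "month": [4, 4], "city": ["A", "B"],
    "average_monthly_temperature": [18.5, 12.0],
})

keys = ["year", "month", "city_id"]
dates = {"year_arrival": "year", "month_arrival": "month"}
# Arrivals counted per destination city, departures per origin city
immigrants = (fact_migration.rename(columns={**dates, "city_destination_id": "city_id"})
              .groupby(keys).size().rename("amount_immigrants"))
emigrants = (fact_migration.rename(columns={**dates, "city_origin_id": "city_id"})
             .groupby(keys).size().rename("amount_emmigrants"))

calc_stats_migration = (
    pd.concat([immigrants, emigrants], axis=1).fillna(0).astype(int).reset_index()
    .merge(dim_weather.rename(columns={"city": "city_id",
                                       "average_monthly_temperature": "average_temperature"}),
           on=keys, how="left")
)
print(calc_stats_migration)
```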

### 3.2 The Data Model Pipeline


To build the star schema described above, the data pipeline jobs must follow the steps below:

1. Load CSV data (airports, demographics, temperature) and SAS data (immigration)

2. Data cleaning: select only the columns useful for fulfilling the project's purpose

3. Data cleaning: drop duplicates and missing values when required

4. Data cleaning: filter out rows with information unrelated to US migration

5. Data storage: put the resulting data in S3 for backup

6. Data transformation: create star schema tables in Redshift

7. Data transformation: load transformed data into dim/fact tables in Redshift

8. Data integrity: run data quality checks on both the raw tables and the star schema tables

9. Data analysis: query the star schema tables and test the initial hypothesis


## Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model
Build the data pipelines to create the data model.

The resulting data pipeline follows the steps mentioned in 3.2 and looks like this:

![caption](capstone_pipeline.png)

### 4.2 Data Quality Checks

The jobs responsible for running data quality checks are called run_staging_quality_checks and run_schema_quality_checks, both counting rows to ensure that the created tables are populated as expected.
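The code of those jobs is not shown here. A minimal sketch of the row-count logic follows. It assumes the counts have already been fetched, e.g. with `SELECT COUNT(*) FROM <table>` against Redshift. The function name and the example counts are hypothetical.

```python
def check_row_counts(row_counts: dict[str, int]) -> None:
    """Raise if any table is empty; row_counts maps table name -> COUNT(*) result."""
    empty = sorted(table for table, count in row_counts.items() if count < 1)
    if empty:
        raise ValueError(f"Data quality check failed, empty tables: {', '.join(empty)}")


check_row_counts({"dim_weather": 10, "dim_cities": 20})  # passes silently
```

Raising an exception is what makes an Airflow task fail, so a failing check stops the downstream steps.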

### 4.3 Data dictionary 

* The **dim_weather** table comprises information from the temperatures dataset. Each row contains the average temperature for a city in a given month/year.

* The **dim_cities** table comprises information from the demographics dataset. Each row contains the city identifier and its corresponding state code, as well as its total population, number of foreign-born residents and median age. The total population is useful when a data analyst wants to calculate the percentage of foreign-born residents out of the total population for a given city.

* The **fact_migration** table is a subset of the huge SAS migration dataset, containing a selection of columns useful for the purpose of this project: date of arrival (broken down into 3 columns: year_arrival, month_arrival, day_arrival), and also the city of origin, city of destination, airport code and visa type. 

* Last but not least, the **calc_stats_migration** table contains data derived from the previous tables, so it should be the last one to be ingested. It sums up the number of immigrants and emigrants per city, which comes from the fact_migration table. It also holds the average temperature for that same city, which comes from the dim_weather table. Everything is grouped by year and month.
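This ordering constraint is essentially what the Airflow DAG encodes. The sketch below uses Python's standard `graphlib` module (3.9+), not Airflow itself. It lists only the dependencies stated in the data dictionary.

```python
from graphlib import TopologicalSorter

# Each table maps to the tables it is built from
dependencies = {
    "dim_weather": set(),
    "dim_cities": set(),
    "fact_migration": set(),
    "calc_stats_migration": {"dim_weather", "fact_migration"},
}

load_order = list(TopologicalSorter(dependencies).static_order())
print(load_order[-1])  # calc_stats_migration
```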

## Step 5: Conclusion

The Big Data tools chosen for the purpose of this project are S3 and Redshift. Under the hood, the work has been done with Spark and Airflow.
* The files are initially stored locally, which is not 100% safe. For that reason, the early steps in the pipeline read the SAS and CSV files and write them to an S3 bucket, where they are safely stored and can be accessed by other teams in the company. Spark helps here by reading the files, manipulating them and sending them to the cloud efficiently.
* However, querying raw data directly is still inefficient. For that reason, a star schema has been built in a Redshift cluster and fed with data coming from the raw tables in S3. An increase in raw data volume is not a problem, because the star schema tables are not recreated; the new data is appended to them instead.
* Airflow was a very important tool during the development of this data pipeline. It orchestrated each and every step, which conveniently allowed me to debug each job, and it schedules the DAG to run every morning at 7am.

**Final considerations**:
* If the original data increased 100x, the files should not be on a desktop in the first place. They should be stored directly in S3 or read from their source via an API, and this reading task should be added at the very beginning of the DAG.
* If the data were queried by 100+ people, Athena could be a convenient tool for everyone to query the star schema in AWS. To make queries faster, gathering the most common requests among teammates should help in determining "low-hanging KPIs" and in creating new tables with even more digested data.