# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

pd.options.display.max_columns = 31

### Step 1: Scope the Project and Gather Data

#### Scope 
    This project pulls data from all the sources listed below and builds fact and dimension tables that show immigration into the US by country and by month.

#### Describe and Gather Data 

`U.S. City Demographic Data`: comes from OpenDataSoft and includes data by city, state, age, population, veteran status and race.

`I94 Immigration Data`: comes from the US National Travel and Tourism Office and includes details on incoming immigrants and their ports of entry.

`Countries`: comes from I94_SAS_Labels_Descriptions.SAS

`States`: comes from I94_SAS_Labels_Descriptions.SAS

`Visas`: comes from I94_SAS_Labels_Descriptions.SAS
    

In [None]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [None]:
input_data = '../../data/18-83510-I94-Data-2016/'
immigration_data = input_data + 'i94_apr16_sub.sas7bdat'
df_spark = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data)

In [None]:
#write to parquet
# df_spark.write.parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

In [None]:
# count rows from df_spark
# df_spark.count()

In [None]:
# read data about countries from cityres.csv
countries_table = spark.read.csv('cityres.csv', header='true')
countries_table = countries_table.withColumn("code", F.col("code").cast("integer"))

In [None]:
# read data about states with code
states_table = pd.read_csv('states_addr.csv')

In [None]:
# read us-cities-demographics.csv, which describes US city demographics
demographics_table = pd.read_csv('us-cities-demographics.csv', delimiter=';')
demographics_table.columns = [col.strip().lower().replace(' ', '_') for col in demographics_table.columns]

### Step 2: Explore and Assess the Data
#### Explore the Data 
#### Cleaning Steps
    The cleaning steps are described in the comments of the code cells below.

In [None]:
# Perform cleaning tasks here
# first, rename the columns and drop duplicates
columns_to_change = ['cicid as cid', 'i94yr as year', 'i94mon as month', 'i94cit as citizenship', 'i94res as residence', 'i94port as airport', \
                     'arrdate as arrival_date', 'i94mode as arrival_mode', 'i94addr as addr_in_us', 'depdate as departure_date', 'i94bir as age', \
                     'i94visa as visa_code', 'count', 'dtadfile as adfile_date', 'visapost', 'occup', \
                     'entdepa as arrival_code', 'entdepd as departure_code', 'entdepu as update_code', 'matflag', 'biryear', \
                     'dtaddto as admitted_date', 'gender', 'insnum', 'airline', 'admnum', 'fltno as flight_number', \
                     'visatype']
df_spark = df_spark.selectExpr(columns_to_change).dropDuplicates()

# count rows after dropping duplicates
# df_spark.count()

In [None]:
# cast columns to the correct types and convert the date fields to proper dates
df_spark = df_spark\
.withColumn("data_base_sas", F.to_date(F.lit("01/01/1960"), "MM/dd/yyyy"))\
.withColumn("cid", F.col("cid").cast("integer"))\
.withColumn("year", F.col("year").cast("integer"))\
.withColumn("month", F.col("month").cast("integer"))\
.withColumn("citizenship", F.col("citizenship").cast("integer"))\
.withColumn("residence", F.col("residence").cast("integer"))\
.withColumn("airport", F.col("airport").cast("string"))\
.withColumn("arrival_mode", F.col("arrival_mode").cast("integer"))\
.withColumn("departure_date", F.expr("date_add(data_base_sas, departure_date)"))\
.withColumn("age", F.col("age").cast("integer"))\
.withColumn("visa_code", F.col("visa_code").cast("integer"))\
.withColumn("count", F.col("count").cast("integer"))\
.withColumn("adfile_date", F.to_date(F.unix_timestamp(F.col("adfile_date"), "yyyyMMdd").cast("timestamp")))\
.withColumn("biryear", F.col("biryear").cast("integer"))\
.withColumn("admitted_date", F.to_date(F.unix_timestamp(F.col("admitted_date"), "MMddyyyy").cast("timestamp")))\
.withColumn("arrival_date", F.expr("date_add(data_base_sas, arrival_date)"))\
.drop('data_base_sas')
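
The `arrival_date` and `departure_date` fields arrive as SAS numeric dates, that is, a count of days since 1960-01-01, which is why the cell above adds them to a base date. A minimal plain-Python sketch of the same conversion follows; the helper name `sas_days_to_date` is hypothetical and not part of the project code.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_days_to_date(days):
    """Convert a SAS numeric date to a datetime.date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20545.0))  # 2016-04-01
```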

In [None]:
# replace missing values in the demographics dataframe with each column's mode
for column in demographics_table.columns:
    demographics_table[column] = demographics_table[column].fillna(demographics_table[column].mode()[0])
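
Mode imputation replaces each missing value with the most frequent non-missing value of the same column. The sketch below shows the idea on a plain list rather than a dataframe; the helper name `fill_with_mode` is hypothetical, and ties are broken by first occurrence here, which may differ from how pandas orders its modes.

```python
from collections import Counter

def fill_with_mode(values):
    """Replace None entries with the most common non-None value in the list."""
    present = [v for v in values if v is not None]
    if not present:
        return list(values)  # nothing to impute from
    mode = Counter(present).most_common(1)[0][0]
    return [mode if v is None else v for v in values]

print(fill_with_mode([2.5, None, 2.5, 3.0]))  # [2.5, 2.5, 2.5, 3.0]
```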

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

    I wanted to know more about immigration events, so I put the immigration data at the center of my star schema in a fact table called `immigration`. The main goal is to aggregate quickly and retrieve information on immigrants by month, by country of citizenship and by country of residence.
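
As an illustration of the kind of query this model is meant to serve, here is a small sketch that uses Python's built-in `sqlite3` in place of Spark. It assumes a simplified fact table keyed on the citizenship code and a `countries` dimension; the rows are made-up sample data, not values from the I94 dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE countries (code INTEGER PRIMARY KEY, country TEXT);
    CREATE TABLE immigration (id INTEGER PRIMARY KEY, month INTEGER, citizenship INTEGER);
    INSERT INTO countries VALUES (1, 'Country A'), (2, 'Country B');
    -- made-up rows; citizenship 99 has no match in countries
    INSERT INTO immigration VALUES (1, 4, 1), (2, 4, 1), (3, 4, 2), (4, 5, 99);
""")

# count arrivals per month and country, keeping unmatched codes (LEFT JOIN)
rows = conn.execute("""
    SELECT i.month, c.country, COUNT(*) AS arrivals
    FROM immigration AS i
    LEFT JOIN countries AS c ON i.citizenship = c.code
    GROUP BY i.month, c.country
    ORDER BY i.month, c.country
""").fetchall()

for row in rows:
    print(row)
```

Because the join is a left join, arrivals whose code is missing from `countries` still appear, with a null country name.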

#### 3.2 Mapping Out Data Pipelines
The cells below perform the steps needed to pipeline the data into the chosen data model.

In [None]:
# Start with the personal dimension table ('adfile_date' is listed once to avoid a duplicate column)
personal_columns = ['personal_id', 'cid', 'age', 'adfile_date', 'occup', 'biryear', 'gender', 'insnum', 'admnum', 'citizenship', 'residence', 'addr_in_us', 'visatype', \
                    'flight_number', 'airline', 'visa_code']
personal_table = df_spark.withColumn("personal_id", F.monotonically_increasing_id()).select(personal_columns)

In [None]:
personal_table.take(4)

In [None]:
# demographics dimension table; no further changes are needed
demographics_table.head()

In [None]:
# dimensional countries table
countries_table.head()

In [None]:
# dimensional states table
states_table.head()

In [None]:
# dim_airport
airports_table = df_spark.withColumn("airport_id", F.monotonically_increasing_id()).select('airport_id', 'airport', 'visapost')
# airports_table.show()

In [None]:
# time table dimension
time_table = df_spark.withColumn("dayofmonth", F.dayofmonth(F.col("arrival_date"))) \
                     .withColumn("dayofyear", F.dayofyear(F.col("arrival_date"))) \
                     .withColumn("dayofweek", F.dayofweek(F.col("arrival_date"))) \
                     .select('arrival_date', 'year', 'month', 'dayofmonth', 'dayofyear', 'dayofweek')

In [None]:
time_table.show(5)
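
For reference, the three derived columns can be reproduced in plain Python. The sketch below assumes Spark's `dayofweek` convention, where 1 is Sunday and 7 is Saturday; the helper name `time_parts` is hypothetical.

```python
from datetime import date

def time_parts(d):
    """Return (dayofmonth, dayofyear, dayofweek) with dayofweek 1=Sunday .. 7=Saturday."""
    dayofweek = d.isoweekday() % 7 + 1  # isoweekday(): Monday=1 .. Sunday=7
    return d.day, d.timetuple().tm_yday, dayofweek

print(time_parts(date(2016, 4, 1)))  # (1, 92, 6)
```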

In [None]:
countries_table.dtypes

In [None]:
# create fact table immigration
immigration_table = df_spark.withColumn("id", F.monotonically_increasing_id()).join(countries_table, df_spark.citizenship == countries_table.code, how='left') \
                            .select('id', 'month', 'country')

In [None]:
immigration_table.show(5)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create ETL for the data model

In [None]:
%%file etl.py

import configparser
from datetime import datetime
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

config = configparser.ConfigParser()
config.read('dl.cfg')

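# NOTE: assumes dl.cfg groups the keys under an [AWS] section; adjust the section name if yours differs
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']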

def create_spark_session():
    # build (or reuse) a Spark session with the SAS reader package available
    spark = SparkSession.builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .enableHiveSupport() \
        .getOrCreate()

    return spark

def process_countries_data(spark, input_data, output_data):
    # read countries data from cityres.csv
    countries_table = spark.read.csv('cityres.csv', header='true')
    countries_table = countries_table.withColumn("code", F.col("code").cast("integer"))
    
    # write to parquet files
    countries_table.write.parquet(output_data + 'countries/')
    
def process_states_data(spark, input_data, output_data):
    # read data
    states_table = spark.read.csv('states_addr.csv', header='true')
    
    # write to parquet files
    states_table.write.parquet(output_data + 'states/')   


def process_immigration_data(spark, input_data, output_data):
    # get filepath to immigration_data file
    immigration_data = input_data + 'i94'
    
    # read immigration data file
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data)
    
    # replace column names with clearer names and drop duplicates
    columns_to_change = ['cicid as cid', 'i94yr as year', 'i94mon as month', 'i94cit as citizenship', 'i94res as residence', 'i94port as airport', \
                     'arrdate as arrival_date', 'i94mode as arrival_mode', 'i94addr as addr_in_us', 'depdate as departure_date', 'i94bir as age', \
                     'i94visa as visa_code', 'count', 'dtadfile as adfile_date', 'visapost', 'occup', \
                     'entdepa as arrival_code', 'entdepd as departure_code', 'entdepu as update_code', 'matflag', 'biryear', \
                     'dtaddto as admitted_date', 'gender', 'insnum', 'airline', 'admnum', 'fltno as flight_number', \
                     'visatype']
    df_spark = df_spark.selectExpr(columns_to_change).dropDuplicates()
    
    # change type of columns and apply correct formats for dates 
    df_spark = df_spark\
        .withColumn("data_base_sas", F.to_date(F.lit("01/01/1960"), "MM/dd/yyyy"))\
        .withColumn("cid", F.col("cid").cast("integer"))\
        .withColumn("year", F.col("year").cast("integer"))\
        .withColumn("month", F.col("month").cast("integer"))\
        .withColumn("citizenship", F.col("citizenship").cast("integer"))\
        .withColumn("residence", F.col("residence").cast("integer"))\
        .withColumn("airport", F.col("airport").cast("string"))\
        .withColumn("arrival_mode", F.col("arrival_mode").cast("integer"))\
        .withColumn("departure_date", F.expr("date_add(data_base_sas, departure_date)"))\
        .withColumn("age", F.col("age").cast("integer"))\
        .withColumn("visa_code", F.col("visa_code").cast("integer"))\
        .withColumn("count", F.col("count").cast("integer"))\
        .withColumn("adfile_date", F.to_date(F.unix_timestamp(F.col("adfile_date"), "yyyyMMdd").cast("timestamp")))\
        .withColumn("biryear", F.col("biryear").cast("integer"))\
        .withColumn("admitted_date", F.to_date(F.unix_timestamp(F.col("admitted_date"), "MMddyyyy").cast("timestamp")))\
        .withColumn("arrival_date", F.expr("date_add(data_base_sas, arrival_date)"))\
        .drop('data_base_sas')
    
    # create personal dimension table ('adfile_date' is listed once to avoid a duplicate column)
    personal_columns = ['personal_id', 'cid', 'age', 'adfile_date', 'occup', 'biryear', 'gender', 'insnum', 'admnum', 'citizenship', 'residence', 'addr_in_us', 'visatype', \
                        'flight_number', 'airline', 'visa_code']
    personal_table = df_spark.withColumn("personal_id", F.monotonically_increasing_id()).select(personal_columns)
      
    # write personal table to parquet files
    personal_table.write.parquet(output_data + 'personal/')
    
    # create time table dimension
    time_table = df_spark.withColumn("dayofmonth", F.dayofmonth(F.col("arrival_date"))) \
                     .withColumn("dayofyear", F.dayofyear(F.col("arrival_date"))) \
                     .withColumn("dayofweek", F.dayofweek(F.col("arrival_date"))) \
                     .select('arrival_date', 'year', 'month', 'dayofmonth', 'dayofyear', 'dayofweek')
    
    # write table to parquet files
    time_table.write.parquet(output_data + 'time/')
    
    # create airport table
    airports_table = df_spark.withColumn("airport_id", F.monotonically_increasing_id()).select('airport_id', 'airport', 'visapost')
    
    # write table to parquet files
    airports_table.write.parquet(output_data + 'airports/')
    
    # read countries_table
    countries_table = spark.read.parquet(output_data + 'countries')
    
    # create fact table immigration
    immigration_table = df_spark.withColumn("id", F.monotonically_increasing_id()).join(countries_table, df_spark.citizenship == countries_table.code, how='left') \
                                .select('id', 'month', 'country')
    
    # write table to parquet files
    immigration_table.write.parquet(output_data + 'immigration/')
    
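def process_demographics_data(spark, input_data, output_data):
    # read demographics data (Spark's csv reader takes `sep`, not `delimiter`)
    demographics_table = spark.read.csv('demographics.csv', sep=';', header='true')

    # normalize column names as in the notebook (parquet rejects spaces in names)
    for column in demographics_table.columns:
        demographics_table = demographics_table.withColumnRenamed(
            column, column.strip().lower().replace(' ', '_'))

    # replace missing values in each column with that column's mode
    modes = {}
    for column in demographics_table.columns:
        top = demographics_table.where(F.col(column).isNotNull()) \
            .groupBy(column).agg(F.count(F.lit(1)).alias('_n')) \
            .orderBy(F.desc('_n')).first()
        if top is not None:
            modes[column] = top[0]
    demographics_table = demographics_table.fillna(modes)

    # write to parquet files
    demographics_table.write.parquet(output_data + 'demographics/')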
    
def check_tables(spark, output_data):
    paths = ['time/', 'immigration/', 'demographics/', 'countries/', 'states/', 'airports/', 'personal/']
    for path in paths:
        df = spark.read.parquet(output_data + path)
        print((df.count(), len(df.columns)))
    
def main():
    spark = create_spark_session()
    input_data = 's3n://dend-capstone-bucket/'
    output_data = 's3n://dend05-datalake/'
    
    process_countries_data(spark, input_data, output_data)
    process_states_data(spark, input_data, output_data)
    process_demographics_data(spark, input_data, output_data)
    process_immigration_data(spark, input_data, output_data)           
    check_tables(spark, output_data)
    
if __name__ == '__main__':
    main()

#### 4.2 Data Quality Checks
    I check the integrity of datasets by counting the number of rows and columns for each table.

In [None]:
# Run Quality Checks
paths = ['time/', 'immigration/', 'demographics/', 'countries/', 'states/', 'airports/', 'personal/']
output_data = 's3n://dend05-datalake/'
def check_tables():
    for path in paths:
        df = spark.read.parquet(output_data + path)
        print((df.count(), len(df.columns)))
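
Printing the shapes leaves the pass/fail decision to the reader. A stricter variant could raise an error when a table is empty or has no columns. The sketch below is one possible shape for such a check; it works on already-collected `(rows, columns)` pairs so that it does not depend on Spark, and the function name `validate_shapes` and the sample numbers are hypothetical.

```python
def validate_shapes(shapes):
    """Raise ValueError listing every table that has zero rows or zero columns.

    shapes: dict mapping table name -> (row_count, column_count)
    """
    failed = [name for name, (rows, cols) in shapes.items() if rows == 0 or cols == 0]
    if failed:
        raise ValueError("Data quality check failed for: " + ", ".join(sorted(failed)))
    return True

# made-up shapes for illustration only
print(validate_shapes({"time/": (100, 6), "countries/": (10, 2)}))  # True
```

In the notebook, the dictionary could be filled from `df.count()` and `len(df.columns)` for each path.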

#### 4.3 Data dictionary 
#### Immigration table
- *Name:* `immigration`
- *Type:* Fact table

| Column | Description |
| ------ | ----------- |
| `id` | Primary key of the table |
| `month` | Numeric month of arrival |
| `country` | Name of the immigrant's country of citizenship, looked up in `countries` |

#### Personal table

- *Name:* `personal`
- *Type:* Dimension table

| Column | Description |
| ------ | ----------- |
| `personal_id` | Generated identifier of a person |
| `cid` | Identifier of a person, unique within a month |
| `age` | Immigrant's age in years |
| `gender` | Gender as a single character: `M` (male), `F` (female), `X` or `U`; may also be `NULL` |
| `adfile_date` | Date parsed from the source `dtadfile` field (source format YYYYMMDD) |
| `occup` | Immigrant's occupation in the US |
| `biryear` | Four-digit year of birth |
| `insnum` | Immigration and Naturalization Services number |
| `admnum` | Admission number |
| `citizenship` | Immigrant's country of citizenship |
| `residence` | Immigrant's country of residence outside the US |
| `addr_in_us` | Address (usually state) of the immigrant in the US |
| `visatype` | Short visa codes like WT, B2, WB, etc. |
| `flight_number` | Flight number of the immigrant |
| `airline` | Airline the immigrant arrived on |
| `visa_code` | The type of visa the immigrant is coming in on (`1`, `2` or `3`) |

#### Countries table

- *Name:* `countries`
- *Type:* Dimension table

| Column | Description |
| ------ | ----------- |
| `code` | Short identification of a country | 
| `country` | Name of a country |


#### States table

- *Name:* `states`
- *Type:* Dimension table

| Column | Description |
| ------ | ----------- |
| `code` | Short identification of a state | 
| `state` | Name of a state |


#### Demographics table

- *Name:* `demographics`
- *Type:* Dimension table

| Column | Description |
| ------ | ----------- |
| `city` | City name |
| `state` | Full state name |
| `median_age` | Median age of the city's population |
| `male_population` | Total male population of the city |
| `female_population` | Total female population of the city |
| `total_population` | Total population of the city |
| `number_of_veterans` | Number of veterans in the city |
| `foreign-born` | Number of foreign-born residents in the city |
| `average_household_size` | Average household size |
| `state_code` | State code |
| `race` | Race category the row refers to |
| `count` | Number of people of that race in the city |


#### Airports table

- *Name:* `airports`
- *Type:* Dimension table

| Column | Description |
| ------ | ----------- |
| `airport_id` | Generated identifier of the row |
| `airport` | Port of entry |
| `visapost` | Three-letter codes corresponding to where visa was issued |


## Project files
- `etl.py` - reads data from S3, processes it with Spark, and writes the results back to S3
- `dl.cfg` - contains AWS credentials
- `Project.ipynb` - the notebook contains the preparatory steps for etl.py with smaller data
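
`etl.py` reads the credentials with `configparser`, which expects an INI-style file with keys grouped under a section header. The layout of `dl.cfg` is not shown in this project, so the sketch below assumes a section named `[AWS]` and uses placeholder values; adjust the section name to match the actual file.

```python
import configparser

# Assumed layout of dl.cfg; the [AWS] section name and the values are placeholders
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = your-access-key-id
AWS_SECRET_ACCESS_KEY = your-secret-access-key
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# keys are looked up by section first, then by option name
access_key = config["AWS"]["AWS_ACCESS_KEY_ID"]
secret_key = config["AWS"]["AWS_SECRET_ACCESS_KEY"]
print(access_key)
```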

### Step 5: Complete Project Write Up

    I used Apache Spark for all data processing and for building the model. Spark scales to large datasets, and the `spark.sql` library offers many tools for transforming data. Storing the output as parquet files lets the data grow to many terabytes.

    The data should be updated every day. We can use Apache Airflow to ingest each day's data by arrival date; this assumes the fact table is written partitioned by arrival date.

    I could also ensure that the data is ready to populate a dashboard by 7 AM every day. Assuming the daily data for the immigration fact table is delivered to S3, an Airflow DAG with an S3 sensor could start when the file arrives, parse the data, and land it in its date-partitioned location in S3, where Redshift Spectrum could read it immediately.
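
To make the idea of a date-partitioned location concrete, the sketch below builds the S3 prefix a daily run could write to. The `arrival_date=YYYY-MM-DD` layout follows the Hive-style directory naming that Spark's `partitionBy` produces; the helper name `partition_prefix` is hypothetical, and the bucket is the output bucket used in `etl.py`.

```python
from datetime import date

def partition_prefix(output_data, table, arrival_date):
    """Build a Hive-style partition prefix such as .../immigration/arrival_date=2016-04-01/."""
    return "{}{}/arrival_date={}/".format(output_data, table, arrival_date.isoformat())

print(partition_prefix("s3n://dend05-datalake/", "immigration", date(2016, 4, 1)))
# s3n://dend05-datalake/immigration/arrival_date=2016-04-01/
```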

    If the data grows by more than 100x and many users query it at the same time, the data can be placed in a columnar database such as Redshift. I do not expect problems at that scale, and materialized views should also help in this case.