# Udacity Data Engineer Nanodegree - Capstone Project

### Data Engineering Capstone Project

#### Project Summary
This project aims to answer questions using US immigration data, city demographics and airport codes. Spark is used to build the ETL pipeline and to store the results in parquet for downstream analysis. The pipeline pulls data from 3 different sources: I94 Immigration Data, US City Demographic Data and an Airport Code Table. The final product is one fact table (`fact_immig`) and six dimension tables (`dim_immig_info`, `dim_immig_airline`, `dim_country_code`, `dim_city_code`, `dim_state_code` and `dim_us_cities_pop`).

In [15]:
# Do all imports and installs here
import configparser
import os
import logging
import glob
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date, size
from pyspark.sql.functions import monotonically_increasing_id
pd.options.mode.chained_assignment = None 

In [4]:
# Create Spark session
spark = SparkSession.builder.\
        config("spark.jars.repositories", "https://repos.spark-packages.org/").\
        config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
        enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope

The goal of this project is to pull data from 3 different sources and build a fact table and several dimension tables for analyzing US immigration together with city demographics and airports. Some example analyses:
* Compare the number of travelers flying to cities with larger immigrant populations with the number flying to other cities.
* Find which countries most travelers come from, and the gender breakdown of travelers from each country.
* Find the US airports that receive the most immigrants.

#### Describe and Gather Data
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office and contains information on international travelers to the US.
1. US City Demographic Data: Demographics of US cities, such as median age and male and female population.
1. Airport Code Table: A simple table of airport codes and their corresponding cities.

##### Immigration sample

In [5]:
# Read immigration sample
immi_name = 'immigration_data_sample.csv'
df_immi_sample = pd.read_csv(immi_name)
df_immi_sample.head()

|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |


From the sample, we can create a template for the fact table `fact_immig`.

In [6]:
fact_immig = df_immi_sample[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode']]
fact_immig.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode']
fact_immig['country'] = 'United States'
fact_immig.head(5)

|   | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | country |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | United States |
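Assigning a list to `columns` relabels columns by position, so any change in the source column order would silently mislabel the data. The Spark version in Step 4 behaves the same way through `zip`. Below is a minimal stdlib sketch of a name-based mapping instead. `FACT_IMMIG_COLUMNS` and `to_fact_row` are hypothetical names, and the record is the first sample row written as a dict:

```python
# Hypothetical name-based mapping from raw I94 columns to fact_immig columns.
FACT_IMMIG_COLUMNS = {
    "cicid": "cic_id",
    "i94yr": "year",
    "i94mon": "month",
    "i94port": "city_code",
    "i94addr": "state_code",
    "arrdate": "arrive_date",
    "depdate": "departure_date",
    "i94mode": "mode",
}


def to_fact_row(raw):
    """Build one fact_immig row from a raw I94 record (a dict), matching columns by name."""
    row = {new: raw.get(old) for old, new in FACT_IMMIG_COLUMNS.items()}
    # Every record describes an arrival in the US, so the country is constant.
    row["country"] = "United States"
    return row


# First sample record, with its keys deliberately out of the file's order.
raw = {"i94port": "HHW", "cicid": 4084316.0, "i94yr": 2016.0, "i94mon": 4.0,
       "i94addr": "HI", "arrdate": 20566.0, "depdate": 20573.0, "i94mode": 1.0}
print(to_fact_row(raw)["city_code"])  # HHW
```

In pandas, the same dict could be passed to `DataFrame.rename(columns=...)`. In Spark, each pair could drive a `withColumnRenamed` call.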


Using the same sample, we then create the immigrant information dimension table `dim_immig_info` and the immigrant airline dimension table `dim_immig_airline`.

In [7]:
#Immigrants information
dim_immig_info = df_immi_sample[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'i94visa']]
dim_immig_info.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'visa']
dim_immig_info.head(5)

|   | cic_id | citizen_country | residence_country | birth_year | gender | visa |
|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F | 2.0 |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M | 2.0 |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M | 2.0 |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M | 2.0 |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F | 2.0 |


In [8]:
# Airline information
dim_immig_airline = df_immi_sample[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
dim_immig_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immig_airline.head(5)

|   | cic_id | airline | admin_num | flight_number | visa_type |
|---|---|---|---|---|---|
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 |  | 42322570000.0 | LAND | WT |


##### Airport codes

In [9]:
# Read airport codes
air_name = 'airport-codes_csv.csv'
df_airport = pd.read_csv(air_name)
df_airport.head()

|   | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 |  | US | US-PA | Bensalem | 00A |  | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 |  | US | US-AR | Newport |  |  |  | -91.254898, 35.6087 |


Select and rename only the columns needed for the airport codes dimension table `dim_us_airport`, keeping only US airports.

In [10]:
dim_us_airport = df_airport[['ident', 'name', 'elevation_ft', 'iso_region', 'municipality', 
                             'coordinates']][df_airport.iso_country == 'US']
dim_us_airport.columns = ['airline_id', 'description', 'elev_ft', 'iso_2_region', 
                          'municipality', 'coordinates']
dim_us_airport.head()

|   | airline_id | description | elev_ft | iso_2_region | municipality | coordinates |
|---|---|---|---|---|---|---|
| 0 | 00A | Total Rf Heliport | 11.0 | US-PA | Bensalem | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | Aero B Ranch Airport | 3435.0 | US-KS | Leoti | -101.473911, 38.704022 |
| 2 | 00AK | Lowell Field | 450.0 | US-AK | Anchor Point | -151.695999146, 59.94919968 |
| 3 | 00AL | Epps Airpark | 820.0 | US-AL | Harvest | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | Newport Hospital & Clinic Heliport | 237.0 | US-AR | Newport | -91.254898, 35.6087 |


##### Cities

In [11]:
# Read Cities
us_cities_name = 'us-cities-demographics.csv'
df_us_cities = pd.read_csv(us_cities_name, delimiter=';')
df_us_cities.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


Select and rename only the columns needed for the US cities population dimension table `dim_us_cities_pop`.

In [12]:
dim_us_cities_pop = df_us_cities[['State Code', 'City', 'State', 'Male Population', 'Female Population', 'Total Population', 'Foreign-born', 'Race', 'Count']]
dim_us_cities_pop.columns = ['state_code', 'city', 'state', 'male_pop', 'fem_pop', 'tot_pop', 'foreign_born', 'race', 'cout']
dim_us_cities_pop.head()

|   | state_code | city | state | male_pop | fem_pop | tot_pop | foreign_born | race | cout |
|---|---|---|---|---|---|---|---|---|---|
| 0 | MD | Silver Spring | Maryland | 40601.0 | 41862.0 | 82463 | 30908.0 | Hispanic or Latino | 25924 |
| 1 | MA | Quincy | Massachusetts | 44129.0 | 49500.0 | 93629 | 32935.0 | White | 58723 |
| 2 | AL | Hoover | Alabama | 38040.0 | 46799.0 | 84839 | 8229.0 | Asian | 4759 |
| 3 | CA | Rancho Cucamonga | California | 88127.0 | 87105.0 | 175232 | 33878.0 | Black or African-American | 24437 |
| 4 | NJ | Newark | New Jersey | 138040.0 | 143873.0 | 281913 | 86253.0 | White | 76402 |


#### Get all the labels for the countries, cities and state codes

In [27]:
# Parse the SAS label file into code -> label dictionaries.
# The line ranges below are specific to this version of the file.
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    labels = f.readlines()

# Country codes (used by i94cit and i94res)
country_code = {}
for countries in labels[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

# Port (city) codes (used by i94port)
city_code = {}
for cities in labels[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("'")
    city_code[code] = city

# State codes (used by i94addr)
state_code = {}
for states in labels[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state
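The slices `labels[10:298]`, `labels[303:962]` and `labels[982:1036]` fit only one exact layout of `I94_SAS_Labels_Descriptions.SAS`. Below is a sketch of a more tolerant parser that groups `code = 'label'` lines by the SAS `value` block they belong to. It assumes that each block starts with a line such as `value i94cntyl` and that each pair sits on its own line. `parse_sas_labels` and the sample lines are hypothetical:

```python
import re

# Assumed block header, e.g. "  value i94cntyl" or "  value $i94prtl".
VALUE_RE = re.compile(r"^\s*value\s+\$?(\w+)", re.IGNORECASE)
# Assumed pair line: code (optionally quoted), '=', quoted label; greedy (.*) keeps apostrophes.
PAIR_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'(.*)'")


def parse_sas_labels(lines):
    """Return {block_name: {code: label}} for every SAS `value` block in `lines`."""
    blocks = {}
    current = None
    for line in lines:
        header = VALUE_RE.match(line)
        if header:
            current = header.group(1).lower()
            blocks[current] = {}
            continue
        pair = PAIR_RE.match(line)
        if pair and current is not None:
            code, label = pair.group(1).strip(), pair.group(2).strip()
            blocks[current][code] = label
    return blocks


# Hypothetical lines shaped like the label file.
SAMPLE_LINES = [
    "/* I94CIT & I94RES */",
    "  value i94cntyl",
    "   236 =  'AFGHANISTAN'",
    ";",
    "  value $i94prtl",
    "\t'ANC'\t=\t'ANCHORAGE, AK             '",
    ";",
    "\tvalue i94addrl",
    "\t'AL'='ALABAMA'",
    "\t'AK'='ALASKA' ;",
]

blocks = parse_sas_labels(SAMPLE_LINES)
print(blocks["i94prtl"]["ANC"].split(",")[0])  # ANCHORAGE
```

Under those assumptions, each block's dict could replace one of the hand-sliced dictionaries above without hard-coded line numbers.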

##### Read the .sas7bdat data files using Spark

In [19]:
files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
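# Read every monthly SAS file and stack them into a single DataFrame
df_spark = None

for file in files:
    df_temp = spark.read.format('com.github.saurfang.sas.spark').load(file)
    if df_spark is None:
        df_spark = df_temp
    else:
        # union() matches columns by position, so align each file to the first file's columns
        df_spark = df_spark.union(df_temp.select(df_spark.columns))

# The rest of the notebook works on the parquet extract in `sas_data`, which replaces the SAS union above
df_spark = spark.read.parquet("sas_data")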
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

### Step 2: Explore and Assess the Data
Read, enrich and modify the `fact_immig` data.

In [21]:
def to_date_sas(date):
    
    """UDF function that converts a SAS date value to a date.
    
        Arguments:
            date: SAS date for one row, i.e. the number of days since 1960-01-01 (float), or None.
        Returns:
            Timestamp for that day (stored by Spark as DateType), or None when the input is null.
    """
    
    if date is not None:
        return pd.to_timedelta(date, unit='d') + pd.Timestamp('1960-1-1')
    
to_date_udf = udf(to_date_sas, DateType())
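`to_date_sas` relies on SAS storing dates as the number of days since 1960-01-01. The same conversion works with the standard library alone, which avoids building pandas objects for every row. Below is a sketch with a hypothetical `sas_to_date` helper. Truncating with `int()` assumes whole-day values such as `20566.0`, as in the sample:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01) to datetime.date; nulls stay None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20566.0))  # 2016-04-22
```

A plain `datetime.date` is also what Spark expects from a UDF declared with `DateType()`. Spark's built-in SQL `date_add` function should also be able to do this without a Python UDF, for example `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`.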

In [26]:
fact_immig = df_spark.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode')\
                         .distinct().withColumn("immigration_id", monotonically_increasing_id())

columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode']
    
for original, new in zip(fact_immig.columns, columns):
    fact_immig = fact_immig.withColumnRenamed(original, new)
    
fact_immig = fact_immig.withColumn('country', lit('United States'))
fact_immig = fact_immig.withColumn('arrive_date', to_date_udf(col('arrive_date')))
fact_immig = fact_immig.withColumn('departure_date', to_date_udf(col('departure_date')))

fact_immig.head(1)

[Row(cic_id=78.0, year=2016.0, month=4.0, city_code='BOS', state_code='MA', arrive_date=datetime.date(2016, 4, 1), departure_date=datetime.date(2016, 4, 4), mode=1.0, immigration_id=0, country='United States')]

Create latitude and longitude columns for the `dim_us_airport` table.

In [32]:
# The `coordinates` column is stored as "longitude, latitude"
coords = dim_us_airport.coordinates.str.split(', ', expand=True)
dim_us_airport['latitude'] = coords[1].astype('float')
dim_us_airport['longitude'] = coords[0].astype('float')
dim_us_airport = dim_us_airport[['airline_id', 'description', 'elev_ft', 'iso_2_region', 'municipality', 'latitude', 'longitude']]
dim_us_airport.head(3)

|   | airline_id | description | elev_ft | iso_2_region | municipality | latitude | longitude |
|---|---|---|---|---|---|---|---|
| 0 | 00A | Total Rf Heliport | 11.0 | US-PA | Bensalem | 40.070801 | -74.933601 |
| 1 | 00AA | Aero B Ranch Airport | 3435.0 | US-KS | Leoti | 38.704022 | -101.473911 |
| 2 | 00AK | Lowell Field | 450.0 | US-AK | Anchor Point | 59.9492 | -151.695999 |
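In the airport codes file, `coordinates` lists longitude first. For example, Bensalem, PA (`00A`) lies near 40.07° N, 74.93° W, which matches `"-74.93360137939453, 40.07080078125"`. Below is a small stdlib sketch of that parsing rule. `parse_coordinates` is a hypothetical helper that also tolerates missing or malformed values:

```python
def parse_coordinates(value):
    """Split an airport-codes 'coordinates' string ("lon, lat") into (lat, lon) floats.

    Returns (None, None) for missing or malformed values instead of raising.
    """
    if not value:
        return None, None
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 2:
        return None, None
    try:
        longitude, latitude = float(parts[0]), float(parts[1])
    except ValueError:
        return None, None
    return latitude, longitude


latitude, longitude = parse_coordinates("-74.93360137939453, 40.07080078125")
print(latitude > 0 > longitude)  # True: northern and western hemispheres
```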


Fill missing values in the `dim_us_cities_pop` table, cast the population columns to integers, and convert the string columns to upper case.

In [33]:
dim_us_cities_pop = dim_us_cities_pop.fillna(0)

dim_us_cities_pop.city = dim_us_cities_pop.city.str.upper()
dim_us_cities_pop.state = dim_us_cities_pop.state.str.upper()
dim_us_cities_pop.race = dim_us_cities_pop.race.str.upper()

dim_us_cities_pop.male_pop = dim_us_cities_pop.male_pop.astype('int')
dim_us_cities_pop.fem_pop = dim_us_cities_pop.fem_pop.astype('int')
dim_us_cities_pop.foreign_born = dim_us_cities_pop.foreign_born.astype('int')

dim_us_cities_pop.head(3)

|   | state_code | city | state | male_pop | fem_pop | tot_pop | foreign_born | race | cout |
|---|---|---|---|---|---|---|---|---|---|
| 0 | MD | SILVER SPRING | MARYLAND | 40601 | 41862 | 82463 | 30908 | HISPANIC OR LATINO | 25924 |
| 1 | MA | QUINCY | MASSACHUSETTS | 44129 | 49500 | 93629 | 32935 | WHITE | 58723 |
| 2 | AL | HOOVER | ALABAMA | 38040 | 46799 | 84839 | 8229 | ASIAN | 4759 |
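The Spark version of this table in Step 4 reads the CSV without a schema, so every population column arrives as a string (see the `dim_us_cities_pop` schema in Step 3). Below is a stdlib sketch of the same fill-then-cast rule applied to such strings. It uses a hypothetical `to_int_or_zero` helper and a made-up row whose `fem_pop` is missing:

```python
def to_int_or_zero(value):
    """Cast a population value such as '40601.0' to int, treating null or blank as 0 (like fillna(0))."""
    if value is None or str(value).strip() == "":
        return 0
    return int(float(value))


# Made-up row with a missing fem_pop, shaped like a Spark row read without a schema.
row = {"city": "Silver Spring", "male_pop": "40601.0", "fem_pop": None, "foreign_born": "30908.0"}
clean = {
    "city": row["city"].upper(),
    **{k: to_int_or_zero(row[k]) for k in ("male_pop", "fem_pop", "foreign_born")},
}
print(clean)  # {'city': 'SILVER SPRING', 'male_pop': 40601, 'fem_pop': 0, 'foreign_born': 30908}
```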


Create the countries, cities and state codes DataFrames

In [28]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(1)

|   | code | country |
|---|---|---|
| 0 | 236 | AFGHANISTAN |


In [29]:
df_city_code = pd.DataFrame(list(city_code.items()), columns=['city_code', 'city'])
df_city_code.city = df_city_code.city.str.split(',', expand = True)[0]
df_city_code.head(1)

|   | city_code | city |
|---|---|---|
| 0 | ANC | ANCHORAGE |


In [31]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(1)

|   | code | state |
|---|---|---|
| 0 | AK | ALASKA |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

#### Staging Table
`staging_immig`
 * **cicid**: double (nullable = true)
 * **i94yr**: double (nullable = true)
 * **i94mon**: double (nullable = true)
 * **i94cit**: double (nullable = true)
 * **i94res**: double (nullable = true)
 * **i94port**: string (nullable = true)
 * **arrdate**: double (nullable = true)
 * **i94mode**: double (nullable = true)
 * **i94addr**: string (nullable = true)
 * **depdate**: double (nullable = true)
 * **i94bir**: double (nullable = true)
 * **i94visa**: double (nullable = true)
 * **count**: double (nullable = true)
 * **dtadfile**: string (nullable = true)
 * **visapost**: string (nullable = true)
 * **occup**: string (nullable = true)
 * **entdepa**: string (nullable = true)
 * **entdepd**: string (nullable = true)
 * **entdepu**: string (nullable = true)
 * **matflag**: string (nullable = true)
 * **biryear**: double (nullable = true)
 * **dtaddto**: string (nullable = true)
 * **gender**: string (nullable = true)
 * **insnum**: string (nullable = true)
 * **airline**: string (nullable = true)
 * **admnum**: double (nullable = true)
 * **fltno**: string (nullable = true)
 * **visatype**: string (nullable = true)
 
#### Fact Table
`fact_immig`
 * **cic_id**: double (nullable = true)
 * **year**: double (nullable = true)
 * **month**: double (nullable = true)
 * **city_code**: string (nullable = true)
 * **state_code**: string (nullable = true)
 * **arrive_date**: date (nullable = true)
 * **departure_date**: date (nullable = true)
 * **mode**: double (nullable = true)
 * **immigration_id**: long (nullable = false)
 * **country**: string (nullable = false)

#### Dimensional Tables
`dim_immig_info`
 * **cic_id**: double (nullable = true)
 * **citizen_country**: double (nullable = true)
 * **residence_country**: double (nullable = true)
 * **birth_year**: double (nullable = true)
 * **gender**: string (nullable = true)
 * **visa**: double (nullable = true)
 * **immigration_id**: long (nullable = false)

`dim_immig_airline`
 * **cic_id**: double (nullable = true)
 * **airline**: string (nullable = true)
 * **admin_num**: double (nullable = true)
 * **flight_number**: string (nullable = true)
 * **visa_type**: string (nullable = true)
 * **immi_airline_id**: long (nullable = false)

`dim_country_code`
 * **country_code**: string (nullable = true)
 * **country**: string (nullable = true)

`dim_city_code`
 * **city_code**: string (nullable = true)
 * **city**: string (nullable = true)

`dim_state_code`
 * **state_code**: string (nullable = true)
 * **state**: string (nullable = true)

`dim_us_cities_pop`
 * **state_code**: string (nullable = true)
 * **city**: string (nullable = true)
 * **state**: string (nullable = true)
 * **male_pop**: string (nullable = true)
 * **fem_pop**: string (nullable = true)
 * **tot_pop**: string (nullable = true)
 * **foreign_born**: string (nullable = true)
 * **race**: string (nullable = true)
 * **cout**: string (nullable = true)
 * **pop_id**: long (nullable = false)
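The sketch below shows how the star schema serves one of the Step 1 questions: which countries most travelers come from, and their gender split. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with a few made-up rows. Two assumptions are worth stating. First, the fact table is joined to `dim_immig_info` on `cic_id`, because each table's `immigration_id` comes from its own `monotonically_increasing_id()` call and does not identify the same record across tables. Second, the numeric `citizen_country` has to be cast before it can match the string `country_code`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
    CREATE TABLE fact_immig (cic_id REAL, city_code TEXT, state_code TEXT);
    CREATE TABLE dim_immig_info (cic_id REAL, citizen_country REAL, gender TEXT);
    CREATE TABLE dim_country_code (country_code TEXT, country TEXT);
""")

# Made-up rows shaped like the tables above (only the columns the query needs).
cur.executemany("INSERT INTO fact_immig VALUES (?, ?, ?)",
                [(1.0, "LOS", "CA"), (2.0, "NYC", "NY"), (3.0, "LOS", "CA"), (4.0, "MIA", "FL")])
cur.executemany("INSERT INTO dim_immig_info VALUES (?, ?, ?)",
                [(1.0, 100.0, "F"), (2.0, 100.0, "M"), (3.0, 100.0, "F"), (4.0, 200.0, "M")])
cur.executemany("INSERT INTO dim_country_code VALUES (?, ?)",
                [("100", "COUNTRY_A"), ("200", "COUNTRY_B")])

# Travelers per citizenship country and the share of women among them.
# citizen_country is a double (100.0) while country_code is a string ("100"), hence the casts.
QUERY = """
    SELECT c.country,
           COUNT(*) AS travelers,
           ROUND(100.0 * SUM(CASE WHEN i.gender = 'F' THEN 1 ELSE 0 END) / COUNT(*), 1) AS pct_female
    FROM fact_immig AS f
    JOIN dim_immig_info AS i ON i.cic_id = f.cic_id
    JOIN dim_country_code AS c
      ON c.country_code = CAST(CAST(i.citizen_country AS INTEGER) AS TEXT)
    GROUP BY c.country
    ORDER BY travelers DESC
"""
rows = cur.execute(QUERY).fetchall()
for country, travelers, pct_female in rows:
    print(country, travelers, pct_female)
conn.close()
```

Registering the Spark DataFrames with `createOrReplaceTempView` should allow a similar query in Spark SQL, with small dialect changes such as `STRING` instead of `TEXT` in the cast.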

#### 3.2 Mapping Out Data Pipelines
Steps needed to load the data into the chosen data model:

1. Load the information for the staging table `staging_immig`
1. Clean the data: remove duplicates, handle nulls, change data types, etc.
1. Create fact table `fact_immig`.
1. Create dimension tables for `dim_immig_info`, `dim_immig_airline`, `dim_country_code`, `dim_city_code`, `dim_state_code` and `dim_us_cities_pop`.
1. Save processed fact and dimension tables as parquet.
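The cleaning step in this list goes further than the Spark code in Step 4, which only applies `distinct()`. Below is a stdlib sketch of the fuller rule: drop rows without a `cicid`, then drop exact duplicates. It uses a hypothetical `clean_records` helper on rows represented as dicts:

```python
def clean_records(records, key="cicid"):
    """Drop records whose `key` is null, then drop exact duplicates (the first copy wins)."""
    seen = set()
    cleaned = []
    for record in records:
        if record.get(key) is None:
            continue  # a row without an id cannot be linked to the other tables
        fingerprint = tuple(sorted(record.items()))  # hashable, key-order-independent view of the row
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        cleaned.append(record)
    return cleaned


raw_rows = [
    {"cicid": 78.0, "i94port": "BOS"},
    {"i94port": "BOS", "cicid": 78.0},  # same row, different key order
    {"cicid": None, "i94port": "NYC"},
]
print(clean_records(raw_rows))  # [{'cicid': 78.0, 'i94port': 'BOS'}]
```

In PySpark, the equivalent would be `df.dropna(subset=['cicid']).dropDuplicates()`.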

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Complete ETL to create all the tables

In [56]:
import configparser
import os
import logging
import glob
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date, size
from pyspark.sql.functions import monotonically_increasing_id
pd.options.mode.chained_assignment = None 

# setup logging 
logger = logging.getLogger()
logger.setLevel(logging.INFO)

files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")

def create_spark_session():
    
    """
    Description: This function creates the Spark Session used to process the immigration, demographics and airport data.
    Arguments:
        None
    Returns:
        Spark Session
    """
    
    spark = SparkSession.builder.\
        config("spark.jars.repositories", "https://repos.spark-packages.org/").\
        config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
        enableHiveSupport().getOrCreate()
    return spark

def to_date_sas(date):
    
    """UDF function that converts a SAS date value to a date.
    
        Arguments:
            date: SAS date for one row, i.e. the number of days since 1960-01-01 (float), or None.
        Returns:
            Timestamp for that day (stored by Spark as DateType), or None when the input is null.
    """
    
    if date is not None:
        return pd.to_timedelta(date, unit='d') + pd.Timestamp('1960-1-1')
    
to_date_udf = udf(to_date_sas, DateType())

def process_immigration_data(spark, input_data):
    
    """ Process immigration data to get the fact table fact_immig and the dimensional tables dim_immig_info and dim_immig_airline.
    
        Arguments:
            spark: Spark Session.
            input_data: list of .sas7bdat file paths.
        Returns:
            Tuple of Spark DataFrames (fact_immig, dim_immig_info, dim_immig_airline).
    """
    
    logging.info("Processing immigration data")
    
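    df_spark = None
    
    for file in input_data:
        df_temp = spark.read.format('com.github.saurfang.sas.spark').load(file)
        if df_spark is None:
            df_spark = df_temp
        else:
            # union() matches columns by position, so align each file to the first file's columns
            df_spark = df_spark.union(df_temp.select(df_spark.columns))

    # The tables below are built from the parquet extract in `sas_data`, which replaces the SAS union above
    df_spark = spark.read.parquet("sas_data")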

    fact_immig = df_spark.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode')\
                         .distinct().withColumn("immigration_id", monotonically_increasing_id())

    columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode']

    for original, new in zip(fact_immig.columns, columns):
        fact_immig = fact_immig.withColumnRenamed(original, new)

    fact_immig = fact_immig.withColumn('country', lit('United States'))
    fact_immig = fact_immig.withColumn('arrive_date', to_date_udf(col('arrive_date')))
    fact_immig = fact_immig.withColumn('departure_date', to_date_udf(col('departure_date')))
    
    logging.info("Start processing dim_immig_info")
    
    # Select the specific columns for the dimensional table.
    dim_immig_info = df_spark.select('cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'i94visa')\
                       .distinct().withColumn("immigration_id", monotonically_increasing_id())
    
    # Change the column names.
    columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'visa']
    
    for original, new in zip(dim_immig_info.columns, columns):
        dim_immig_info = dim_immig_info.withColumnRenamed(original, new)

    logging.info("Processing dim_immig_airline")
    
    # Select the specific columns for the dimensional table.
    dim_immig_airline = df_spark.select('cicid', 'airline', 'admnum', 'fltno', 'visatype').distinct()\
                         .withColumn("immi_airline_id", monotonically_increasing_id())
    
    # Change the column names.
    columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
    
    for original, new in zip(dim_immig_airline.columns, columns):
        dim_immig_airline = dim_immig_airline.withColumnRenamed(original, new)
    
    return fact_immig, dim_immig_info, dim_immig_airline

def process_labels_data(spark):
    
    """ Process the labels file to get the country, city and state codes.
        Arguments:
            spark: Spark Session.
        Returns:
            Tuple of Spark DataFrames (dim_country_code, dim_city_code, dim_state_code).
    """

    logging.info("Processing label data")
    
    # Reading File
    label_file = os.path.join("I94_SAS_Labels_Descriptions.SAS")
    with open(label_file) as f:
        contents = f.readlines()

    country_code = {}
    for countries in contents[10:298]:
        pair = countries.split('=')
        code, country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country

    # Creating the Spark dataframe for the country codes
    dim_country_code = spark.createDataFrame(list(country_code.items()), ['country_code', 'country'])


    # Parsing the city codes
    city_code = {}
    for cities in contents[303:962]:
        pair = cities.split('=')
        code, city = pair[0].strip("\t").strip().strip("'"),\
                     pair[1].strip('\t').strip().strip("''")
        city_code[code] = city

    # Building a pandas DataFrame and keeping only the city name (the part before the comma)
    df_city_code = pd.DataFrame(list(city_code.items()), columns=['city_code', 'city'])
    df_city_code.city = df_city_code.city.str.split(',', expand = True)[0]

    # Creating the Spark dataframe for the city codes
    dim_city_code = spark.createDataFrame(df_city_code)


    # Parsing the state codes
    state_code = {}
    for states in contents[982:1036]:
        pair = states.split('=')
        code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
        state_code[code] = state

    # Creating the Spark dataframe for the state codes
    dim_state_code = spark.createDataFrame(list(state_code.items()), ['state_code', 'state'])
    
    return dim_country_code, dim_city_code, dim_state_code

def process_airport_data(spark):
    
    """ Process airport data csv to get the dimensional dim_us_airport table
        Arguments:
            spark: Spark Session.
        Returns:
            Spark DataFrame dim_us_airport.
    """

    logging.info("Processing dim_us_airport")
    
    # Reading File
    us_airport = os.path.join('airport-codes_csv.csv')
    df = spark.read.csv(us_airport, header=True)

    # Select the specific columns for the dimensional table.
    df = df.where(df['iso_country'] == 'US')
    dim_us_airport = df.select(['ident', 'name', 'elevation_ft', 'iso_region', 'municipality', 'coordinates']).distinct()

    # Change the column names.
    columns = ['airline_id', 'description', 'elev_ft', 'iso_2_region', 'municipality', 'coordinates']
    
    for original, new in zip(dim_us_airport.columns, columns):
        dim_us_airport = dim_us_airport.withColumnRenamed(original, new)
 
    # Writing the dimensional table for airports as parquet
    #dim_us_airport.write.mode("overwrite").parquet(path=output_data + 'dim_us_airport')
    return dim_us_airport

def process_population_data(spark):
    """ Process demography data to get the dim_us_cities_pop table
        Arguments:
            spark: Spark Session.
        Returns:
            Spark DataFrame dim_us_cities_pop.
    """

    logging.info("Processing dim_us_cities_pop")
    
    # Reading File
    pop_data = os.path.join('us-cities-demographics.csv')
    df = spark.read.format('csv').options(header=True, delimiter=';').load(pop_data)

    # Select the specific columns for the dimensional table.
    dim_us_cities_pop = df.select(['State Code', 'City', 'State', 'Male Population', 'Female Population', 'Total Population', 'Foreign-born', 'Race', 'Count'])\
                          .distinct().withColumn("pop_id", monotonically_increasing_id())

    # Change the column names.
    columns = ['state_code', 'city', 'state', 'male_pop', 'fem_pop', 'tot_pop', 'foreign_born', 'race', 'cout']
    
    for original, new in zip(dim_us_cities_pop.columns, columns):
        dim_us_cities_pop = dim_us_cities_pop.withColumnRenamed(original, new)

    # Writing the dimensional table for cities population as parquet
    #dim_us_cities_pop.write.mode("overwrite").parquet(path=output_data + 'dim_us_cities_pop')
    return dim_us_cities_pop
    
def main():
    
    """
    Description: This is the main function where everything is executed. 
    Arguments:
        None
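    Returns:
        Tuple with the fact table and all the dimension tables as Spark DataFrames.
    """
    
    spark = create_spark_session()
    input_data = files
    #output_data = "s3a://udacity-rmp-capstone/"
    
    fact_immig, dim_immig_info, dim_immig_airline = process_immigration_data(spark, input_data)
    dim_country_code, dim_city_code, dim_state_code = process_labels_data(spark)
    dim_us_airport = process_airport_data(spark)
    dim_us_cities_pop = process_population_data(spark)
    
    logging.info("The process has successfully ended")
    
    return (fact_immig, dim_immig_info, dim_immig_airline, dim_country_code,
            dim_city_code, dim_state_code, dim_us_airport, dim_us_cities_pop)


if __name__ == "__main__":
    # Keep the tables in the notebook namespace so the quality checks in 4.2 can use them
    (fact_immig, dim_immig_info, dim_immig_airline, dim_country_code,
     dim_city_code, dim_state_code, dim_us_airport, dim_us_cities_pop) = main()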
    

INFO:root:Processing immigration data
INFO:root:Start processing dim_immig_info
INFO:root:Processing dim_immig_airline
INFO:root:Processing label data
INFO:root:Processing dim_us_airport
INFO:root:Processing dim_us_cities_pop
INFO:root:The process has successfully ended


#### 4.2 Data Quality Checks
The data quality checks confirm that the pipeline ran as expected. Useful checks include:
 * Integrity constraints on the tables (e.g., unique keys, data types)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

The cells below implement two of them: an existence check that prints each table's schema, and a record-count check.
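For the first bullet, below is a stdlib sketch of a unique-key and not-null check. `check_unique_not_null` is hypothetical and works on rows represented as dicts. In Spark, the same idea could compare `df.count()` with `df.select(key).distinct().count()` and count the rows matching `col(key).isNull()`.

```python
def check_unique_not_null(rows, key, table):
    """Raise ValueError if `key` is null or repeated in `rows`; otherwise return the row count."""
    seen = set()
    for position, row in enumerate(rows):
        value = row.get(key)
        if value is None:
            raise ValueError(f"{table}: null {key} at row {position}")
        if value in seen:
            raise ValueError(f"{table}: duplicate {key} {value!r}")
        seen.add(value)
    return len(rows)


states = [{"state_code": "AK", "state": "ALASKA"}, {"state_code": "AL", "state": "ALABAMA"}]
print(check_unique_not_null(states, "state_code", "dim_state_code"))  # 2
```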
 
Run Quality Checks

In [59]:
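def data_schema_check(df, name):
    """Check that a table exists and print its schema."""
    if df is not None:
        print("Data Quality Passed")
        print(f"Table {name} exists")
        # printSchema() prints by itself and returns None, so it is not wrapped in print()
        df.printSchema()
    else:
        print("Data Quality Failed")
        print(f"Table {name} missing, check again")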

data_schema_check(df_spark, 'staging_immig')
data_schema_check(fact_immig, 'fact_immig')
data_schema_check(dim_immig_info, 'dim_immig_info')
data_schema_check(dim_immig_airline, 'dim_immig_airline')
data_schema_check(dim_country_code, 'dim_country_code')
data_schema_check(dim_city_code, 'dim_city_code')
data_schema_check(dim_state_code, 'dim_state_code')
data_schema_check(dim_us_cities_pop, 'dim_us_cities_pop')

Data Quality Passed
Table staging_immig exists
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nu

In [54]:
def count_check(df, name):
    """Check that a table contains at least one record."""
    count = df.count()
    if count != 0:
        print("Data Count Passed")
        print(f"Table {name} has: {count} records")
    else:
        print("Data Count Failed")
        print(f"Table {name} has: no records")

count_check(fact_immig, 'fact_immig')
count_check(dim_immig_info, 'dim_immig_info')
count_check(dim_immig_airline, 'dim_immig_airline')
count_check(dim_country_code, 'dim_country_code')
count_check(dim_city_code, 'dim_city_code')
count_check(dim_state_code, 'dim_state_code')
count_check(dim_us_cities_pop, 'dim_us_cities_pop')

Data Count Passed
Table fact_immig has: 3096313 records
Data Count Passed
Table dim_immig_info has: 3096313 records
Data Count Passed
Table dim_immig_airline has: 3096313 records
Data Count Passed
Table dim_country_code has: 288 records
Data Count Passed
Table dim_city_code has: 659 records
Data Count Passed
Table dim_state_code has: 54 records
Data Count Passed
Table dim_us_cities_pop has: 2891 records


#### 4.3 Data dictionary 
![Dict](Data_dictionary.png)

##### Fact Table
`fact_immig`
 * **cic_id**: immigrant id
 * **year**: ingestion year
 * **month**: ingestion month
 * **city_code**: city code
 * **state_code**: state code
 * **arrive_date**: arrival date
 * **departure_date**: departure date
 * **mode**: mode of transportation code (e.g., air, sea or land)
 * **immigration_id**: record id
 * **country**: destination country (always United States)

##### Dimensional Tables
`dim_immig_info`
 * **cic_id**: immigrant id
 * **citizen_country**: country of citizenship (code, see `dim_country_code`)
 * **residence_country**: country of residence (code, see `dim_country_code`)
 * **birth_year**: birth year
 * **gender**: immigrant gender
 * **visa**: visa category code
 * **immigration_id**: record id

`dim_immig_airline`
 * **cic_id**: immigrant id
 * **airline**: airline code
 * **admin_num**: I-94 admission number
 * **flight_number**: flight number
 * **visa_type**: immigrant visa type
 * **immi_airline_id**: record id

`dim_country_code`
 * **country_code**: country code
 * **country**: country

`dim_city_code`
 * **city_code**: city code
 * **city**: city

`dim_state_code`
 * **state_code**: state code
 * **state**: state

`dim_us_cities_pop`
 * **state_code**: state code
 * **city**: city
 * **state**: state
 * **male_pop**: male population count
 * **fem_pop**: female population count
 * **tot_pop**: total population count
 * **foreign_born**: foreign born count
 * **race**: race
 * **cout**: count of population of that race
 * **pop_id**: city population id
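The `mode` and `visa` columns store numeric codes as doubles. Below is a sketch that decodes them for reporting. It assumes the code lists in `I94_SAS_Labels_Descriptions.SAS` (`i94mode`: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported; `i94visa`: 1 = Business, 2 = Pleasure, 3 = Student). This is consistent with the sample, where the land crossing at `CHM` (flight `LAND`) has mode `3.0`. `MODE_LABELS`, `VISA_LABELS` and `decode` are hypothetical names:

```python
# Assumed code lists, taken from the I94 label file.
MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, labels):
    """Map a stored double such as 3.0 to its label; unknown or null codes become None."""
    if code is None:
        return None
    return labels.get(int(code))


print(decode(3.0, MODE_LABELS))  # Land
print(decode(2.0, VISA_LABELS))  # Pleasure
```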

### Step 5: Complete Project Write Up

* *Choice of tools and technologies for the project*
    * Spark was chosen because processing the I94 US immigration data requires a large amount of computing power. Spark scales to large datasets, reads SAS, Parquet and CSV files, and can save its output to an AWS S3 bucket.
    * A star schema was used because the goal is to run simple queries that need fast aggregations, such as finding the US airports that receive the most immigrants. Each dimension table groups the attributes of one topic around the central fact table, which makes the right table for each topic easy to find.
* *Update periodicity*
    * The staging data is updated every month. Since it is the main source of information, the tables should be refreshed as soon as each month's data becomes available.
* *Scenarios*:
    * The data was increased by 100x.
        * If the data was increased by 100x, PySpark running locally would lack the computational power needed. A Spark cluster on AWS should be considered: it would process all the staging data in S3 and then create the fact and dimension tables.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
        * If the data populates a dashboard that must be updated daily by 7am, Airflow is a good way to schedule and automate a pipeline with fixed deadlines. Its built-in retry and monitoring mechanisms make it possible to detect errors in the process.
    * The database needed to be accessed by 100+ people
        * If the database needed to be accessed by 100+ people, a cloud data warehouse, for example on AWS, is a good fit. It can manage the users' workloads and computational resources, and it can ensure that every user has access defined by their role.