# U.S. Immigration Data
### Data Engineering Capstone Project

#### Project Summary
In this project, we develop a data model designed for Online Analytical Processing (OLAP) to support queries that analyze U.S. immigration data. The model stores the U.S. immigration (I-94) data in a fact table and complements it with U.S. cities' demographics data and with dimension tables for arrival ports, countries, U.S. states, travel modes, and visa categories. Because the model is a star schema, which is optimized for OLAP queries, and the data volume is large, it supports a wide range of analyses and insights. We also touch on exploratory data analysis of the collected data.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Conclusion and future work

In [107]:
# Do all imports and installs here
import glob
from datetime import datetime, timedelta  # used by the SAS date-conversion UDF below

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType as R, StructField as Fld,\
    DoubleType as Dbl, StringType as Str, IntegerType as Int,\
    TimestampType as Timestamp, DateType as Date, LongType as Long
import pyspark.sql.functions as F
from IPython.display import display, HTML

### Step 1: Scope the Project and Gather Data

#### Scope 
We'll use PySpark to build an extract, transform, load (ETL) pipeline. Because the data sets come in inconsistent formats (SAS binary files, a semicolon-separated CSV file, and a SAS label-descriptions text file), the task resembles an ETL from a data lake: the extracted data is cleaned and used to build a star schema, stored in the Apache Parquet columnar format, which is well suited to OLAP queries.

Spark can concatenate the monthly immigration files into a single data frame. However, we'll use only a single file (April 2016) because:
1. its size meets the requirements of this project,
2. the data model and ETL pipeline are expected to work the same way as the data scales up, and
3. it's easier to identify errors and verify our approach on a small data set before scaling up.


#### Datasets 

* ##### Immigration Data
The immigration data is in a folder in this workspace and accessible on the following path: <code>../../data/18-83510-I94-Data-2016/</code>. There's a file for each month of the year. An example file name is i94_apr16_sub.sas7bdat. Each file has a three-letter abbreviation for the month name. So a full file path for June would look like this: <code>../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat</code>. This data comes from the U.S. National Tourism and Trade Office.
More information on the immigration data [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).

* ##### U.S. City Demographic Data
This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

* ##### Tools/Technologies Used
Python, Apache Spark, Pandas
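
The monthly file-naming convention described under Immigration Data can be captured in a small helper. This is a sketch: the function name `i94_month_path` and its defaults are our own, chosen only to reproduce the paths listed below.

```python
# Hypothetical helper: encodes the pattern i94_<mon><yy>_sub.sas7bdat
I94_DIR = "../../data/18-83510-I94-Data-2016"

# Hard-coded (rather than calendar.month_abbr) so the result does not depend on the locale
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def i94_month_path(month, year=2016, base_dir=I94_DIR):
    """Return the path of the I-94 file for a month given as 1..12."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    return f"{base_dir}/i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat"


# All twelve monthly paths, in calendar order
all_paths = [i94_month_path(m) for m in range(1, 13)]
print(i94_month_path(6))  # ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
```

With Spark, these paths could be loaded one by one and combined, for example with `DataFrame.unionByName` (available in PySpark 2.3 and later); this notebook uses only the April file.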

In [108]:
# exploring the directories containing the datasets
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# to read the file in pandas: df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
i94_file_paths = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_file_paths

['../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
 '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']

In [109]:
# create spark session and load i_94 data
spark = SparkSession.builder \
            .appName("Capstone Project - Immigration Dataset") \
            .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
            .enableHiveSupport() \
            .getOrCreate()

i94_df = spark.read.format('com.github.saurfang.sas.spark').load(fname)
# print schema of loaded i_94 dataframe
i94_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [110]:
print("Partitions: ", i94_df.rdd.getNumPartitions())
i94_df.limit(5).toPandas()

Partitions:  14


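|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |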


### Step 2: Explore and Assess the Data

#### Explore the Data 

Here we'll load and explore the remaining datasets.

In [111]:
# define an explicit schema for US demographics dataset
schema = R([
        Fld("city", Str()),
        Fld("state", Str()),
        Fld("median_age", Dbl()),
        Fld("male_population", Int()),
        Fld("female_population", Int()),
        Fld("total_population", Int()),
        Fld("number_of_veterans", Int()),
        Fld("foreign_born", Int()),
        Fld("average_household_size", Dbl()),
        Fld("state_code", Str()),
        Fld("race", Str()),
        Fld("count", Int())
    ])

# read US demographics dataset into a dataframe using the defined schema
us_demographics_df = spark.read.csv('us-cities-demographics.csv', sep=';', header=True, schema=schema)

In [112]:
us_demographics_df

DataFrame[city: string, state: string, median_age: double, male_population: int, female_population: int, total_population: int, number_of_veterans: int, foreign_born: int, average_household_size: double, state_code: string, race: string, count: int]

In [113]:
us_demographics_df.limit(5).toPandas()

|  | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | race | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


#### Cleaning Steps
Upon checking the SAS labels descriptions file (`I94_SAS_Labels_Descriptions.SAS`), we found that we must clean the data before loading it into dataframes. The following Python function extracts and cleans the label description data.

In [114]:
def extract_data_from_SAS_labels_descriptions(input_label):
    '''
    A procedure that returns a cleaned list of code-value pairs for the provided input label
    
    Parameters
    ----------
    input_label : str
        name of the label in the SAS labels descriptions file
    
    Returns
    -------
    code_value_list : list(tuple(str, str))
        a list of code-value pairs extracted from the SAS labels descriptions file and cleaned
    '''

    with open('I94_SAS_Labels_Descriptions.SAS') as labels_descriptions:
        raw_labels = labels_descriptions.read()

    # keep only this label's block: from the label name up to the next ';'
    labels = raw_labels[raw_labels.index(input_label):]
    labels = labels[:labels.index(';')]

    # in each line, remove unnecessary spaces and quotes and extract the code and its value
    code_value_list = []
    for line in labels.splitlines():
        try:
            # split on the first '=' only, so a value containing '=' is kept whole
            code, value = line.split('=', 1)
        except ValueError:
            # lines without '=' (comments, the 'value ...' header) are skipped
            continue
        code = code.strip().strip("'").strip('"')
        value = value.strip().strip("'").strip('"').strip()
        code_value_list.append((code, value))

    return code_value_list
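
To see what this parsing does without the SAS file at hand, here is the same logic applied to an in-memory string. The helper name `parse_label_block` is hypothetical, and the `SAMPLE` text is a made-up excerpt modeled on the `code = 'value'` pairs and the terminating `;` that the function above relies on; it is not copied from the real file.

```python
def parse_label_block(raw_text, input_label):
    """Return (code, value) pairs from the block that starts at input_label
    and ends at the first ';' after it (same logic as the function above)."""
    block = raw_text[raw_text.index(input_label):]
    block = block[:block.index(';')]
    pairs = []
    for line in block.splitlines():
        if '=' not in line:
            continue  # header/comment lines carry no code-value pair
        code, value = line.split('=', 1)
        code = code.strip().strip("'").strip('"')
        value = value.strip().strip("'").strip('"').strip()
        pairs.append((code, value))
    return pairs


# Made-up excerpt in the style of the label descriptions file
SAMPLE = """
/* I94MODE - travel mode codes */
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;

/* I94VISA - visa categories */
value I94VISA
    1 = Business
    2 = Pleasure
    3 = Student
;
"""

print(parse_label_block(SAMPLE, 'I94MODE'))
# [('1', 'Air'), ('2', 'Sea'), ('3', 'Land'), ('9', 'Not reported')]
```

Note that the search is case-sensitive and stops at the first `;`, so a `;` inside a value would truncate the block; the outputs below suggest this does not happen for the labels used here.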

In [115]:
# define the schema that will be used for all label descriptions
schema = R([
        Fld("code", Str()),
        Fld("name", Str())
    ])

In [116]:
countries_df = spark.createDataFrame(
        data=extract_data_from_SAS_labels_descriptions('I94RES'),
        schema=schema
)

countries_df.limit(5).toPandas()

|  | code | name |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


In [117]:
ports_df = spark.createDataFrame(
        data=extract_data_from_SAS_labels_descriptions('I94PORT'),
        schema=schema
)
ports_df.limit(5).toPandas()

|  | code | name |
|---|---|---|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 3 | DAC | DALTONS CACHE, AK |
| 4 | PIZ | DEW STATION PT LAY DEW, AK |


In [118]:
states_df = spark.createDataFrame(
        data=extract_data_from_SAS_labels_descriptions('I94ADDR'),
        schema=schema
)

states_df.limit(5).toPandas()

|  | code | name |
|---|---|---|
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |
| 2 | AZ | ARIZONA |
| 3 | AR | ARKANSAS |
| 4 | CA | CALIFORNIA |


In [119]:
travel_modes_df = spark.createDataFrame(
        data=extract_data_from_SAS_labels_descriptions('I94MODE'),
        schema=schema
)

travel_modes_df.limit(5).toPandas()

|  | code | name |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |


In [120]:
visa_categories_df = spark.createDataFrame(
        data=extract_data_from_SAS_labels_descriptions('I94VISA'),
        schema=schema
)

visa_categories_df.limit(5).toPandas()

|  | code | name |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |


Performing more exploration and checks on the extracted data:

In [121]:
len(ports_df.toPandas()['code'].unique())

660

In [122]:
ports_df.toPandas()['code'].unique()

array(['ALC', 'ANC', 'BAR', 'DAC', 'PIZ', 'DTH', 'EGL', 'FRB', 'HOM',
       'HYD', 'JUN', '5KE', 'KET', 'MOS', 'NIK', 'NOM', 'PKC', 'ORI',
       'SKA', 'SNP', 'TKI', 'WRA', 'HSV', 'MOB', 'LIA', 'ROG', 'DOU',
       'LUK', 'MAP', 'NAC', 'NOG', 'PHO', 'POR', 'SLU', 'SAS', 'TUC',
       'YUI', 'AND', 'BUR', 'CAL', 'CAO', 'FRE', 'ICP', 'LNB', 'LOS',
       'BFL', 'OAK', 'ONT', 'OTM', 'BLT', 'PSP', 'SAC', 'SLS', 'SDP',
       'SFR', 'SNJ', 'SLO', 'SLI', 'SPC', 'SYS', 'SAA', 'STO', 'TEC',
       'TRV', 'APA', 'ASE', 'COS', 'DEN', 'DRO', 'BDL', 'BGC', 'GRT',
       'HAR', 'NWH', 'NWL', 'TST', 'WAS', 'DOV', 'DVD', 'WLL', 'BOC',
       'SRQ', 'CAN', 'DAB', 'FRN', 'FTL', 'FMY', 'FPF', 'HUR', 'GNV',
       'JAC', 'KEY', 'LEE', 'MLB', 'MIA', 'APF', 'OPF', 'ORL', 'PAN',
       'PEN', 'PCF', 'PEV', 'PSJ', 'SFB', 'SGJ', 'SAU', 'FPR', 'SPE',
       'TAM', 'WPB', 'ATL', 'BRU', 'AGS', 'SAV', 'AGA', 'HHW', 'OGG',
       'KOA', 'LIH', 'CID', 'DSM', 'BOI', 'EPI', 'IDA', 'PTL', 'SPI',
       'CHI', 'DPA',

In [123]:
states_df.toPandas()['code'].unique()

array(['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FL', 'GA',
       'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
       'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NC', 'ND', 'NE', 'NV', 'NH',
       'NJ', 'NM', 'NY', 'OH', 'OK', 'OR', 'PA', 'PR', 'RI', 'SC', 'SD',
       'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WV', 'WA', 'WI', 'WY', '99'], dtype=object)

In [124]:
states_df.toPandas()['name'].unique()

array(['ALABAMA', 'ALASKA', 'ARIZONA', 'ARKANSAS', 'CALIFORNIA',
       'COLORADO', 'CONNECTICUT', 'DELAWARE', 'DIST. OF COLUMBIA',
       'FLORIDA', 'GEORGIA', 'GUAM', 'HAWAII', 'IDAHO', 'ILLINOIS',
       'INDIANA', 'IOWA', 'KANSAS', 'KENTUCKY', 'LOUISIANA', 'MAINE',
       'MARYLAND', 'MASSACHUSETTS', 'MICHIGAN', 'MINNESOTA', 'MISSISSIPPI',
       'MISSOURI', 'MONTANA', 'N. CAROLINA', 'N. DAKOTA', 'NEBRASKA',
       'NEVADA', 'NEW HAMPSHIRE', 'NEW JERSEY', 'NEW MEXICO', 'NEW YORK',
       'OHIO', 'OKLAHOMA', 'OREGON', 'PENNSYLVANIA', 'PUERTO RICO',
       'RHODE ISLAND', 'S. CAROLINA', 'S. DAKOTA', 'TENNESSEE', 'TEXAS',
       'UTAH', 'VERMONT', 'VIRGIN ISLANDS', 'VIRGINIA', 'W. VIRGINIA',
       'WASHINGTON', 'WISCONSON', 'WYOMING', 'All Other Codes'], dtype=object)

In [125]:
len(states_df.toPandas()['code'].unique())

55

Writing a function to check for duplicate rows in the data:

In [126]:
def find_duplicates(df):
    '''
    A procedure that looks for duplicate rows in input dataframe, and displays a dataframe of duplicate rows and the number of duplicates in column 'count'
    
    Parameters
    ----------
    df : DataFrame
        input dataFrame
    
    Returns
    -------
    None
    '''
    
    if 'count' in df.columns:
        df = df.withColumnRenamed('count', 'cnt')

    
    display( \
            HTML( \
                 df. \
                 groupby(df.columns) \
                 .count() \
                 .where('count > 1') \
                 .sort('count', ascending=False) \
                 .toPandas() \
                 .to_html()
                )
           )
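
`find_duplicates` groups on every column, counts each group, and keeps the groups that occur more than once; an existing `count` column is renamed first because `.count()` adds a column with that name. The same idea in plain Python, on made-up `(code, name)` rows, is sketched below (`duplicate_rows` is a hypothetical name).

```python
from collections import Counter


def duplicate_rows(rows):
    """Return {row: occurrences} for rows that appear more than once.

    Plain-Python analogue of df.groupby(df.columns).count().where('count > 1'),
    sorted by occurrences, most frequent first. Rows must be hashable (tuples).
    """
    counts = Counter(rows)
    dupes = [(row, n) for row, n in counts.items() if n > 1]
    return dict(sorted(dupes, key=lambda item: item[1], reverse=True))


rows = [("1", "Air"), ("2", "Sea"), ("1", "Air"), ("3", "Land"), ("1", "Air"), ("2", "Sea")]
print(duplicate_rows(rows))  # {('1', 'Air'): 3, ('2', 'Sea'): 2}
```

An empty result, like the empty tables in the checks below, means no row occurs twice.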

In [127]:
# check i94_df for duplicates

find_duplicates(i94_df)

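|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | cnt | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|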


In [128]:
find_duplicates(us_demographics_df)

|  | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | race | cnt | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


In [129]:
find_duplicates(countries_df)

|  | code | name | count |
|---|---|---|---|


In [130]:
find_duplicates(ports_df)

|  | code | name | count |
|---|---|---|---|


In [131]:
find_duplicates(states_df)

|  | code | name | count |
|---|---|---|---|


In [132]:
find_duplicates(travel_modes_df)

|  | code | name | count |
|---|---|---|---|


In [133]:
find_duplicates(visa_categories_df)

|  | code | name | count |
|---|---|---|---|


Writing a function to check for missing data, i.e. nulls and NaN (not-a-number) values:

In [134]:
def find_nulls(df):
    '''
    A procedure that looks for missing data in the input dataframe and displays a dataframe with the number of missing values in each column
    
    Parameters
    ----------
    df : DataFrame
        input dataFrame
    
    Returns
    -------
    None
    '''
    display( \
            HTML( \
                 df \
                 .select([F.count(F.when(F.isnan(column) | F.col(column).isNull(), column)) \
                 .alias(column) for column in df.columns]) \
                 .toPandas() \
                 .to_html()
                )
           )
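
`find_nulls` counts a value as missing when it is either `null` (`isNull`) or a floating-point NaN (`isnan`); Spark treats the two as different, so both tests are needed. A plain-Python sketch of the same per-column count follows, with `None` standing in for `null`; the function name and the records are made up.

```python
import math


def count_missing(records, columns):
    """Count, per column, values that are None (Spark null) or float NaN.

    A key absent from a record is treated as None, i.e. missing.
    """
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    return {col: sum(1 for rec in records if is_missing(rec.get(col))) for col in columns}


records = [
    {"i94mode": 1.0, "i94addr": "AL", "depdate": None},
    {"i94mode": float("nan"), "i94addr": None, "depdate": 20691.0},
    {"i94mode": 1.0, "i94addr": "MA", "depdate": 20567.0},
]
print(count_missing(records, ["i94mode", "i94addr", "depdate"]))
# {'i94mode': 1, 'i94addr': 1, 'depdate': 1}
```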

In [135]:
# us_demographics_df, countries_df, ports_df, states_df, travel_modes_df, visa_categories_df

In [136]:
find_nulls(i94_df)

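|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 239 | 152592 | 142457 | 802 | 0 | 0 | 1 | 1881250 | 3088187 | 238 | 138429 | 3095921 | 138429 | 802 | 477 | 414269 | 2982605 | 83627 | 0 | 19549 | 0 |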


In [137]:
find_nulls(us_demographics_df)

|  | city | state | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | race | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 3 | 3 | 0 | 13 | 13 | 16 | 0 | 0 | 0 |


In [138]:
find_nulls(countries_df)

|  | code | name |
|---|---|---|
| 0 | 0 | 0 |


In [139]:
find_nulls(ports_df)

|  | code | name |
|---|---|---|
| 0 | 0 | 0 |


In [140]:
find_nulls(states_df)

|  | code | name |
|---|---|---|
| 0 | 0 | 0 |


In [141]:
find_nulls(travel_modes_df)

|  | code | name |
|---|---|---|
| 0 | 0 | 0 |



In [143]:
find_nulls(visa_categories_df)

|  | code | name |
|---|---|---|
| 0 | 0 | 0 |


Although we found missing data in the U.S. immigration and demographics data, we decided not to drop the rows containing it at this point, because we might need other data in the same rows for our data model. However, the i94 and ports dataframes require further cleaning: the SAS dates and birth years in the i94 data, and the combined city and state names in the ports data.

In [144]:
# clean the i_94 dataframe: convert SAS dates (days since 1960-01-01) to ISO date strings
# and null out implausible birth years; explicit return types keep biryear numeric
from datetime import datetime, timedelta

get_isoformat_date = F.udf(
    lambda x: (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat() if x else None, Str())
get_valid_birth_year = F.udf(lambda yr: yr if (yr and 1900 <= yr <= 2016) else None, Dbl())

i94_df = i94_df \
          .withColumn('arrdate', get_isoformat_date(i94_df.arrdate)) \
          .withColumn('depdate', get_isoformat_date(i94_df.depdate)) \
          .withColumn('biryear', get_valid_birth_year(i94_df.biryear)) \
          .dropDuplicates()
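
SAS stores dates as the number of days since 1 January 1960, which is why the raw `arrdate` values look like `20545.0`. The two lambdas in the UDFs can be checked in plain Python; the helpers below are a sketch that mirrors them outside Spark (the names `sas_to_iso` and `valid_birth_year` are our own).

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date value 0


def sas_to_iso(sas_days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string.

    Mirrors the get_isoformat_date lambda: falsy input (None, and also 0) gives None.
    """
    if not sas_days:
        return None
    return (SAS_EPOCH + timedelta(days=sas_days)).isoformat()


def valid_birth_year(yr, min_year=1900, max_year=2016):
    """Mirror of get_valid_birth_year: keep plausible years, otherwise None."""
    return yr if (yr and min_year <= yr <= max_year) else None


print(sas_to_iso(20545.0))       # 2016-04-01
print(sas_to_iso(20573.0))       # 2016-04-29
print(valid_birth_year(1979.0))  # 1979.0
print(valid_birth_year(1850.0))  # None
```

So the April sample rows, with `arrdate` between 20545 and 20573, do fall in April 2016.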

In [145]:
# cleaning ports df by splitting city and state names

get_city_name = F.udf(lambda port_name: port_name.split(',')[0].strip() if port_name else None)
get_state_code = F.udf(lambda port_name: port_name.split(',')[1].strip()
                                        if (port_name and len(port_name.split(',')) > 1) else None)

ports_df = ports_df \
        .withColumn('city', get_city_name(ports_df.name)) \
        .withColumn('state_code', get_state_code(ports_df.name)) \
        .drop('name') \
        .dropna() \
        .dropDuplicates()
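
The two UDFs split a port label such as `ALCAN, AK` on the comma. A plain-Python version of the same logic (the name `split_port_name` is hypothetical) makes the edge cases visible: a label without a comma gets no state code, and a foreign port keeps whatever follows the comma, as the `BUENOS AIRES` row in the ports sample further down shows. The last label below is made up.

```python
def split_port_name(port_name):
    """Return (city, state_code), mirroring get_city_name and get_state_code."""
    if not port_name:
        return None, None
    parts = port_name.split(',')
    city = parts[0].strip()
    state_code = parts[1].strip() if len(parts) > 1 else None
    return city, state_code


print(split_port_name("ALCAN, AK"))                     # ('ALCAN', 'AK')
print(split_port_name("BAKER AAF - BAKER ISLAND, AK"))  # ('BAKER AAF - BAKER ISLAND', 'AK')
print(split_port_name("BUENOS AIRES, MINISTRO PIST"))   # ('BUENOS AIRES', 'MINISTRO PIST')
print(split_port_name("NO COMMA HERE"))                 # ('NO COMMA HERE', None)
```

Rows whose state code comes back as `None` are then removed by the `.dropna()` call in the cell above.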

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We will create a star schema to better support OLAP queries. The figure below illustrates the target star schema; the schema of each table is printed in the following steps.

![data model star schema](star_schema.png)
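
To make the fact/dimension relationship concrete, here is a toy version of the query pattern the schema is meant to support, using Python's built-in `sqlite3` instead of Spark. Table and column names follow the schemas printed below (`i_94_fact_table`, `visa_categories_dim_table`), but only a few columns are kept and all rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Dimension table: one row per visa category code
cur.execute("CREATE TABLE visa_categories_dim_table (code TEXT PRIMARY KEY, name TEXT)")
cur.executemany("INSERT INTO visa_categories_dim_table VALUES (?, ?)",
                [("1", "Business"), ("2", "Pleasure"), ("3", "Student")])

# Fact table: one row per admission, referencing dimensions by code
cur.execute("""CREATE TABLE i_94_fact_table (
                   cicid REAL, port_code TEXT, us_state_code TEXT, visa_category_code TEXT)""")
cur.executemany("INSERT INTO i_94_fact_table VALUES (?, ?, ?, ?)", [
    (1.0, "NYC", "NY", "2"),
    (2.0, "NYC", "NY", "1"),
    (3.0, "MIA", "FL", "2"),
    (4.0, "LOS", "CA", "3"),
    (5.0, "NYC", "NY", "2"),
])

# Typical OLAP query: arrivals per visa category, resolved to readable names
rows = cur.execute("""
    SELECT v.name, COUNT(*) AS arrivals
    FROM i_94_fact_table f
    JOIN visa_categories_dim_table v ON v.code = f.visa_category_code
    GROUP BY v.name
    ORDER BY arrivals DESC, v.name
""").fetchall()
print(rows)  # [('Pleasure', 3), ('Business', 1), ('Student', 1)]
conn.close()
```

The fact table holds only codes; each dimension is joined in when a query needs its readable values.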

In [57]:
# creating staging tables
# run this cell after the cleaning cells above: a temp view captures the dataframe as it was
# when the view was created, so later reassignments of i94_df or ports_df are not reflected
i94_df.createOrReplaceTempView('staging_i94')
us_demographics_df.createOrReplaceTempView('staging_us_demographics')
states_df.createOrReplaceTempView('staging_states')
visa_categories_df.createOrReplaceTempView('staging_visa_categories')
travel_modes_df.createOrReplaceTempView('staging_travel_modes')
ports_df.createOrReplaceTempView('staging_ports')
countries_df.createOrReplaceTempView('staging_countries')

In [147]:
i_94_fact_table = spark.sql("""
            SELECT
                sid.cicid AS cicid,
                sid.i94yr AS entry_year,
                sid.i94mon AS entry_month,
                sc.code AS origin_country_code,
                sp.code AS port_code,
                sid.arrdate AS arrival_date,
                stm.code AS travel_mode_code,
                sus.code AS us_state_code,
                sid.depdate AS departure_date,
                sid.i94bir AS age,
                svc.code AS visa_category_code,
                sid.occup AS occupation,
                sid.gender AS gender,
                sid.biryear AS birth_year,
                sid.dtaddto AS entry_date,
                sid.airline AS airline,
                sid.admnum AS admission_number,
                sid.fltno AS flight_number,
                sid.visatype AS visa_type
            FROM staging_i94 sid
                LEFT JOIN staging_countries sc ON sc.code = sid.i94res
                LEFT JOIN staging_ports sp ON sp.code = sid.i94port
                LEFT JOIN staging_states sus ON sus.code = sid.i94addr
                LEFT JOIN staging_visa_categories svc ON svc.code = sid.i94visa
                LEFT JOIN staging_travel_modes stm ON stm.code = sid.i94mode
            WHERE 
                sc.code IS NOT NULL AND
                sp.code IS NOT NULL AND
                sus.code IS NOT NULL AND
                stm.code IS NOT NULL AND
                svc.code IS NOT NULL
        """)

In [148]:
i_94_fact_table.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- entry_year: double (nullable = true)
 |-- entry_month: double (nullable = true)
 |-- origin_country_code: string (nullable = true)
 |-- port_code: string (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- travel_mode_code: string (nullable = true)
 |-- us_state_code: string (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_category_code: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- entry_date: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)



In [149]:
i_94_fact_table.limit(5).toPandas()

|  | cicid | entry_year | entry_month | origin_country_code | port_code | arrival_date | travel_mode_code | us_state_code | departure_date | age | visa_category_code | occupation | gender | birth_year | entry_date | airline | admission_number | flight_number | visa_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3386206.0 | 2016.0 | 4.0 | 438 | FMY | 20562.0 | 1 | AZ | 20568.0 | 54.0 | 1 |  | M | 1962.0 | 7162016 | QF | 56359930000.0 | 00093 | WB |
| 1 | 5095747.0 | 2016.0 | 4.0 | 504 | ORL | 20571.0 | 1 | FL | 20576.0 | 29.0 | 2 |  | F | 1987.0 | 10262016 | \*GA | 94661160000.0 | N329C | B2 |
| 2 | 3381198.0 | 2016.0 | 4.0 | 268 | FMY | 20562.0 | 1 | AZ | 20567.0 | 37.0 | 1 |  | M | 1979.0 | 7162016 | CI | 56391230000.0 | 00006 | WB |
| 3 | 2500813.0 | 2016.0 | 4.0 | 129 | LOS | 20558.0 | 1 | CA | 20568.0 | 67.0 | 2 |  | F | 1949.0 | 7122016 | IB | 56163570000.0 | 06171 | WT |
| 4 | 803901.0 | 2016.0 | 4.0 | 689 | FMY | 20548.0 | 1 | AZ | 20552.0 | 55.0 | 1 |  | M | 1961.0 | 10032016 | AA | 92718470000.0 | 00216 | B1 |


In [150]:
aggregated_sum_df = spark.sql("""
            SELECT
                sud.city,
                sud.state_code,
                SUM(sud.male_population) AS male_population,
                SUM(sud.female_population) AS female_population,
                SUM(sud.total_population) AS total_population,
                SUM(sud.number_of_veterans) AS number_of_veterans,
                SUM(sud.foreign_born) AS num_foreign_born
            FROM staging_us_demographics sud
            GROUP BY sud.city, sud.state_code
        """)

In [151]:
aggregated_sum_df.limit(5).toPandas()

|  | city | state_code | male_population | female_population | total_population | number_of_veterans | num_foreign_born |
|---|---|---|---|---|---|---|---|
| 0 | Mesa | AZ | 1174990 | 1184175 | 2359165 | 159040 | 287460 |
| 1 | Norwalk | CA | 265625 | 270100 | 535725 | 15135 | 196980 |
| 2 | Asheville | NC | 210500 | 232035 | 442535 | 24865 | 33150 |
| 3 | Minneapolis | MN | 1032735 | 1021940 | 2054675 | 76085 | 353845 |
| 4 | Waukegan | IL | 227570 | 214165 | 441735 | 17020 | 154985 |
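
A caution about this aggregation: in the demographics file, the city-level columns (`male_population`, `total_population`, and so on) appear to repeat on every `race` row of a city, while only `count` varies by race. Summing them would then multiply each city's figures by its number of race rows; consistent with that, every figure in the output above is an exact multiple of 5 (for example, Mesa's 2,359,165 = 5 × 471,833). Taking `MAX` (or `FIRST`) of these columns per city would avoid the inflation. The toy `sqlite3` example below, with a made-up city, shows the difference.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE staging_us_demographics '
             '(city TEXT, state_code TEXT, total_population INTEGER, race TEXT, "count" INTEGER)')
# Made-up city with one row per race: the city-level total_population repeats on every row
conn.executemany("INSERT INTO staging_us_demographics VALUES (?, ?, ?, ?, ?)", [
    ("Springfield", "XX", 100000, "White", 60000),
    ("Springfield", "XX", 100000, "Asian", 10000),
    ("Springfield", "XX", 100000, "Black or African-American", 20000),
])

summed, deduped = conn.execute("""
    SELECT SUM(total_population), MAX(total_population)
    FROM staging_us_demographics
    GROUP BY city, state_code
""").fetchone()
print(summed, deduped)  # 300000 100000
conn.close()
```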


In [152]:
aggregated_sum_df.createOrReplaceTempView('aggregated_demographics')

In [153]:
city_dim_table = spark.sql("""
            SELECT
                sp.code AS port_code,
                ad.*
            FROM staging_ports sp
                JOIN aggregated_demographics ad 
                    ON lower(ad.city) = lower(sp.city) AND ad.state_code = sp.state_code
        """)

In [154]:
city_dim_table.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- num_foreign_born: long (nullable = true)



In [155]:
city_dim_table.limit(5).toPandas()

|  | port_code | city | state_code | male_population | female_population | total_population | number_of_veterans | num_foreign_born |
|---|---|---|---|---|---|---|---|---|
| 0 | MLB | Melbourne | FL | 195900 | 204780 | 400680 | 41815 | 48425 |
| 1 | GRP | Grand Rapids | MI | 478345 | 497150 | 975495 | 44860 | 95880 |
| 2 | CRP | Corpus Christi | TX | 802440 | 817970 | 1620410 | 125390 | 154170 |
| 3 | NYC | New York | NY | 20408490 | 22343535 | 42752025 | 784805 | 16062500 |
| 4 | WPB | West Palm Beach | FL | 246310 | 287600 | 533910 | 24585 | 153375 |


In [156]:
ports_dim_table = ports_df
ports_dim_table.toPandas().sample(5)

|  | code | city | state_code |
|---|---|---|---|
| 216 | AST | ASTORIA | OR |
| 265 | PDN | PASO DEL NORTE | TX |
| 313 | EZE | BUENOS AIRES | MINISTRO PIST |
| 267 | SPE | ST PETERSBURG | FL |
| 223 | AKR | AKRON | OH |


In [157]:
ports_dim_table.printSchema()

root
 |-- code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)



In [158]:
states_dim_table = states_df

In [159]:
states_dim_table.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



In [160]:
countries_dim_table = countries_df

In [161]:
countries_dim_table.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



In [162]:
travel_modes_dim_table = travel_modes_df
travel_modes_dim_table.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



In [163]:
visa_categories_dim_table = visa_categories_df
visa_categories_dim_table.printSchema()

root
 |-- code: string (nullable = true)
 |-- name: string (nullable = true)



The printed schemas conform to the target star schema illustrated in the figure above.

In [165]:
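# save dataframes into Apache Parquet columnar format
# mode('overwrite') lets the cell be re-run; the default mode raises an error if the path exists
i_94_fact_table.write.mode('overwrite').parquet("i_94_fact_table")
city_dim_table.write.mode('overwrite').parquet("city_dim_table")
ports_dim_table.write.mode('overwrite').parquet("ports_dim_table")
states_dim_table.write.mode('overwrite').parquet("states_dim_table")
countries_dim_table.write.mode('overwrite').parquet("countries_dim_table")
travel_modes_dim_table.write.mode('overwrite').parquet("travel_modes_dim_table")
visa_categories_dim_table.write.mode('overwrite').parquet("visa_categories_dim_table")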

#### 3.2 Mapping Out Data Pipelines
The steps necessary to pipeline the data into the chosen data model (ingest, clean, populate tables):
1. Load the U.S. [immigration data](https://travel.trade.gov/research/reports/i94/historical/2016.html) and [demographics data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) from their sources.
2. Extract label data from the SAS label descriptions file, removing unnecessary spaces and characters.
3. Further clean the ports data by splitting city names from state codes.
4. Create dimension tables from the cleaned data extracted from the SAS label descriptions file.
5. Check for duplicate rows and missing values, and drop them where necessary.
6. Create the fact and dimension tables by joining the immigration and demographics data with the corresponding columns of the dimension tables.
7. Perform data quality checks on the fact and dimension tables.
8. Save the dataframes in the Apache Parquet columnar format.

### Step 4: Run ETL to Model the Data
#### 4.2 Data Quality Checks
We'll perform the following data quality checks:
 * Duplicate rows in the fact and dimension tables
 * Missing values (nulls and NaN) in the fact and city tables
 * Unique primary keys in the dimension tables
 * Source/count checks on the number of rows, to ensure completeness
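
The checks are run one by one in separate cells. As a sketch of how they could be bundled so that a scheduled pipeline fails loudly, here is a small plain-Python helper; the function name and its input layout are hypothetical, and the rows are made up.

```python
def run_quality_checks(tables):
    """Run basic checks on {table_name: (rows, key_index)}; raise on the first failure.

    rows is a list of tuples; key_index is the position of the primary key,
    or None for tables (such as a fact table) without a unique key to enforce.
    """
    for name, (rows, key_index) in tables.items():
        # Source/count check: a table that loaded zero rows is a failure
        if len(rows) == 0:
            raise ValueError(f"{name}: no rows loaded")
        # Primary-key uniqueness (same idea as value_counts() == 1 for every key)
        if key_index is not None:
            keys = [row[key_index] for row in rows]
            if len(set(keys)) != len(keys):
                raise ValueError(f"{name}: duplicate primary keys")
    return True


ok = run_quality_checks({
    "travel_modes_dim_table": ([("1", "Air"), ("2", "Sea"), ("3", "Land"), ("9", "Not reported")], 0),
    "i_94_fact_table": ([(1.0, "NYC"), (2.0, "NYC")], None),
})
print(ok)  # True
```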
 
Run Quality Checks

In [166]:
# lower the number of shuffle partitions (Spark's default is 200) to suit this small dataset
spark.conf.set("spark.sql.shuffle.partitions", 50)

In [169]:
# load data from parquet files
i_94_fact_table = spark.read.parquet("i_94_fact_table")
city_dim_table = spark.read.parquet("city_dim_table")
ports_dim_table = spark.read.parquet("ports_dim_table")
states_dim_table = spark.read.parquet("states_dim_table")
countries_dim_table = spark.read.parquet("countries_dim_table")
travel_modes_dim_table = spark.read.parquet("travel_modes_dim_table")
visa_categories_dim_table = spark.read.parquet("visa_categories_dim_table")

Checking for duplicate rows in each table:

In [170]:
find_duplicates(i_94_fact_table)

|  | cicid | entry_year | entry_month | origin_country_code | port_code | arrival_date | travel_mode_code | us_state_code | departure_date | age | visa_category_code | occupation | gender | birth_year | entry_date | airline | admission_number | flight_number | visa_type | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


In [171]:
find_duplicates(city_dim_table)

|  | port_code | city | state_code | male_population | female_population | total_population | number_of_veterans | num_foreign_born | count |
|---|---|---|---|---|---|---|---|---|---|


In [172]:
find_duplicates(ports_dim_table)

|  | code | city | state_code | count |
|---|---|---|---|---|


In [173]:
find_duplicates(states_dim_table)

|  | code | name | count |
|---|---|---|---|


In [174]:
find_duplicates(countries_dim_table)

|  | code | name | count |
|---|---|---|---|


In [175]:
find_duplicates(travel_modes_dim_table)

|  | code | name | count |
|---|---|---|---|


In [176]:
find_duplicates(visa_categories_dim_table)

|  | code | name | count |
|---|---|---|---|


Checking for missing data in the i_94 and city tables:

In [189]:
find_nulls(i_94_fact_table)

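|  | cicid | entry_year | entry_month | origin_country_code | port_code | arrival_date | travel_mode_code | us_state_code | departure_date | age | visa_category_code | occupation | gender | birth_year | entry_date | airline | admission_number | flight_number | visa_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |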


In [178]:
find_nulls(city_dim_table)

|  | port_code | city | state_code | male_population | female_population | total_population | number_of_veterans | num_foreign_born |
|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 2 | 2 |


Checking for duplicate primary keys in the dimension tables:

In [196]:
(ports_dim_table.toPandas().code.value_counts() == 1).all()

True

In [197]:
(states_dim_table.toPandas().code.value_counts() == 1).all()

True

In [198]:
(countries_dim_table.toPandas().code.value_counts() == 1).all()

True

In [199]:
(travel_modes_dim_table.toPandas().code.value_counts() == 1).all()

True

In [200]:
(visa_categories_dim_table.toPandas().code.value_counts() == 1).all()

True

Checking the number of rows:

In [206]:
i_94_fact_table.createOrReplaceTempView('i_94_fact_table')

spark.sql("""
    SELECT 
        COUNT(*) as count
    FROM i_94_fact_table
""").show()


+-------+
|  count|
+-------+
|2823323|
+-------+



In [208]:
city_dim_table.createOrReplaceTempView('city_dim_table')

spark.sql("""
    SELECT 
        COUNT(*) as count
    FROM city_dim_table
""").show()


+-----+
|count|
+-----+
|  115|
+-----+



In [209]:
ports_dim_table.createOrReplaceTempView('ports_dim_table')

spark.sql("""
    SELECT 
        COUNT(*) as count
    FROM ports_dim_table
""").show()

+-----+
|count|
+-----+
|  583|
+-----+



#### 4.3 Data dictionary 

<code>i_94_fact_table</code>
- **cicid:** record ID from the SAS file
- **entry_year:** 4 digit year
- **entry_month:** numeric month
- **origin_country_code:** i94 country code as per SAS Labels Descriptions file
- **port_code:** i94port code as per SAS Labels Descriptions file
- **arrival_date:** date of arrival in U.S.
- **travel_mode_code:** code for travel mode of arrival as per SAS Labels Descriptions file
- **us_state_code:** two letter U.S. state code
- **departure_date:** departure date from U.S.
- **age:** age of the immigrant
- **visa_category_code:** visa category code as per SAS Labels Descriptions file
- **occupation:** occupation of immigrant
- **gender:** gender of immigrant
- **birth_year:** birth year of immigrant
- **entry_date:** Date to which admitted to U.S. (allowed to stay until)
- **airline:** airline code used to arrive in U.S.
- **admission_number:** admission number
- **flight_number:** flight number
- **visa_type:** visa type

<code>city_dim_table</code>
- **port_code:** i94port code
- **city:** common name of the U.S. city; together with state_code, used to match the city to ports
- **state_code:** two-letter U.S. state code
- **male_population:** total male population
- **female_population:** total female population
- **total_population:** total population
- **number_of_veterans:** number of veterans
- **num_foreign_born:** number of foreign born

### Step 5: Conclusion and Reflection

The target star schema has been created and has passed the quality checks. The data model was built with Python, Pandas, and Apache Spark, and the schema is optimized to support OLAP queries. Although the data model was created from a portion of the data, it is expected to work when all of the monthly files are concatenated and loaded with Spark. However, as the data grows, or as the number of simultaneous queries increases, a larger cluster or a managed service such as Amazon EMR would become necessary.

#### Tools and Technologies:

Python, Pandas, and Spark enabled us to read, clean, ingest, process, load, and store the data with ease. 

Spark was chosen over Hadoop MapReduce for its flexibility in wrangling data and because it works well on a local machine while scaling up efficiently to a larger cluster of nodes.

Pandas dataframes are easy to explore and analyze. By taking a portion of the data from the Spark session into a Pandas dataframe, we can use Pandas' many built-in functions to build a proof of concept on a small scale, then test it at a larger scale with Spark dataframes and user-defined functions.

More technologies could be added in future work. For example, Apache Airflow DAGs could automate fact table updates and data quality checks, and cloud-based storage and compute instances could keep the database available at all times.

#### Data Model:
The star schema was designed with OLAP queries in mind. The ETL process and the subsequent data quality checks ensured that the dimension tables are free of duplicates and have unique primary keys. This keeps analytical queries intuitive, simple, and computationally inexpensive when aggregating on any of the dimensions. Because the dimension tables are relatively small and keyed on unique primary keys, the joins are also straightforward and inexpensive for the database.

* **If the data were increased by 100x**
We might consider deploying more worker nodes for Spark, with the data distributed between them. If that does not suffice as the data scales up, we could use EC2 instances for more processing power with Amazon Redshift or S3 for storage, or Amazon EMR, although these cloud-based options are more costly than adding nodes to an existing cluster.

* **How would you run this pipeline daily by 7 am?**
We can design an Apache Airflow DAG that executes the ETL pipeline and the data quality checks, scheduled to run every day at 7 am (for example with the cron expression `0 7 * * *`). The DAG should include proper error handling and descriptive log messages so that errors can be understood and traced through the data lineage.

* **How would you make the database accessible to 100+ people?**
We might create additional tables optimized for the most frequent queries. If the queries are unpredictable, we can instead host the data in the cloud, for example on Amazon RDS or Microsoft Azure.