# Immigration Events

#### Project Summary
The objective of this project is to create an ETL pipeline for I94 immigration, global land temperatures and US demographics datasets to form an analytics database on immigration events.

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Clean the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data

In [1]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql.functions import udf, trim, col, when, initcap, dayofmonth, dayofweek, month, year, weekofyear, date_format

import etl

### Step 1: Scope the Project and Gather Data

#### Scope 

To create the analytics database, the following steps will be carried out:

* Use Spark to load the data into dataframes.
* Perform data cleaning functions on all the datasets.
* Create i94 port code database from valid_i94_ports data set.
* Create immigration arrival date dimension table from I94 immigration dataset.
* Create immigration departure date dimension table from I94 immigration dataset.
* Create immigrant dimension table from the I94 immigration dataset. 
* Create usa demographics dimension table from the us cities demographics data.
* Create fact table from the clean I94 immigration dataset and the i94 ports dataframe.

The technology used in this project is Apache Spark. Data will be read and staged using Spark.

#### Describe Data 
The data being used is i94 immigration data from the US National Tourism and Trade Office, World Temperature Data from Kaggle, and a U.S. City Demographic Data from OpenSoft. To supplement the i94 immigration data valid i94_ports and i94_addr codes datasets were created.

## Create Spark Session

In [2]:
spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport()\
                    .getOrCreate()

### Step 2: Explore and Clean the Data
#### Explore the Data 



In [3]:
immigration_file = '/data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat'
temperature_file = '/data2/GlobalLandTemperaturesByCity.csv'
demographics_file = 'us-cities-demographics.csv'
i94port_file = 'valid_i94_ports.csv'
i94addr_file = 'valid_i94addr_codes.csv'

In [28]:
immigration_df =spark.read.format('com.github.saurfang.sas.spark').load(immigration_file)

In [5]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [6]:
immigration_df.show(n=15)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
| 46.0|2016.0|  12.0| 129.0| 129.0|    HOU|20789.0|    1.0|     TX|20802.0|  46.0|    1.0|  1.0|20161201|     MDD| null|      H|      O|   null|      M| 1970.0|05262018|     M|  null|     RS| 9.755413803E10| 7715|      E2|
| 56.0|2016.0|  12.0| 245.0| 245.0|    NEW|20789.0|    1.0|     OH|20835.0|  28.0|    3.0|  1.0|20161201|   

In [7]:
temp_df = spark.read.format('csv').option('header', 'true').load(temperature_file)

In [8]:
temp_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [9]:
temp_df.show(n=15)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

In [10]:
demographics_df = spark.read.option('delimiter', ';').format('csv').option('header', 'true').load(demographics_file)

In [11]:
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [12]:
demographics_df.show(n=15)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [13]:
i94port_df = spark.read.format('csv').option('header', 'true').load('valid_i94_ports.csv')

In [14]:
i94port_df.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- _c3: string (nullable = true)



In [15]:
i94port_df.show(n=15)

+-------+--------------------+----------------+-----------+
|i94port|                City|      State Code|        _c3|
+-------+--------------------+----------------+-----------+
|    ALC|               ALCAN| AK             |       null|
|    ANC|           ANCHORAGE|     AK         |       null|
|    BAR|BAKER AAF - BAKER...|              AK|       null|
|    DAC|       DALTONS CACHE|         AK     |       null|
|    PIZ|DEW STATION PT LA...|              AK|       null|
|    DTH|        DUTCH HARBOR|        AK      |       null|
|    EGL|               EAGLE| AK             |       null|
|    FRB|           FAIRBANKS|     AK         |       null|
|    HOM|               HOMER| AK             |           |
|    HYD|               HYDER| AK             |       null|
|    JUN|              JUNEAU|  AK            |       null|
|    5KE|           KETCHIKAN|              AK|       null|
|    KET|           KETCHIKAN|     AK         |       null|
|    MOS|MOSES POINT INTER...|          

In [16]:
i94addr_df = spark.read.format('csv').option('header', 'true').load('valid_i94addr_codes.csv')

In [17]:
i94addr_df.printSchema()

root
 |-- i94addr: string (nullable = true)
 |-- State: string (nullable = true)



In [18]:
i94addr_df.show(n=15)

+-------+-----------------+
|i94addr|            State|
+-------+-----------------+
|     AL|          ALABAMA|
|     AK|           ALASKA|
|     AZ|          ARIZONA|
|     AR|         ARKANSAS|
|     CA|       CALIFORNIA|
|     CO|         COLORADO|
|     CT|      CONNECTICUT|
|     DE|         DELAWARE|
|     DC|DIST. OF COLUMBIA|
|     FL|          FLORIDA|
|     GA|          GEORGIA|
|     GU|             GUAM|
|     HI|           HAWAII|
|     ID|            IDAHO|
|     IL|         ILLINOIS|
+-------+-----------------+
only showing top 15 rows



#### Cleaning Steps

Based on looking at the different datasets, the cleaning steps for all datasets should be to remove columns that seem to have a vast majority of null values and remove duplicate rows. For each dataset there will also be addtional clean up steps that will be taken to ensure the data fits our analysis.

The i94port dataset must remove the "_c3" column since it does not have any valueable information for our analysis. All null values are replaced by the value 'N/A', to symbolize that the information is not available. Also the "State Code" column has varying amounts of empty space that needs to be removed.

In [19]:
def clean_i94port_data(i94port_df):
    """ Cleans i94port code dataset
    
    Parameters
    -----------
    i94port_df: dataframe containing the valid i94port values
    
    Returns
    --------
    i94port_df = spark dataframe 
    """    
    i94port_df = i94port_df.drop(i94port_df._c3)
    i94port_df = i94port_df.withColumn('City', initcap(col('City')))
    i94port_df = i94port_df.fillna('N/A')
    remove_space_udf = udf(lambda s: s.replace(" ", ""), T.StringType())
    i94port_df = i94port_df.withColumn('State Code', remove_space_udf(col('State Code')))
    
    return i94port_df

In [20]:
i94port_df = clean_i94port_data(i94port_df)
i94port_df.show(n=10)

+-------+--------------------+----------+
|i94port|                City|State Code|
+-------+--------------------+----------+
|    ALC|               Alcan|        AK|
|    ANC|           Anchorage|        AK|
|    BAR|Baker Aaf - Baker...|        AK|
|    DAC|       Daltons Cache|        AK|
|    PIZ|Dew Station Pt La...|        AK|
|    DTH|        Dutch Harbor|        AK|
|    EGL|               Eagle|        AK|
|    FRB|           Fairbanks|        AK|
|    HOM|               Homer|        AK|
|    HYD|               Hyder|        AK|
+-------+--------------------+----------+
only showing top 10 rows



For the i94 immigration dataset, i94port and i94addr rows that do not match values in the valid i94port/i94addr codes datset will be removed by doing left joins. All "arrdate" and "depdate" rows with "NaN" values will be removed.

Once the data has been cleaned, some value modifications will be performed to better suit out analysis. The visa value will be converted from a number representation to its true value taken from the i94 immigration labels descriptions. The "arrdate" and "depdate" will both be converted from second since epoch to Y-M-D format for easier understandability. All null values in "entdepu" have been replaced with "N/A"

In [29]:
def clean_immigration_data(og_immigration_df, i94port_df, i94addr_df):
    """ Removes rows with invalid values and columns
    with a majority of null values.
    
    Parameters
    -----------
    og_immigration_df: immigration dataframe as it comes from the file
    i94port_df: dataframe containing the valid i94port values
    i94addr_df: dataframe containing the valid i94addr values
    
    Returns
    --------
    immigration_df = cleaned dataframe 
    """    
    immigration_df = og_immigration_df.drop(og_immigration_df.fltno)
    
    immigration_df = immigration_df.join(i94addr_df, ['i94addr'], 'leftsemi')
    immigration_df = immigration_df.join(i94port_df, ['i94port'], 'leftsemi')
    
    immigration_df = immigration_df.filter(immigration_df['arrdate'] != 'NaN')
    immigration_df = immigration_df.filter(immigration_df['depdate'] != 'NaN')

    return immigration_df

def _convert_datetime(x):
    start = datetime(1960, 1, 1)
    return start + timedelta(days=int(x))

def manipulate_immigration_data(immigration_df):
    """ Updates values of some columns to facilitate analysis
    Parameters
    -----------
    immigration_df: cleaned immigration dataframe 
    
    Returns
    --------
    immigration_df = updated dataframe 
    """
    visa_value = when(immigration_df["i94visa"] == 1,
                        'Buisness').when(immigration_df['i94visa']==2,
                                         'Pleasure').otherwise('Student')
    immigration_df = immigration_df.withColumn('i94visa', visa_value)
    
    udf_datetime_convert = udf(lambda x: _convert_datetime(x), T.DateType())
    immigration_df = immigration_df.withColumn('arrdate', 
                                               udf_datetime_convert('arrdate'))
    immigration_df = immigration_df.withColumn('depdate', 
                                               udf_datetime_convert('depdate'))
    immigration_df = immigration_df.fillna('N/A', subset=['entdepu'])
    

    return immigration_df

In [30]:
immigration_df = clean_immigration_data(immigration_df, i94port_df, i94addr_df)
immigration_df = manipulate_immigration_data(immigration_df)
immigration_df.show(n=10)

+-------+-------+-----+------+------+------+------+----------+-------+----------+------+--------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+--------+
|i94port|i94addr|cicid| i94yr|i94mon|i94cit|i94res|   arrdate|i94mode|   depdate|i94bir| i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|visatype|
+-------+-------+-----+------+------+------+------+----------+-------+----------+------+--------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+--------+
|    HOU|     TX| 46.0|2016.0|  12.0| 129.0| 129.0|2016-12-01|    1.0|2016-12-14|  46.0|Buisness|  1.0|20161201|     MDD| null|      H|      O|    N/A|      M| 1970.0|05262018|     M|  null|     RS| 9.755413803E10|      E2|
|    NEW|     OH| 56.0|2016.0|  12.0| 245.0| 245.0|2016-12-01|    1.0|2017-01-16|  28.0| Student|  1.0|2

For the temperature data set, we will only keep the rows that are for the United States and have a date in the 2000s which fit the scope of the analysys. The values in the "City" column will be capitalized to match how the value appears in the other datasets.

In [31]:
def clean_temperature_data(og_temperature_df):
    """ Removes rows with duplicates, data is that irrelevant to our analysis
    
    Parameters
    -----------
    og_temperature_df: temp dataframe as it comes from the file
    
    Returns
    --------
    temperature_df = cleaned temperature dataframe 
    """    
    temperature_df = og_temperature_df.dropDuplicates(['dt','City', 'Country']) 
    temperature_df = temperature_df.filter(temperature_df.AverageTemperature.isNotNull())
    temperature_df = temperature_df.filter(temperature_df.Country == 'United States')
    temperature_df = temperature_df.withColumn('year', year('dt'))
    temperature_df = temperature_df.filter(temperature_df.year.rlike('201|200'))
    demographics_df = demographics_df.fillna('N/A', subset=['State Code'])
    

    return temperature_df

In [32]:
temperature_df = clean_temperature_data(temp_df)
temperature_df.show(n=10)

+----------+-------------------+-----------------------------+----------------+-------------+--------+---------+----+
|        dt| AverageTemperature|AverageTemperatureUncertainty|            City|      Country|Latitude|Longitude|year|
+----------+-------------------+-----------------------------+----------------+-------------+--------+---------+----+
|2000-01-01| 11.484000000000002|                        0.336|        Savannah|United States|  31.35N|   81.05W|2000|
|2000-01-01|             10.081|                        0.228|       Sunnyvale|United States|  37.78N|  122.03W|2000|
|2000-02-01|-0.8250000000000002|                        0.264|Sterling Heights|United States|  42.59N|   82.91W|2000|
|2000-02-01|              4.659|          0.14800000000000002|          Topeka|United States|  39.38N|   95.72W|2000|
|2000-02-01|             12.377|                        0.397|          Tucson|United States|  31.35N|  111.20W|2000|
|2000-05-01| 25.316999999999997|                        

For the US city demogrpahics dataset, the "City" column is capitalized and rows with null values are removed.

In [None]:
def clean_demographics_data(demographics_df):
    """ Cleans demographics dataset
    
    Parameters
    -----------
    demographics_df: city demogrpahics dataframe as it comes from the file
    
    Returns
    --------
    demographics_df = cleaned demogrpahics dataframe 
    """    
    demographics_df = demographics_df.dropDuplicates(['City', 'State', 'State Code'])
    demographics_df = demographics_df.filter(demographics_df.City.isNotNull())
    
    return demographics_df

In [34]:
demographics_df = clean_demographics_data(demographics_df)
demographics_df.show(n=10)

+--------------+-----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|          City|      State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+--------------+-----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|  Delray Beach|    Florida|      47.9|          32219|            34042|           66261|              4232|       16639|                  2.35|        FL|               Asian|  1696|
|   Jersey City| New Jersey|      34.3|         131765|           132512|          264277|              4374|      109186|                  2.57|        NJ|  Hispanic or Latino| 79718|
|     Rockville|   Maryland|      38.1|          31205|            35793|  

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

<img src="Immigrations_Model.png">

#### 3.2 Data Pipeline
The steps necessary to create this data pipeline are:
* Load the datasets
* Clean the datasets to better suit the analysis
* Create immigrations table to be the fact table using data from the i94 immigration data set and the i94 port codes
* Create the 5 dimension tables:
    * Create the immigrants table using the i94 immigration data set, with the cicid as the connection to the fact table
    * Create the arrivals table using the i94 immigration data set, with the arrdate as the connection to the fact table
    * Create the departures table using the i94 immigration data set, with the depdate as the connection to the fact table
    * Create the temperatures table using the global temperature data set, with the a joined key of the City and State Code columns as the connection to the fact table
    * Create the demographics table using the city demographics data set, with the a joined key of the City and State Code columns as the connection to the fact table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


Create the immigrations fact table:

In [72]:
def create_fact_table(immigration_df):
    """ Create immigrations fact table
    
    Parameters
    -----------
    immigration_df: i94 immigration dataframe that has been cleaned and updated
    
    Returns
    --------
    fact_table = immigrations table 
    """    
    fact_table = immigration_df.join(i94port_df, ['i94port'], how='full').select('visapost','arrdate', 'depdate', 'i94mode', 
                                                                              col('cicid').alias('id'), 
                                                                              col('City').alias('City'), 
                                                                              'State Code', 'i94port','count', 
                                                                              'visatype', 'entdepa','entdepd',
                                                                              'entdepu','admnum')
    return fact_table

In [73]:
immigrations_table = create_fact_table(immigration_df)
immigrations_table.show(n=10)

+--------+----------+----------+-------+--------+------+----------+-------+-----+--------+-------+-------+-------+---------------+
|visapost|   arrdate|   depdate|i94mode|      id|  City|State Code|i94port|count|visatype|entdepa|entdepd|entdepu|         admnum|
+--------+----------+----------+-------+--------+------+----------+-------+-----+--------+-------+-------+-------+---------------+
|     SYD|2016-12-01|2017-01-03|    1.0| 88816.0|Bangor|        ME|    BGM|  1.0|      E2|      G|      R|    N/A|1.5336561285E10|
|    null|2016-12-01|2016-12-12|    1.0| 99713.0|Bangor|        ME|    BGM|  1.0|      WT|      G|      O|    N/A|1.5333603185E10|
|    null|2016-12-01|2016-12-12|    1.0|113670.0|Bangor|        ME|    BGM|  1.0|      WT|      G|      O|    N/A|1.5332381285E10|
|     KWT|2016-12-01|2016-12-03|    1.0|172764.0|Bangor|        ME|    BGM|  1.0|      B1|      G|      R|    N/A|1.5240966785E10|
|     KWT|2016-12-01|2016-12-03|    1.0|174077.0|Bangor|        ME|    BGM|  1.0|  

Create the immigrants dimension table:

In [40]:
def _convert_year(year):
    current_year = int(datetime.now().strftime('%Y'))
    year = int(year)
    age = current_year - year
    return age

def create_immigrants_table(df):
    """ Create immigrans dimenson table
    
    Parameters
    -----------
    df: i94 immigration dataframe that has been cleaned and updated
    
    Returns
    --------
     immigrants_df = immigrants table 
    """    
    immigrants_df = df.select(col('cicid').alias('id'), 'gender', col('biryear').alias('age'), col('i94visa').alias('visa'), 'occup')

    udf_age_converter = udf(lambda x: _convert_year(x), T.IntegerType())
    immigrants_df = immigrants_df.withColumn('age', udf_age_converter('age'))

    return immigrants_df

In [41]:
immigrants_table = create_immigrants_table(immigration_df)
immigrants_table.show(n=10)

+----+------+---+--------+-----+
|  id|gender|age|    visa|occup|
+----+------+---+--------+-----+
|46.0|     M| 50|Buisness| null|
|56.0|     F| 32| Student| null|
|67.0|     M| 52|Pleasure| null|
|68.0|     F| 50|Pleasure| null|
|69.0|     M| 52|Pleasure| null|
|70.0|     F| 13|Pleasure| null|
|71.0|     F| 38|Pleasure| null|
|72.0|     F| 73|Pleasure| null|
|73.0|     F| 70|Pleasure| null|
|74.0|     M| 60|Pleasure| null|
+----+------+---+--------+-----+
only showing top 10 rows



Use function below to create both the departures and arrivals calendar tables

In [44]:
def create_calendar_table(df, date_column):
    """ Create calendar type dimenson table
    
    Parameters
    -----------
    df: i94 immigration dataframe that has been cleaned and updated
    date_column: column to be used to extract the date types
    
    Returns
    --------
     calendar_df = calendar type table 
    """    
    calendar_df = df.select(date_column).distinct()

    calendar_df = calendar_df.withColumn('year', year(date_column))
    calendar_df = calendar_df.withColumn('month', month(date_column))
    calendar_df = calendar_df.withColumn('day', dayofmonth(date_column))
    calendar_df = calendar_df.withColumn('week', weekofyear(date_column))
    calendar_df = calendar_df.withColumn('weekday', date_format(date_column,'E'))
    
    return calendar_df

Create the arrival dates dimension table:

In [45]:
arrivals_table = create_calendar_table(immigration_df, 'arrdate')
arrivals_table.show(n=10)

+----------+----+-----+---+----+-------+
|   arrdate|year|month|day|week|weekday|
+----------+----+-----+---+----+-------+
|2016-12-19|2016|   12| 19|  51|    Mon|
|2016-12-12|2016|   12| 12|  50|    Mon|
|2016-12-13|2016|   12| 13|  50|    Tue|
|2016-12-15|2016|   12| 15|  50|    Thu|
|2016-12-20|2016|   12| 20|  51|    Tue|
|2016-12-21|2016|   12| 21|  51|    Wed|
|2016-12-22|2016|   12| 22|  51|    Thu|
|2016-12-03|2016|   12|  3|  48|    Sat|
|2016-12-25|2016|   12| 25|  51|    Sun|
|2016-12-31|2016|   12| 31|  52|    Sat|
+----------+----+-----+---+----+-------+
only showing top 10 rows



Create the departure dates dimension table:

In [46]:
departures_table = create_calendar_table(immigration_df, 'depdate')
departures_table.show(n=10)

+----------+----+-----+---+----+-------+
|   depdate|year|month|day|week|weekday|
+----------+----+-----+---+----+-------+
|2016-03-01|2016|    3|  1|   9|    Tue|
|2017-01-06|2017|    1|  6|   1|    Fri|
|2017-01-27|2017|    1| 27|   4|    Fri|
|2017-02-26|2017|    2| 26|   8|    Sun|
|2016-12-19|2016|   12| 19|  51|    Mon|
|2017-01-24|2017|    1| 24|   4|    Tue|
|2016-11-08|2016|   11|  8|  45|    Tue|
|2016-07-03|2016|    7|  3|  26|    Sun|
|2017-02-16|2017|    2| 16|   7|    Thu|
|2017-04-09|2017|    4|  9|  14|    Sun|
+----------+----+-----+---+----+-------+
only showing top 10 rows



Create the temperatures dimension table:

In [54]:
def create_city_temps_table(df, i94port_df):
    """ Create temperatures dimenson table
    
    Parameters
    -----------
    df: temperature dataframe that has been cleaned and updated
    i94port_df: cleaned dataframe containing the valid i94port values
    
    Returns
    --------
    temps_df = temperatures table 
    """    
    temps_df = df.join(i94port_df, ['City'], how='left')
    temps_df = temps_df.withColumn('AverageTemperature', col('AverageTemperature').cast('double'))
    temps_df = temps_df.select(col('dt').alias('Date'), 'City', 'Country', 'State Code', 'AverageTemperature', 'Latitude', 'Longitude')
    return temps_df

In [55]:
temperatures_table = create_city_temps_table(temperature_df,i94port_df)
temperatures_table.show(n=10)

+----------+----------------+-------------+----------+-------------------+--------+---------+
|      Date|            City|      Country|State Code| AverageTemperature|Latitude|Longitude|
+----------+----------------+-------------+----------+-------------------+--------+---------+
|2000-01-01|        Savannah|United States|        GA| 11.484000000000002|  31.35N|   81.05W|
|2000-01-01|       Sunnyvale|United States|      null|             10.081|  37.78N|  122.03W|
|2000-02-01|Sterling Heights|United States|      null|-0.8250000000000002|  42.59N|   82.91W|
|2000-02-01|          Topeka|United States|      null|              4.659|  39.38N|   95.72W|
|2000-02-01|          Tucson|United States|        AZ|             12.377|  31.35N|  111.20W|
|2000-05-01|        Beaumont|United States|        TX| 25.316999999999997|  29.74N|   94.15W|
|2000-05-01|        El Monte|United States|      null|             19.073|  34.56N|  118.70W|
|2000-05-01|   Oklahoma City|United States|        OK|      

Create the city demographics table:

In [64]:
def create_demographics_table(df):
    """ Create demographics dimenson table
    
    Parameters
    -----------
    df: demographics dataframe that has been cleaned and updated
    
    Returns
    --------
    demo_df = demographics table 
    """    
    demo_df = df.withColumn('Male Population', col('Male Population').cast('int'))
    demo_df = demo_df.withColumn('Female Population', col('Female Population').cast('int'))
    demo_df = demo_df.withColumn('Total Population', col('Total Population').cast('int'))
    demo_df = demo_df.withColumn('Foreign-born', col('Foreign-born').cast('int'))
    demo_df = demo_df.select('City', 'State Code', 'State', 'Male Population', 'Female Population', 
                             'Total Population', 'Foreign-born', 'Race')
    return demo_df

In [65]:
demographics_table = create_demographics_table(demographics_df)
demographics_table.show(n=10)

+--------------+----------+-----------+---------------+-----------------+----------------+------------+--------------------+
|          City|State Code|      State|Male Population|Female Population|Total Population|Foreign-born|                Race|
+--------------+----------+-----------+---------------+-----------------+----------------+------------+--------------------+
|  Delray Beach|        FL|    Florida|          32219|            34042|           66261|       16639|               Asian|
|   Jersey City|        NJ| New Jersey|         131765|           132512|          264277|      109186|  Hispanic or Latino|
|     Rockville|        MD|   Maryland|          31205|            35793|           66998|       25047|American Indian a...|
|      Alhambra|        CA| California|          42184|            43388|           85572|       44441|American Indian a...|
|    Cincinnati|        OH|       Ohio|         143654|           154883|          298537|       16896|               White|


#### 4.2 Data Quality Check

In [74]:
tables = {'departures table': departures_table,
          'demographics table': demographics_table,
          'temperatures table': temperatures_table,
          'arrivals table': arrivals_table,
          'immigrants table': immigrants_table, 
          'immigrations table': immigrations_table}

In [None]:
for table_name, table in tables.items():
    print(f"Performing data quality check on table {table_name}...")
    total_count = table.count()
    if total_count == 0:
        print(f"Data quality check failed for {table_name} with zero records!")
    else:
        for column in table.schema.names:
            null_values = table.select(col(f'{column}')).where(col(f'{column}').isNull())
            if null_values.first():
                print(f"Found NULL values in {column} column!")
        print(f"Data quality check passed for {table_name} with {total_count:,} records.")

#### 4.3 Data dictionary 

##### Immigration Fact Table 

| Immigrations Table | |
| --- | --- | 
| id| The immigrants  id (Primary Key)| 
| arrdate| Date of the immigrant's arrival to the USA (Foreign Key) |
| depdate | Date of the immigrant's departure to the USA (Foreign Key)|
| i94mode | The way immigrant arrived to the USA. '1' = Air, '2' = Sea, '3' = Land, '9' = Not reported |
| City| The city the port of admission is located (Foreign Key)| 
| State Code| 2 letter abbreviation of the state (Foreign Key)|
| i94port| Port of admission | 
| count| The number of people|
| visapost| Department of State where where Visa was issued|
| entdepa| Arrival Flag - admitted or paroled into the U.S |
| entdepd| Departure Flag - Departed, lost I-94 or is deceased|
| admnum| Admission Number|


##### Immigrants Table
| Immigrants Table | |
| --- | --- | 
| id| The immigrants  id (Primary Key) |
| gender| Male or Female | 
| age | Age of immigrant |
| visa | Type of Visa |
|occup| Ocupation that will be performed|

##### Arrivals Table
| Arrivals Table | |
| --- | --- | 
| arrdate| arrival date to the USA
| year| year of arrival |
| month| month of arrival|
| day| day of arrival |
| week| week of the year|
|weekday| day of the week |

##### Departures Table
| Departures Table | |
| --- | --- | 
| depdate|departure date to the USA
| year| year of departure |
| month| month of departure|
| day| day of departure |
| week| week of the year|
|weekday| day of the week |


##### Temperatures Table
| Temperatures Table | |
| --- | --- | 
| City | City Name |
| Date | date in YYYY-MM-DD format |
| AverageTemperature | The average temperature on that day |
| Country | Country Name |
| Latitude | The latitude coordinates for the city |
| Longitude| The longitude coordinates for the city | 
| State Code | The 2 letter abbreviation for the state|

##### Demographics Table
| Demographics Table | |
| --- | --- | 
| City| City Name |
| State Code| The 2 letter abbreviation for the state|    
| State| The full state name |
| Male Population| The total number of males in the state|
| Female Population| The total number of females in the state|
| Total Population| The total amount of people in the city | 
| Foreign-born| The amount of people who were not born in the USA|
| Race| The most prominent race in the city|

#### Step 5:  Project Write Up
Consdiering the  size of the immigration dataset for only a month, combined with the temperature, port codes and demographic dataset, Spark seems like the best option for processing. The datasets dont get updated rapidly since they are montly so montly update would suffice and thus no need for the use of Apache Airflow at the moment. 

Alternate requirement scenarios:
* If the data was increased by 100x, I would be store data in an Amazon S3 bucket and load it to our staging tables. The ETL would still happen using Spark, since its well suited for large datasets. 
* If the data populates a dashboard that must be updated on a daily basis by 7am every day, we would use Apache Airflow to perform the ETL and data qualtiy validation.
* If the database needed to be accessed by 100+ people, the data can be stored in a database on a redshift cluster which allows for multiple user access.

NOTE: The whole project is done in this notebook but python scripts (etl.py, utils.py and create_tables.py) have been created for reading the data files from an Amazon S3 bucket and write parquet files.