# Project Title
### Data Engineering Capstone Project

#### Project Summary
The capstone project aims to answer questions about immigration to the U.S. The data provided for this project can shed light on immigrant profiles, the visa types most often issued by the U.S. government, and how weather patterns may influence immigration statistics.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import re
from datetime import datetime, timedelta

### Step 1: Scope the Project and Gather Data

#### Scope 
The project's goal is to combine 3 datasets and process them with PySpark into a star schema, the dimensional data model proposed by Ralph Kimball. The result is 6 dimension tables and 1 fact table.

#### Describe and Gather Data 
This project uses 3 data sources:
- **I94 immigration data**: This data comes from the [US National Tourism and Trade Office](https://www.trade.gov/national-travel-and-tourism-office). A data dictionary is included in the workspace.
- **World Temperature Data**: The dataset comes from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
- **U.S. City Demographic Data**: This data comes from OpenSoft. You can read more about it [here](https://datahub.io/core/airport-codes#data).

### Step 2: Explore and Assess the Data
#### Exploring the Data and Cleaning it.

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [3]:
input_file = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

df = spark.read.format('com.github.saurfang.sas.spark').load(input_file)

df.show(3, False)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid|i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum       |fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|6.0  |2016.0|4.0   |692.0 |692.0 |XXX    |20573.0|null   |null   |null   |37.0  |2.0    |1.0  |null    |null    |null |T      |null   |U      |null   |1979.0 |10282016|null  |null  |null   |1.897628485E9|null |B2      |
|7.0  |2016.0|4.0   |254.0 |276.0 |ATL    |20551.0|1.0    |AL     |null   |25.0  |3.0    |1.0  |20130811|SEO     |nu

In [4]:
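# Convert a SAS date (number of days since 1960-01-01) into an ISO date string.
# Test against None explicitly so that a valid value of 0 is not treated as missing.
convert_date = F.udf(lambda x: (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat() if x is not None else None)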

df = df.withColumn('arrival_date', convert_date('arrdate')).withColumn('departure_date', convert_date('depdate'))
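
The conversion in the cell above can be checked outside Spark. The sketch below is a plain-Python illustration (the helper name `sas_to_iso` is made up for this example) that reproduces two `arrdate` values and their converted dates from the outputs shown in this notebook.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_to_iso(sas_days):
    """Convert a SAS day count to an ISO date string, keeping None as None."""
    if sas_days is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_days)).isoformat()


# The arrdate values 20573.0 and 20551.0 appear in the first rows of the raw data.
print(sas_to_iso(20573.0))  # 2016-04-29
print(sas_to_iso(20551.0))  # 2016-04-07
print(sas_to_iso(None))     # None
```

Because the result is an ISO string rather than a date object, later comparisons such as `departure_date > arrival_date` compare text, which still orders correctly for the `YYYY-MM-DD` format.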

In [5]:
df = df.drop('entdepa', 'entdepu', 'entdepd', 'insnum', 'admnum', 'fltno', 'occup', 'visapost', 'dtadfile', 'count', 'dtaddto', 'i94yr', 'matflag', 'arrdate', 'depdate', 'airline', 'i94mode', 'i94visa', 'i94mon')

In [6]:
df.show(3, False)

+-----+------+------+-------+-------+------+-------+------+--------+------------+--------------+
|cicid|i94cit|i94res|i94port|i94addr|i94bir|biryear|gender|visatype|arrival_date|departure_date|
+-----+------+------+-------+-------+------+-------+------+--------+------------+--------------+
|6.0  |692.0 |692.0 |XXX    |null   |37.0  |1979.0 |null  |B2      |2016-04-29  |null          |
|7.0  |254.0 |276.0 |ATL    |AL     |25.0  |1991.0 |M     |F1      |2016-04-07  |null          |
|15.0 |101.0 |101.0 |WAS    |MI     |55.0  |1961.0 |M     |B2      |2016-04-01  |2016-08-25    |
+-----+------+------+-------+-------+------+-------+------+--------+------------+--------------+
only showing top 3 rows



In [7]:
df = df.withColumnRenamed('visatype', 'visa_type')\
        .withColumnRenamed('cicid', 'id')\
        .withColumnRenamed('i94cit', 'citizenship_code')\
        .withColumnRenamed('i94res', 'residence_code')\
        .withColumnRenamed('i94port', 'city_code')\
        .withColumnRenamed('i94addr', 'state_code')\
        .withColumnRenamed('i94bir', 'age')

In [8]:
df = df.dropna()

In [9]:
df.orderBy('departure_date').where('departure_date < arrival_date').show(10, False)

+---------+----------------+--------------+---------+----------+----+-------+------+---------+------------+--------------+
|id       |citizenship_code|residence_code|city_code|state_code|age |biryear|gender|visa_type|arrival_date|departure_date|
+---------+----------------+--------------+---------+----------+----+-------+------+---------+------------+--------------+
|6073731.0|123.0           |123.0         |PHU      |IL        |50.0|1966.0 |M     |WB       |2016-04-20  |2001-07-20    |
|5930761.0|112.0           |112.0         |MIA      |CA        |79.0|1937.0 |F     |WT       |2016-04-11  |2012-04-12    |
|5932223.0|999.0           |112.0         |SDP      |CA        |56.0|1960.0 |F     |WT       |2016-04-11  |2012-04-12    |
|5904464.0|180.0           |135.0         |HHW      |HI        |75.0|1941.0 |F     |WT       |2016-04-05  |2012-04-14    |
|5930054.0|438.0           |464.0         |HHW      |HI        |73.0|1943.0 |M     |WT       |2016-04-20  |2014-04-22    |
|5944554.0|126.0

In [10]:
# The check above shows rows whose departure date is earlier than the arrival date, which should be impossible.
# Keep only rows that depart after they arrive (the strict comparison also drops same-day departures).
df = df.where('departure_date > arrival_date')
df.show(10, False)

+----+----------------+--------------+---------+----------+----+-------+------+---------+------------+--------------+
|id  |citizenship_code|residence_code|city_code|state_code|age |biryear|gender|visa_type|arrival_date|departure_date|
+----+----------------+--------------+---------+----------+----+-------+------+---------+------------+--------------+
|15.0|101.0           |101.0         |WAS      |MI        |55.0|1961.0 |M     |B2       |2016-04-01  |2016-08-25    |
|27.0|101.0           |101.0         |BOS      |MA        |58.0|1958.0 |M     |B1       |2016-04-01  |2016-04-05    |
|28.0|101.0           |101.0         |ATL      |MA        |56.0|1960.0 |F     |B1       |2016-04-01  |2016-04-05    |
|29.0|101.0           |101.0         |ATL      |MA        |62.0|1954.0 |M     |B2       |2016-04-01  |2016-04-17    |
|30.0|101.0           |101.0         |ATL      |NJ        |49.0|1967.0 |M     |B2       |2016-04-01  |2016-05-04    |
|31.0|101.0           |101.0         |ATL      |NY      

In [11]:
# Get the port codes from the I94 label descriptions.
with open('I94_SAS_Labels_Descriptions.SAS') as file:
    data = file.readlines()

# Captures the code and the location from lines of the form 'CODE' = 'LOCATION'.
city_regex = re.compile(r"(\s*)\'(.{0,3})\'[\t*\s*]=[\t*\s*]\'(.*)\'")
port_codes = {}
for city in data[303:893]:
    founds = city_regex.search(city)
    if founds:  # skip lines that do not match the pattern
        port_codes[founds.group(2)] = founds.group(3)

list_map = list(map(list, port_codes.items()))

port_codes_df = spark.createDataFrame(list_map, ['port_code', 'location'])

port_codes_df = port_codes_df.withColumn('location', F.trim(F.col('location')))
port_codes_df = port_codes_df.withColumn('city_name', F.split('location', ',')[0])\
        .withColumn('state_code', F.trim(F.split('location', ',')[1]))
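
As a rough illustration of the parsing step above, the sketch below runs a similar regular expression over a few sample lines without Spark. The sample lines are hypothetical stand-ins written in the style of the label file (only the `ANC` entry is taken from the output in this notebook), and `parse_port_line` is a name invented for this example.

```python
import re

# Hypothetical sample lines; the real label file is not reproduced here.
SAMPLE_LINES = [
    "   'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "   'XXX'\t=\t'NOT REPORTED/UNKNOWN  '\n",
    "/* a comment line that should be skipped */\n",
]

# A quoted code of up to 3 characters, an equals sign, then a quoted location.
PORT_RE = re.compile(r"^\s*'([^']{1,3})'\s*=\s*'([^']*)'")


def parse_port_line(line):
    """Return (port_code, city_name, state_code), or None if the line does not match."""
    match = PORT_RE.search(line)
    if match is None:
        return None
    code = match.group(1)
    location = match.group(2).strip()
    # Split on the first comma only; locations without a comma have no state code.
    city, _, state = location.partition(',')
    return code, city.strip(), state.strip() or None


parsed = [row for row in map(parse_port_line, SAMPLE_LINES) if row is not None]
for row in parsed:
    print(row)
```

Returning `None` for non-matching lines, instead of calling `.group()` on a failed match, keeps the loop from raising an `AttributeError` if the line range includes a comment or blank line.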

In [12]:
port_codes_df.show(3, False)

+---------+----------------------------+------------------------+----------+
|port_code|location                    |city_name               |state_code|
+---------+----------------------------+------------------------+----------+
|ANC      |ANCHORAGE, AK               |ANCHORAGE               |AK        |
|BAR      |BAKER AAF - BAKER ISLAND, AK|BAKER AAF - BAKER ISLAND|AK        |
|DAC      |DALTONS CACHE, AK           |DALTONS CACHE           |AK        |
+---------+----------------------------+------------------------+----------+
only showing top 3 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
As previously mentioned, the chosen data model is the star schema proposed by Ralph Kimball. It was chosen because it performs well for analytical queries, which matters since the dataset can grow, and because users can write simple queries that join the fact and dimension tables to build the analytical dataset they need. The tables are as follows:
![Data Model1](assets/data_model2.drawio.svg)
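
To make the "simple queries" point concrete, here is a minimal sketch of a star-schema query. It uses Python's built-in `sqlite3` in place of Spark, and a reduced version of the model with one fact table and one dimension. The three fact rows reuse ids and visa types from the sample output earlier in the notebook; the small integer `visa_id` values are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_visa (
        visa_id INTEGER PRIMARY KEY,
        visa_type TEXT,
        visa_category TEXT
    );
    CREATE TABLE fact_immigration (
        id INTEGER PRIMARY KEY,
        arrival_date TEXT,
        visa_id INTEGER REFERENCES dim_visa(visa_id)
    );
    INSERT INTO dim_visa VALUES (1, 'B1', 'Business'), (2, 'B2', 'Pleasure');
    INSERT INTO fact_immigration VALUES
        (27, '2016-04-01', 1),
        (29, '2016-04-01', 2),
        (30, '2016-04-01', 2);
""")

# One join from the fact table to a dimension answers "arrivals per visa category".
rows = conn.execute("""
    SELECT v.visa_category, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_visa AS v ON v.visa_id = f.visa_id
    GROUP BY v.visa_category
    ORDER BY v.visa_category
""").fetchall()
print(rows)  # [('Business', 1), ('Pleasure', 2)]
```

The fact table stores only keys and dates, so adding another descriptive attribute to a dimension does not change the fact rows.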



#### 3.2 Mapping Out Data Pipelines
1. Create Spark dataframes from the SAS data and the CSV files.
2. Clean the data by removing duplicates and NA values and by giving fields more meaningful names.
3. Create the dimension tables and the fact table.
4. Save the final tables in Parquet format.
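
The steps above can be sketched on plain Python dictionaries to show the shape of the pipeline without a Spark session. This is only an illustration: `clean` and `split_star` are hypothetical helpers, the input rows are modelled on the I94 sample output, and the final Parquet write is left out.

```python
def clean(rows, renames):
    """Drop rows with missing values, drop exact duplicates, and rename fields."""
    seen, cleaned = set(), []
    for row in rows:
        if any(value is None for value in row.values()):
            continue
        renamed = {renames.get(key, key): value for key, value in row.items()}
        fingerprint = tuple(sorted(renamed.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            cleaned.append(renamed)
    return cleaned


def split_star(rows, dim_fields, key):
    """Move dim_fields into a dimension keyed by `key`; the rest stays in the fact rows."""
    dimension = {row[key]: {f: row[f] for f in (key, *dim_fields)} for row in rows}
    fact = [{k: v for k, v in row.items() if k not in dim_fields} for row in rows]
    return list(dimension.values()), fact


raw = [
    {'cicid': 15.0, 'i94bir': 55.0, 'gender': 'M', 'visatype': 'B2'},
    {'cicid': 15.0, 'i94bir': 55.0, 'gender': 'M', 'visatype': 'B2'},   # duplicate
    {'cicid': 6.0, 'i94bir': 37.0, 'gender': None, 'visatype': 'B2'},   # missing value
]
renames = {'cicid': 'id', 'i94bir': 'age', 'visatype': 'visa_type'}

cleaned = clean(raw, renames)
dim_immigrant_rows, fact_rows = split_star(cleaned, ('age', 'gender'), 'id')
print(dim_immigrant_rows)
print(fact_rows)
```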

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [13]:
# Creating dim_time_table
def create_dim_time_table(spark_df):
    """
        It takes dates from a spark_df and extracts its month, day, week, year.
        
        :param: spark_df: spark dataframe
    """
      
    spark_df = spark_df.select(F.explode(F.array('arrival_date', 'departure_date')).alias('date'))    
    
    
    # Make it wider with more info
    spark_df = spark_df.withColumn('day', F.dayofmonth(spark_df['date']))\
    .withColumn('month', F.month(spark_df['date']))\
    .withColumn('year', F.year(spark_df['date']))\
    .withColumn('week', F.weekofyear(spark_df['date']))
    
    spark_df = spark_df.drop_duplicates(subset=['date'])  
    
    return spark_df
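
For a quick check of the date parts without Spark, the sketch below derives the same fields with the standard library. `date_parts` is a name made up for this example. Spark's `weekofyear` follows the ISO 8601 week definition, which is what `isocalendar()` returns, so the week numbers should agree.

```python
from datetime import date


def date_parts(iso_date):
    """Split an ISO date string into the fields stored in the time dimension."""
    d = date.fromisoformat(iso_date)
    return {
        'date': iso_date,
        'day': d.day,
        'month': d.month,
        'year': d.year,
        'week': d.isocalendar()[1],  # ISO 8601 week number
    }


# Keying by date removes duplicates, like drop_duplicates(subset=['date']).
dates = ['2016-04-01', '2016-04-03', '2016-04-01']
time_rows = {d: date_parts(d) for d in dates}
for row in time_rows.values():
    print(row)
```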

In [14]:
dim_time_table = create_dim_time_table(df)
dim_time_table.orderBy('date').show(3, False)

+----------+---+-----+----+----+
|date      |day|month|year|week|
+----------+---+-----+----+----+
|2016-04-01|1  |4    |2016|13  |
|2016-04-02|2  |4    |2016|13  |
|2016-04-03|3  |4    |2016|13  |
+----------+---+-----+----+----+
only showing top 3 rows



In [15]:
def create_dim_immigrant_table(spark_df):
    """
        It takes a spark dataframe and extracts immigrant data.
        
        :param: spark_df: spark dataframe
    """
    
    df = spark_df.select('id', 'age', 'biryear', 'gender')
    
    df = df.drop_duplicates(subset=['id'])
    
    return df

In [16]:
dim_immigrant = create_dim_immigrant_table(df)

In [17]:
dim_immigrant.show(3, False)

+-----+----+-------+------+
|id   |age |biryear|gender|
+-----+----+-------+------+
|558.0|42.0|1974.0 |M     |
|596.0|24.0|1992.0 |M     |
|934.0|54.0|1962.0 |F     |
+-----+----+-------+------+
only showing top 3 rows



In [18]:
# removing fields that belong to the dimension table.
df = df.drop('age', 'biryear', 'gender')

In [19]:
# Creating dim_visa_type table
def create_dim_visa_type(spark_df):
    """
        It takes spark_df and creates a visa table.
        
        :param: spark_df: spark dataframe
    """
    
    # Get the unique visa types
    df = spark_df.select('visa_type').distinct()
    
    df = df.withColumn('visa_id', F.monotonically_increasing_id())
    
    df.createOrReplaceTempView('visa_code')
    
    # Classifying visa types into 3 categories, according to the data source
    df = spark.sql("""
                SELECT visa_id, visa_type, 
                CASE 
                    WHEN visa_type in ('B1', 'WB', 'GB', 'GMB', 'I', 'I1', 'E1', 'E2') THEN 'Business'
                    WHEN visa_type in ('B2', 'WT', 'GT', 'GMT', 'CP', 'CPL', 'SBP') THEN 'Pleasure'
                    WHEN visa_type in ('F1', 'F2', 'M1', 'M2') THEN 'Student'
                    ELSE 'not defined'
                END as visa_category
                FROM visa_code
            """)
    return df
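
The `CASE` expression above is a plain lookup, so it can be mirrored and tested in a few lines of Python. This is a sketch for illustration only; the visa types and categories are copied from the query, while `VISA_CATEGORIES` and `visa_category` are names invented for the example.

```python
VISA_CATEGORIES = {
    'Business': {'B1', 'WB', 'GB', 'GMB', 'I', 'I1', 'E1', 'E2'},
    'Pleasure': {'B2', 'WT', 'GT', 'GMT', 'CP', 'CPL', 'SBP'},
    'Student': {'F1', 'F2', 'M1', 'M2'},
}


def visa_category(visa_type):
    """Return the category for a visa type, or 'not defined' for unknown types."""
    for category, visa_types in VISA_CATEGORIES.items():
        if visa_type in visa_types:
            return category
    return 'not defined'


print(visa_category('B2'))  # Pleasure
print(visa_category('F1'))  # Student
print(visa_category('ZZ'))  # not defined
```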
    

In [20]:
dim_visa = create_dim_visa_type(df)
dim_visa.show(truncate = False)

+-------------+---------+-------------+
|visa_id      |visa_type|visa_category|
+-------------+---------+-------------+
|103079215104 |F2       |Student      |
|352187318272 |GMB      |Business     |
|369367187456 |B2       |Pleasure     |
|498216206336 |F1       |Student      |
|601295421440 |CPL      |Pleasure     |
|704374636544 |I1       |Business     |
|738734374912 |WB       |Business     |
|747324309504 |M1       |Student      |
|807453851648 |B1       |Business     |
|884763262976 |WT       |Pleasure     |
|1151051235328|M2       |Student      |
|1314259992576|CP       |Pleasure     |
|1331439861760|GMT      |Pleasure     |
|1348619730944|E1       |Business     |
|1391569403904|I        |Business     |
|1554778161152|E2       |Business     |
|1709396983808|SBP      |Pleasure     |
+-------------+---------+-------------+
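
The large, non-consecutive `visa_id` values come from `F.monotonically_increasing_id()`. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below (with a hypothetical helper name, `split_spark_id`) decodes a few ids that appear in this notebook's outputs.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within the partition


def split_spark_id(generated_id):
    """Return (partition_id, record_number) for an id generated by Spark."""
    partition_id = generated_id >> RECORD_BITS
    record_number = generated_id & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


print(split_spark_id(103079215104))   # (12, 0)
print(split_spark_id(1151051235328))  # (134, 0)
print(split_spark_id(884763262977))   # (103, 1)
```

The ids are unique but depend on how the data is partitioned, so they may change between runs.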



In [21]:
dim_visa.createOrReplaceTempView('visa')
df.createOrReplaceTempView('df')

df = spark.sql("""
            SELECT df.*, visa.visa_id
            FROM df
            JOIN visa ON visa.visa_type = df.visa_type
        """)

In [22]:
df.show(3, False)

+-------+----------------+--------------+---------+----------+---------+------------+--------------+------------+
|id     |citizenship_code|residence_code|city_code|state_code|visa_type|arrival_date|departure_date|visa_id     |
+-------+----------------+--------------+---------+----------+---------+------------+--------------+------------+
|1980.0 |104.0           |104.0         |ATL      |MS        |F2       |2016-04-01  |2016-04-08    |103079215104|
|51783.0|151.0           |151.0         |NYC      |NY        |F2       |2016-04-01  |2016-04-28    |103079215104|
|52934.0|206.0           |206.0         |TOR      |NY        |F2       |2016-04-01  |2016-04-12    |103079215104|
+-------+----------------+--------------+---------+----------+---------+------------+--------------+------------+
only showing top 3 rows



In [23]:
df = df.drop('visa_type')

In [24]:
df.show(3, False)

+-------+----------------+--------------+---------+----------+------------+--------------+------------+
|id     |citizenship_code|residence_code|city_code|state_code|arrival_date|departure_date|visa_id     |
+-------+----------------+--------------+---------+----------+------------+--------------+------------+
|1980.0 |104.0           |104.0         |ATL      |MS        |2016-04-01  |2016-04-08    |103079215104|
|51783.0|151.0           |151.0         |NYC      |NY        |2016-04-01  |2016-04-28    |103079215104|
|52934.0|206.0           |206.0         |TOR      |NY        |2016-04-01  |2016-04-12    |103079215104|
+-------+----------------+--------------+---------+----------+------------+--------------+------------+
only showing top 3 rows



In [25]:
demo_input_file = 'us-cities-demographics.csv'

def create_dim_demographic_table(spark, demo_input_file):
    """
        It takes a spark session and the location of a file with demographics data. It creates a demographic table.
        
        :param: spark: spark session.
        :param: demo_input_file: Location of demographic data.
        
    """
    df = spark.read.format('csv').option('delimiter', ';').option('header', True).load(demo_input_file, inferSchema=True) 
    
    # Get columns with NA values (convert to pandas once instead of on every iteration)
    na_counts = df.toPandas().isna().sum()
    columns_to_dropna = list(na_counts[na_counts != 0].index)
    
    # Drop NA values and remove duplicates
    df = df.dropna(subset=columns_to_dropna)
    
    df = df.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])
    
    df = df.withColumnRenamed('City', 'city')\
    .withColumnRenamed('State', 'state')\
    .withColumnRenamed('Median Age', 'median_age')\
    .withColumnRenamed('Male Population', 'male_population')\
    .withColumnRenamed('Female Population', 'female_population')\
    .withColumnRenamed('Total Population', 'total_population')\
    .withColumnRenamed('Number of Veterans', 'number_of_veterans')\
    .withColumnRenamed('Foreign-born', 'foreign_born')\
    .withColumnRenamed('Average Household Size', 'average_household_size')\
    .withColumnRenamed('State Code', 'state_code')\
    .withColumnRenamed('Race', 'race')\
    .withColumnRenamed('Count', 'count')
    
    df = df.groupBy('city', 'state', 'median_age', 'male_population', 'female_population', 'total_population', 'number_of_veterans', 'foreign_born', 'average_household_size', 'state_code').pivot('race').agg(F.first(F.col('count')))
    
    df = df.withColumn('demo_id', F.monotonically_increasing_id())
    
    return df
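
The `groupBy(...).pivot('race')` step turns one row per city and race into one row per city with a column per race. The sketch below shows the same reshaping on plain dictionaries, using a reduced set of grouping columns. The two Modesto counts are taken from the output in this notebook; `pivot_race` is a name made up for the example.

```python
from collections import defaultdict

long_rows = [
    {'city': 'Modesto', 'state_code': 'CA', 'race': 'Asian', 'count': 19417},
    {'city': 'Modesto', 'state_code': 'CA',
     'race': 'American Indian and Alaska Native', 'count': 4388},
]


def pivot_race(rows):
    """Turn one row per (city, race) into one row per city with a column per race."""
    wide = defaultdict(dict)
    for row in rows:
        key = (row['city'], row['state_code'])
        # setdefault keeps the first count seen for a race, similar to F.first.
        wide[key].setdefault(row['race'], row['count'])
    return [
        {'city': city, 'state_code': state, **counts}
        for (city, state), counts in wide.items()
    ]


wide_rows = pivot_race(long_rows)
print(wide_rows)
```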

In [26]:
dim_demographic_table = create_dim_demographic_table(spark, demo_input_file)
dim_demographic_table.show(10, False)

+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+-----------+
|city        |state       |median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|American Indian and Alaska Native|Asian|Black or African-American|Hispanic or Latino|White |demo_id    |
+------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+-----------+
|Modesto     |California  |35.2      |104852         |106405           |211257          |9855              |39613       |2.97                  |CA        |4388                             |19417|9869                     

In [27]:
# Add the port_codes to the dim_demographic_table.

dim_demographic_table.createOrReplaceTempView('demo')
port_codes_df.createOrReplaceTempView('ports')

dim_demographic_table = spark.sql("""
                    SELECT demo.*, ports.port_code
                    FROM demo
                    JOIN ports
                    ON lower(ports.city_name) = lower(demo.city) AND ports.state_code = demo.state_code
                """)

In [28]:
dim_demographic_table.show(10, False)

+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+------+-------------------------+------------------+-------+-------------+---------+
|city       |state     |median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|American Indian and Alaska Native|Asian |Black or African-American|Hispanic or Latino|White  |demo_id      |port_code|
+-----------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+------+-------------------------+------------------+-------+-------------+---------+
|Anchorage  |Alaska    |32.2      |152945         |145750           |298695          |27492             |33258       |2.77                  |AK        |36339                            |3

In [29]:
dim_demographic_table = dim_demographic_table.withColumnRenamed('American Indian and Alaska Native', 'american_indian_and_alaska_native')\
        .withColumnRenamed('Asian', 'asian')\
        .withColumnRenamed('Black or African-American', 'black_or_african_american')\
        .withColumnRenamed('Hispanic or Latino', 'hispanic_or_latino')\
        .withColumnRenamed('White', 'white')

In [30]:
dim_demographic_table.show(3, False)

+---------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+-------+-------------+---------+
|city     |state  |median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|american_indian_and_alaska_native|asian|black_or_african_american|hispanic_or_latino|white  |demo_id      |port_code|
+---------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+-------+-------------+---------+
|Anchorage|Alaska |32.2      |152945         |145750           |298695          |27492             |33258       |2.77                  |AK        |36339                            |36825|23107             

In [31]:
dim_demographic_table.createOrReplaceTempView('demo')
df.createOrReplaceTempView('df')

df = spark.sql("""
            SELECT df.*, demo_id
            FROM demo
            JOIN df ON demo.state_code = df.state_code AND demo.port_code = df.city_code
        """)

In [32]:
df.show(3, False)

+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
|id      |citizenship_code|residence_code|city_code|state_code|arrival_date|departure_date|visa_id     |demo_id     |
+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
|287122.0|245.0           |245.0         |BOS      |MA        |2016-04-02  |2016-05-19    |103079215104|884763262977|
|494820.0|213.0           |213.0         |BOS      |MA        |2016-04-03  |2016-05-14    |103079215104|884763262977|
|581311.0|582.0           |582.0         |BOS      |MA        |2016-04-03  |2016-05-12    |103079215104|884763262977|
+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
only showing top 3 rows



In [33]:
temp_input_file = '../../data2/GlobalLandTemperaturesByCity.csv'
def create_dim_temperature_table(spark, temp_input_file):
    """
        It takes spark session and the temperature file location as arguments. It creates a temperature table.
        :param: spark: spark session.
        :param: temp_input_file: location of temperature data.
        
    """
    df = spark.read.format('csv').option('header', True).option('delimiter', ',').load(temp_input_file, inferSchema=True)
    
    # Get only US data
    df = df[df['Country'] == 'United States']
    
    # Check if there's any NA value. If so, drop them.
    df.createOrReplaceTempView('temp')
    na_values = spark.sql("""
                    SELECT AverageTemperature
                    FROM temp
                    WHERE AverageTemperature IS NULL
                """).count()
    print(f'It was found {na_values} NA values.')
    df = df.dropna(subset=['AverageTemperature'])
    
    # Check again
    df.createOrReplaceTempView('temp')
    na_values = spark.sql("""
                        SELECT AverageTemperature
                        FROM temp
                        WHERE AverageTemperature IS NULL
                    """).count()
    print(f'It was found {na_values} NA values.')
    
    # Keep only the date and the month
    df = df.withColumn('date', F.to_date('dt')).withColumn('month', F.month('dt'))
    
    # Calculate the average temperature for each city and calendar month.
    df.createOrReplaceTempView('temp2')
    df = spark.sql("""
            SELECT month, City AS city, ROUND(AVG(AverageTemperature), 1) AS average_temperature
            FROM temp2
            GROUP BY 1, 2
            ORDER BY 1, 2
        """)
    df = df.select('city', 'month', 'average_temperature')
    
    return df
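
Note that the query groups by calendar month only, so each value is an average over every year in the dataset for that city and month. The sketch below shows the same aggregation in plain Python. The readings are hypothetical numbers chosen for the example, and `monthly_average` is an invented helper name.

```python
from collections import defaultdict
from datetime import date

# Hypothetical readings: (date, city, average temperature in degrees Celsius).
readings = [
    ('1990-01-01', 'Abilene', 5.0),
    ('1991-01-01', 'Abilene', 5.6),
    ('1990-02-01', 'Abilene', 8.0),
    ('1990-01-01', 'Akron', None),  # missing value, dropped before averaging
]


def monthly_average(rows):
    """Average temperature per (city, calendar month), rounded to 1 decimal."""
    buckets = defaultdict(list)
    for dt, city, temperature in rows:
        if temperature is None:
            continue
        buckets[(city, date.fromisoformat(dt).month)].append(temperature)
    return {key: round(sum(values) / len(values), 1) for key, values in buckets.items()}


averages = monthly_average(readings)
print(averages)
```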

In [34]:
dim_temperature_table = create_dim_temperature_table(spark, temp_input_file)
dim_temperature_table.show(3, False)


It was found 25765 NA values.
It was found 0 NA values.
+-----------+-----+-------------------+
|city       |month|average_temperature|
+-----------+-----+-------------------+
|Abilene    |1    |5.3                |
|Akron      |1    |-3.2               |
|Albuquerque|1    |-0.4               |
+-----------+-----+-------------------+
only showing top 3 rows



In [35]:
dim_temperature_table = dim_temperature_table.join(port_codes_df, on=F.lower(dim_temperature_table.city)==F.lower(port_codes_df.city_name))

In [36]:
dim_temperature_table = dim_temperature_table.drop('location', 'city_name')
dim_temperature_table.show(3, False)

+-------+-----+-------------------+---------+----------+
|city   |month|average_temperature|port_code|state_code|
+-------+-----+-------------------+---------+----------+
|Oakland|1    |8.3                |OAK      |CA        |
|Oakland|2    |10.3               |OAK      |CA        |
|Oakland|3    |11.9               |OAK      |CA        |
+-------+-----+-------------------+---------+----------+
only showing top 3 rows



In [37]:
dim_temperature_table.join(df, (dim_temperature_table.port_code==df.city_code) & (dim_temperature_table.month==F.month(df.arrival_date)) & (dim_temperature_table.state_code==df.state_code)).show(3, False)

+---------+-----+-------------------+---------+----------+---------+----------------+--------------+---------+----------+------------+--------------+------------+-------------+
|city     |month|average_temperature|port_code|state_code|id       |citizenship_code|residence_code|city_code|state_code|arrival_date|departure_date|visa_id     |demo_id      |
+---------+-----+-------------------+---------+----------+---------+----------------+--------------+---------+----------+------------+--------------+------------+-------------+
|Rochester|4    |6.4                |ROC      |NY        |1629772.0|245.0           |245.0         |ROC      |NY        |2016-04-09  |2016-06-30    |369367187456|1073741824000|
|Rochester|4    |6.4                |ROC      |NY        |3390560.0|509.0           |509.0         |ROC      |NY        |2016-04-18  |2016-04-19    |369367187456|1073741824000|
|Rochester|4    |6.4                |ROC      |NY        |5359222.0|582.0           |582.0         |ROC      |NY   

In [38]:
# Get id field from df.
dim_temperature_table = dim_temperature_table.join(df, (dim_temperature_table.port_code==df.city_code) & (dim_temperature_table.month==F.month(df.arrival_date)) & (dim_temperature_table.state_code==df.state_code)).select('id', 'city', 'port_code', dim_temperature_table.state_code, 'month', 'average_temperature')

In [55]:
dim_temperature_table = dim_temperature_table.withColumnRenamed('id', 'temp_id')

In [56]:
dim_temperature_table.show(3, False)

+---------+---------+---------+----------+-----+-------------------+
|temp_id  |city     |port_code|state_code|month|average_temperature|
+---------+---------+---------+----------+-----+-------------------+
|1629772.0|Rochester|ROC      |NY        |4    |6.4                |
|3390560.0|Rochester|ROC      |NY        |4    |6.4                |
|5359222.0|Rochester|ROC      |NY        |4    |6.4                |
+---------+---------+---------+----------+-----+-------------------+
only showing top 3 rows



In [40]:
labels_description = 'I94_SAS_Labels_Descriptions.SAS'

def create_dim_country_table(spark, labels_description):
    """
        It takes spark session and location of SAS labels description. It creates a country table.
        
        :param: spark: spark session.
        :param: labels_description: location for SAS labels description.
        
    """
    with open(labels_description, 'r') as file:
        data = file.readlines()

    re_compiled = re.compile(r"\s*(\d{,3})\s*(=)\s*\'(.*)\'")
    country_codes = {}
    for line in data[10:298]:
        founds = re_compiled.search(line)
        country_codes[founds.group(1)] = founds.group(3)
    countries = list(map(list, country_codes.items()))
    country_codes_df = spark.createDataFrame(countries, ['country_code', 'country_name'])
    
    return country_codes_df
    

In [41]:
# The function has its own name so that re-running this cell does not try to call a DataFrame.
dim_country_table = create_dim_country_table(spark, labels_description)

In [42]:
dim_country_table.show(3, False)

+------------+------------+
|country_code|country_name|
+------------+------------+
|236         |AFGHANISTAN |
|101         |ALBANIA     |
|316         |ALGERIA     |
+------------+------------+
only showing top 3 rows



In [43]:
df.show(3, False)

+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
|id      |citizenship_code|residence_code|city_code|state_code|arrival_date|departure_date|visa_id     |demo_id     |
+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
|287122.0|245.0           |245.0         |BOS      |MA        |2016-04-02  |2016-05-19    |103079215104|884763262977|
|494820.0|213.0           |213.0         |BOS      |MA        |2016-04-03  |2016-05-14    |103079215104|884763262977|
|581311.0|582.0           |582.0         |BOS      |MA        |2016-04-03  |2016-05-12    |103079215104|884763262977|
+--------+----------------+--------------+---------+----------+------------+--------------+------------+------------+
only showing top 3 rows



In [44]:
fact_immigration = df.drop('city_code', 'state_code')
fact_immigration.show(3, False)

+--------+----------------+--------------+------------+--------------+------------+------------+
|id      |citizenship_code|residence_code|arrival_date|departure_date|visa_id     |demo_id     |
+--------+----------------+--------------+------------+--------------+------------+------------+
|287122.0|245.0           |245.0         |2016-04-02  |2016-05-19    |103079215104|884763262977|
|494820.0|213.0           |213.0         |2016-04-03  |2016-05-14    |103079215104|884763262977|
|581311.0|582.0           |582.0         |2016-04-03  |2016-05-12    |103079215104|884763262977|
+--------+----------------+--------------+------------+--------------+------------+------------+
only showing top 3 rows



In [57]:
# Testing the data model.
dim_country_table.createOrReplaceTempView('country')
dim_temperature_table.createOrReplaceTempView('temp')
dim_demographic_table.createOrReplaceTempView('demo')
dim_visa.createOrReplaceTempView('visa')
dim_immigrant.createOrReplaceTempView('immigrant')
dim_time_table.createOrReplaceTempView('time')
fact_immigration.createOrReplaceTempView('df')

spark.sql("""
    SELECT
        time.month as arrival_month,
        time2.day as departure_day,
        country.country_name as citizenship_country,
        country2.country_name as residence_country,
        visa.visa_category,
        immigrant.gender,
        demo.total_population,
        temp.average_temperature
        
    FROM df
    JOIN time ON df.arrival_date = time.date
    JOIN time time2 ON df.departure_date = time2.date
    JOIN country ON df.citizenship_code = country.country_code
    JOIN country country2 ON df.residence_code = country2.country_code
    JOIN visa ON df.visa_id = visa.visa_id
    JOIN immigrant ON df.id = immigrant.id
    JOIN demo ON df.demo_id = demo.demo_id
    JOIN temp ON df.id = temp.temp_id
    
    

""").show(3, False)

+-------------+-------------+-------------------+-----------------+-------------+------+----------------+-------------------+
|arrival_month|departure_day|citizenship_country|residence_country|visa_category|gender|total_population|average_temperature|
+-------------+-------------+-------------------+-----------------+-------------+------+----------------+-------------------+
|4            |3            |AUSTRIA            |AUSTRIA          |Business     |M     |864816          |13.6               |
|4            |9            |DENMARK            |DENMARK          |Business     |M     |669469          |6.4                |
|4            |6            |FINLAND            |FINLAND          |Pleasure     |F     |8550405         |8.6                |
+-------------+-------------+-------------------+-----------------+-------------+------+----------------+-------------------+
only showing top 3 rows



In [48]:
# Save in parquet format. To do so, uncomment the lines below and run the cell.
# dim_country_table.write.mode('overwrite').parquet('dim_country')
# dim_temperature_table.write.mode('overwrite').partitionBy('city', 'port_code').parquet('dim_temperature')
# dim_demographic_table.write.mode('overwrite').partitionBy('city', 'port_code').parquet('dim_demographic')
# dim_visa.write.mode('overwrite').parquet('dim_visa')
# dim_immigrant.write.mode('overwrite').parquet('dim_immigrant')
# dim_time_table.write.mode('overwrite').partitionBy('date').parquet('dim_date')
# fact_immigration.write.mode('overwrite').partitionBy('arrival_date').parquet('fact_immigration')


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [45]:
# Perform quality checks here
def quality_checks(spark_df, table_name):
    """
        Check a table's completeness and whether it contains any null values.

        :param spark_df: Spark DataFrame to check
        :param table_name: table name used in the printed messages
    """

    print('Completeness quality check has started.')
    all_count = spark_df.count()

    if all_count == 0:
        print(f'Quality check failed for {table_name}. It does not have data.')
    else:
        print(f'Quality check passed for {table_name}. It has {all_count} rows.')

    print('NA values quality check has started.')
    # isna().sum() returns one count per column, so sum again to get a single total.
    # Note: toPandas() collects the whole table to the driver.
    na_values = int(spark_df.toPandas().isna().sum().sum())

    if na_values == 0:
        print(f'NA quality check passed. There are no NA values in {table_name}.')
    else:
        print(f'NA quality check failed. Found {na_values} NA values in {table_name}.')
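
The function above covers the count and NA checks, but not the integrity constraints listed in this section. A minimal sketch of a unique-key check and a foreign-key check is shown below. It assumes the view names registered earlier (`df` for the fact table, `visa` for `dim_visa`); the rows are made-up toy data, and the standard-library `sqlite3` module stands in for Spark only so that the sketch runs without a cluster. The two SQL strings are plain SQL and should also work when passed to `spark.sql` against the temp views.

```python
import sqlite3

# Hypothetical toy rows; table and column names mirror the temp views
# registered earlier ('df' for the fact table, 'visa' for dim_visa).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE visa (visa_id INTEGER, visa_type TEXT, visa_category TEXT);
    CREATE TABLE df (id INTEGER, visa_id INTEGER);
    INSERT INTO visa VALUES (1, 'B1', 'Business'), (2, 'B2', 'Pleasure');
    INSERT INTO df VALUES (10, 1), (11, 2), (11, 2), (12, 3);
""")

# Unique-key check: any id that appears more than once violates the key.
DUPLICATE_KEYS = """
    SELECT id, COUNT(*) AS n
    FROM df
    GROUP BY id
    HAVING COUNT(*) > 1
"""

# Referential check: fact rows whose visa_id has no match in the dimension.
ORPHAN_VISA_IDS = """
    SELECT df.id, df.visa_id
    FROM df
    LEFT JOIN visa ON df.visa_id = visa.visa_id
    WHERE visa.visa_id IS NULL
"""

duplicates = conn.execute(DUPLICATE_KEYS).fetchall()
orphans = conn.execute(ORPHAN_VISA_IDS).fetchall()

print(f"duplicate ids: {duplicates}")
print(f"orphan visa_ids: {orphans}")
```

Both queries return an empty result when the constraint holds, so a check passes when the returned list is empty.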

#### 4.3 Data dictionary 
Description of the data:

#### Dimension tables:
**dim_country**
- country_code: integer that represents a country.
- country_name: name of a country.

**dim_visa**
- visa_id: An integer which represents a type of visa.
- visa_type: The specific type of visa issued by the U.S.
- visa_category: A category the visa type belongs to.

**dim_date**
- date: calendar date, taken from the immigrant's arrival date.
- year: year extracted from the arrival date.
- month: month extracted from the arrival date.
- week: week extracted from the arrival date.
- day: day extracted from the arrival date.

**dim_immigrant**
- id: unique value for an immigrant.
- age: immigrant's age.
- biryear: immigrant's year of birth.
- gender: immigrant's gender.

**dim_demographic**
- demo_id: unique identifier for a demographic record.
- port_code: port code where an immigrant arrived.
- state_code: state code where the port code is located.
- city: name of the city for a port code.
- median_age: median age for the corresponding city.
- male_population: number of male residents in the city.
- female_population: number of female residents in the city.
- total_population: total population of the city.
- number_of_veterans: number of veterans in the city.
- foreign_born: number of residents who were born abroad.
- avg_household_size: average household size in the city.
- american_indian_and_alaska_native: number of American Indian and Alaska Native residents in the city.
- asian: number of Asian residents in the city.
- black_or_african_american: number of Black or African American residents in the city.
- hispanic_or_latino: number of Hispanic or Latino residents in the city.
- white: number of White residents in the city.

**dim_temperature**
- id: unique value for temperature info in the table.
- city: city name where the average temperature was calculated.
- month: reference month when the average temperature was calculated.
- port_code: port code where an immigrant arrived.
- state_code: state code where the port code is located.
- average_temperature: monthly average temperature calculated.

#### Fact table:
**fact_immigration**
- id: unique value for an immigration fact.
- arrival_date: the date an immigrant arrived in the U.S.
- departure_date: the date an immigrant left the U.S.
- citizenship_code: code for the immigrant's country of citizenship.
- residence_code: code for the immigrant's country of residence.
- visa_id: unique value for a visa issued by the U.S.
- demo_id: unique value for a demographic record.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
Spark was chosen for this project mainly for the following reasons:
    - Spark is easy to use, and the learning curve is not steep for those who already know pandas.
    - Spark can read and write many different file formats.
    - Spark was built to work with big data, so it can run on cloud platforms such as AWS.
* Propose how often the data should be updated and why.
    - Since the immigration data is updated monthly, our data should be updated monthly as well.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - Spark was built to handle big data. In this case, it would make sense to run the pipeline on a cloud platform such as AWS. If it already runs in the cloud, we could increase the number of nodes in the cluster.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - For this purpose, we could use Apache Airflow to schedule the pipeline.
 * The database needed to be accessed by 100+ people.
     - In this case, the recommended option would be a cloud data warehouse such as Amazon Redshift, taking the costs involved into consideration.