# Immigration in the US

## Udacity Data Engineering Capstone Project

### Project Summary
The US National Tourism and Trade Office provides a database of reports on the Visitor Arrivals Program (I-94 Record). Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). Data sources include: Overseas DHS/CBP I-94 Program data; Canadian visitation data (Stats Canada) and Mexican visitation data (Banco de Mexico) (source: https://travel.trade.gov/research/reports/i94/historical/2016.html).

The goal of this project is to build an ETL pipeline using the above plus supplementary sources (temperature, city demographics, airport codes) in order to build a database that can be used to derive insights on immigration patterns to the US. These insights include, but are not limited to:

* location where most visitors are coming from
* location where most visitors are headed to
* if temperature, city demographics, etc. are a factor in destination

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Import Libraries

In [1]:
import pandas as pd
import datetime as dt
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, split, expr
from pyspark.sql.functions import year, month, dayofmonth, dayofweek 
from pyspark.sql.types import StructType

#### Create Spark Session

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we will create an analytics database by loading in each of the data sets above, cleaning and exploring the data, creating our fact and dimension tables, and running queries to discover insights as discussed above. This project uses Apache Spark to load the data. We will also be running some data quality checks to ensure our pipeline runs as expected.

#### Describe and Gather Data

Below is a list of the data sets we are using:

1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office and can be found [here](https://travel.trade.gov/research/reports/i94/historical/2016.html).

2. World Temperature Data: This dataset comes from Kaggle and can be found [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

3. US City Demographic Data: This dataset comes from OpenDataSoft and can be found [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

4. Airport Code Table: This data comes from DataHub and can be found [here](https://datahub.io/core/airport-codes#data).

We will now load each data set in order to take an initial look. We will also provide a data dictionary (column name + description) for each data set, to get a better understanding of its raw form before we begin any cleaning or transformations.

### Immigration

This is a massive data set. We will load in the sample CSV file to get a feel for the data.

In [3]:
#read in immigration data
fname = 'immigration_data_sample.csv'
immigrationSample = spark.read.csv(fname,header=True,inferSchema=True)

immigrationSample = immigrationSample.drop('_c0')
immigrationSample.show(n=5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|5.6582674633E10|00782|      WT|
|4422636.0|2016.0|   4.0| 582.0| 582.0|    MCA|20567.0|    1.0|     TX|20568.0|  26.0|    2.

#### Raw Immigration Data Dictionary

| Column Name  | Description |
| ------------ | ----------------------------------------- |
| cicid        | identifier                                |
| i94yr        | 4 digit year                              |
| i94mon       | Numeric month                             |
| i94cit       | code for immigrant's country of citizenship |
| i94res       | code for immigrant's country of residence |
| i94port      | code for admission port                   |
| arrdate      | arrival date                              |
| i94mode      | mode of transportation                    |
| i94addr      | arrival state                             |
| depdate      | departure date                            |
| i94bir       | age                                       |
| i94visa      | visa code                                 |
| count        | Used for summary statistics               |
| dtadfile     | date added to file                        |
| visapost     | post where visa was issued                |
| occup        | occupation                                |
| entdepa      | arrival flag                              |
| entdepd      | departure flag                            |
| entdepu      | update flag                               |
| matflag      | match flag                                |
| biryear      | birth year                                |
| dtaddto      | date until which the visitor is admitted  |
| gender       | gender                                    |
| insnum       | INS number                                |
| airline      | airline used to enter US                  |
| admnum       | admission number                          |
| fltno        | flight number                             |
| visatype     | category of visa                          |

### Temperature

In [4]:
#read in data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature = spark.read.csv(fname,header=True,inferSchema=True)

In [5]:
temperature.count()

8599212

In [6]:
temperature.show(n=5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



#### Raw Temperature Data Dictionary

| Column Name                   | Description |
| ----------------------------- | ---------------------------------------------- |
| dt                            | date                                           |
| AverageTemperature            | average land temperature of the city in Celsius |
| AverageTemperatureUncertainty | the 95% confidence interval around the average |
| City                          | name of city                                   |
| Country                       | name of country                                |
| Latitude                      | latitude of city                               |
| Longitude                     | longitude of city                              |

### Demographics

In [7]:
#read in data
fname = 'us-cities-demographics.csv'
demographics = spark.read.csv(fname,header=True,sep=';',inferSchema=True)

In [8]:
demographics.count()

2891

In [9]:
demographics.show(n=5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

#### Raw Demographics Data Dictionary

| Column Name            | Description |
| ---------------------- | -------------------------------------------------- |
| City                   | name of city                                       |
| State                  | state where city is located                        |
| Median Age             | median age of city                                 |
| Male Population        | count of males in the city                         |
| Female Population      | count of females in the city                       |
| Total Population       | count of all people in the city                    |
| Number of Veterans     | count of veterans in the city                      |
| Foreign-born           | count of residents born outside the US             |
| Average Household Size | average number of people per household in the city |
| State Code             | code for the state                                 |
| Race                   | respondent's race                                  |
| Count                  | count by ethnicity for city                        |

### Airport

In [10]:
#read in data
fname = 'airport-codes_csv.csv'
airport = spark.read.csv(fname,header=True,inferSchema=True)

In [11]:
airport.count()

55075

In [12]:
airport.show(n=5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

#### Raw Airport Data Dictionary

| Column Name  | Description |
| ------------ | -------------------------------------------------------------- |
| ident        | identifier                                                     |
| type         | description of type of airport (heliport, small_airport, etc.) |
| name         | name of airport                                                |
| elevation_ft | elevation level (in feet)                                      |
| continent    | continent of airport location                                  |
| iso_country  | country of airport location                                    |
| iso_region   | region (i.e. state) of airport location                        |
| municipality | city of airport location                                       |
| gps_code     | GPS airport code                                               |
| iata_code    | IATA airport code                                              |
| local_code   | local airport code                                             |
| coordinates  | longitude and latitude of airport location, comma-separated    |

### Step 2: Explore and Assess the Data

In this section, we will identify data quality issues (missing values, duplicate data, etc.) and list any necessary steps taken to clean the data.
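
As a rough illustration of what these checks look for, here is a minimal plain-Python sketch that counts missing values per column and exact duplicate rows. The helper names `null_counts` and `duplicate_count` and the sample rows are made up for this example; the notebook itself runs the equivalent checks with Spark.

```python
from collections import Counter

def null_counts(rows):
    """Count missing (None) values per column across a list of dict rows."""
    counts = Counter()
    for row in rows:
        for column, value in row.items():
            if value is None:
                counts[column] += 1
    return dict(counts)

def duplicate_count(rows):
    """Number of rows that exactly repeat an earlier row."""
    seen = Counter(tuple(sorted(row.items())) for row in rows)
    return sum(n - 1 for n in seen.values())

# Made-up rows shaped like the immigration data (values are illustrative only).
sample_rows = [
    {"cicid": 6, "i94port": "XXX", "i94addr": None, "depdate": None},
    {"cicid": 7, "i94port": "ATL", "i94addr": "AL", "depdate": None},
    {"cicid": 7, "i94port": "ATL", "i94addr": "AL", "depdate": None},
]

print(null_counts(sample_rows))
print(duplicate_count(sample_rows))
```

A per-column view like this makes it easier to decide which columns are complete enough to carry into the data model.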

#### Immigration

First, let's load in the immigration data set. This will be the foundation for our fact table.

We will discover that the immigration data is pretty clean as is.

In [13]:
# read in data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration = spark.read.format("com.github.saurfang.sas.spark").load(fname)

In [14]:
immigration.count()

3096313

In [15]:
immigration.show(n=5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [16]:
#this shows that no record is missing its cicid identifier
immigration.na.drop(how='all',subset=['cicid']).count()

3096313

In [17]:
#this shows that there's no duplicate data
immigration.drop_duplicates().count()

3096313

#### Temperature

We've already loaded in this data set above, so let's clean it up. 

You can observe straight away that there are records dating back to the 1700s. We don't want to include this data, because commercial air travel wasn't around back then. According to [this link](https://en.wikipedia.org/wiki/Airline#:~:text=Tony%20Jannus%20conducted%20the%20United,Petersburg%2DTampa%20Airboat%20Line.), the first commercial flight in the US occurred in 1914, so we will remove any data earlier than this.

We only care about data for the United States, because that is where our immigration data is based. Thus, we can filter only for records from the US.

In addition, we will remove NaN values from the table as it doesn't make sense to analyze the data without a reading.

In [18]:
# drop where AverageTemperature = NaN
temperature = temperature.dropna(how='any',subset='AverageTemperature')

#filter where Country = USA
temperature = temperature.filter(temperature.Country=="United States")

#filter where dt >= 1914
temperature = temperature.filter(temperature.dt>="1914-01-01")

In [19]:
temperature.count()

307628

In [20]:
temperature.show(n=5)

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1914-01-01 00:00:00| 9.802999999999999|                        0.856|Abilene|United States|  32.95N|  100.53W|
|1914-02-01 00:00:00|              5.84|                        0.442|Abilene|United States|  32.95N|  100.53W|
|1914-03-01 00:00:00|11.574000000000002|                        0.426|Abilene|United States|  32.95N|  100.53W|
|1914-04-01 00:00:00|16.366999999999994|                        0.318|Abilene|United States|  32.95N|  100.53W|
|1914-05-01 00:00:00|            20.002|                        0.556|Abilene|United States|  32.95N|  100.53W|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---

#### Demographics

We will discover that the demographics data is pretty clean as is.

In [21]:
#this shows there's no data with all null records
demographics.na.drop(how='all').count()

2891

In [22]:
#this shows that there's no duplicate data
demographics.drop_duplicates().count()

2891

#### Airport

We've already loaded in this data set above, so let's clean it up.

We are going to split the coordinates into separate latitude and longitude columns, in order to be consistent with the temperature table.

We only care about data for the United States, because that is where our immigration data is based. Thus, we can filter only for records from the US.

We can also remove the 'US-' portion of the iso_region (state code) as that is duplicate information to the iso_country column.

In [23]:
# separate coordinates
airport = airport.withColumn("coordinates",split("coordinates",","))
airport = airport.withColumn("latitude",airport.coordinates[0]).withColumn("longitude",airport.coordinates[1])
airport = airport.drop("coordinates")

# filter where iso_country = US
airport = airport.filter(airport.iso_country=="US")

# remove US- from iso_region
airport = airport.withColumn("iso_region",expr("substring(iso_region,4,length(iso_region))"))

In [24]:
airport.count()

22757

In [25]:
airport.show(n=5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+------------------+------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|          latitude|         longitude|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+------------------+------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|        PA|    Bensalem|     00A|     null|       00A|-74.93360137939453|    40.07080078125|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|        KS|       Leoti|    00AA|     null|      00AA|       -101.473911|         38.704022|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|        AK|Anchor Point|    00AK|     null|      00AK|    -151.695999146|

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model

For this project, we will create a star schema, using one fact table and five dimension tables. Since the goal of this project is to derive insights on immigration patterns to the US, we will use the immigration data set as the basis for our fact table. This table will be a subset of the "raw" immigration table from above.

We will have one dimension table for each remaining data set above (temperature, demographics, airport), plus tables for time and coordinates. The time table will allow us to aggregate the data using various time units. The coordinates table will be derived from the airport table and consist of the city, latitude, and longitude.

Below is a visual of the conceptual data model:

![Data Dictionary](DataDictionary.png)

**Note:** You can also see this image in the file <code>DataDictionary.png</code>, located in this repository.
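
To make the star-schema idea concrete, here is a small sketch of a fact-to-dimension join using an in-memory SQLite database from the Python standard library. The rows are invented, only a few columns are shown, and it assumes the fact table's arrival date is stored in the same ISO format as the time table's key, which would require converting `arrdate` first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (cicid INTEGER PRIMARY KEY, i94res INTEGER,
                          i94addr TEXT, arrdate TEXT);
CREATE TABLE time (date TEXT PRIMARY KEY, year INTEGER, month INTEGER,
                   day INTEGER, weekday INTEGER);
""")

# Invented fact rows: one row per arrival.
conn.executemany("INSERT INTO immigration VALUES (?, ?, ?, ?)", [
    (1, 209, "HI", "2016-04-22"),
    (2, 582, "TX", "2016-04-23"),
    (3, 209, "HI", "2016-04-23"),
])
# Matching time dimension rows: one row per distinct date.
conn.executemany("INSERT INTO time VALUES (?, ?, ?, ?, ?)", [
    ("2016-04-22", 2016, 4, 22, 6),
    ("2016-04-23", 2016, 4, 23, 7),
])

# Arrivals per destination state and month, most popular first.
arrivals_by_state = conn.execute("""
    SELECT i.i94addr AS state, t.month, COUNT(*) AS arrivals
    FROM immigration AS i
    JOIN time AS t ON t.date = i.arrdate
    GROUP BY i.i94addr, t.month
    ORDER BY arrivals DESC, state
""").fetchall()

print(arrivals_by_state)
```

The same query shape should carry over to Spark SQL once the parquet tables are registered as temporary views.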

#### 3.2 Mapping Out Data Pipelines

We will define one Python function for the pipeline for each table. We will read in the data sets again, both for clarity and to include the cleaning steps from above (if any) in the full pipeline map. The following is how we will create each table:

**immigration:**

* load in immigration SAS file to Spark dataframe
* select relevant columns
* write to parquet

**temperature:**

* load in temperature CSV file to Spark dataframe
* clean the data as per part 2
* select relevant columns and normalize column names
* write to parquet

**demographics:**

* load in demographics CSV file to Spark dataframe
* select relevant columns and normalize column names
* write to parquet
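
One possible way to keep the column normalization readable is to hold the renames in a single mapping instead of a long chain of calls. The sketch below is only an illustration: `DEMOGRAPHICS_RENAMES` and `normalized_names` are hypothetical names, and the target names are the ones used later in this notebook.

```python
# Mapping from raw demographics headers to the normalized names in the model.
DEMOGRAPHICS_RENAMES = {
    "City": "city",
    "State Code": "state",
    "Median Age": "medAge",
    "Male Population": "malePopulation",
    "Female Population": "femalePopulation",
    "Total Population": "totPopulation",
    "Number of Veterans": "numVeterans",
    "Foreign-born": "foreignBorn",
    "Average Household Size": "avgHouseholdSize",
    "Race": "race",
    "Count": "count",
}

def normalized_names(columns, renames):
    """Return the new name for every column, keeping unmapped names unchanged."""
    return [renames.get(name, name) for name in columns]

raw_columns = list(DEMOGRAPHICS_RENAMES)
new_columns = normalized_names(raw_columns, DEMOGRAPHICS_RENAMES)
print(new_columns)

# With a Spark dataframe `df`, the mapping could be applied in one step:
#   df.select(*raw_columns).toDF(*new_columns)
```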

**airport:**

* load in airport CSV file to Spark dataframe
* clean the data as per part 2
* select relevant columns and normalize column names
* write to parquet

**coordinates:**

* parse and add latitude/longitude columns as per part 2
* select relevant columns and normalize column names
* write to parquet
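
The parsing step can be sketched in plain Python as follows. It assumes the raw `coordinates` string is ordered "longitude, latitude", which is what the sample values suggest (a first value of -101 or -151 cannot be a latitude). The helper names are hypothetical.

```python
def parse_coordinates(raw):
    """Split a 'longitude, latitude' string into (latitude, longitude) floats."""
    longitude, latitude = (float(part) for part in raw.split(","))
    # A latitude outside [-90, 90] means the two values are in the wrong order.
    if not -90.0 <= latitude <= 90.0:
        raise ValueError("latitude out of range: %r" % latitude)
    return latitude, longitude

def strip_country_prefix(iso_region):
    """Turn an ISO region such as 'US-PA' into the bare state code 'PA'."""
    return iso_region.split("-", 1)[1] if "-" in iso_region else iso_region

# Raw value of the first airport row shown earlier (Bensalem, PA).
latitude, longitude = parse_coordinates("-74.93360137939453, 40.07080078125")
print(latitude, longitude)
print(strip_country_prefix("US-PA"))
```

The range check is a cheap guard: it fails loudly if the two values are ever read in the wrong order.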

**time:**

* gather dates from immigration data
* parse out year, month, day, weekday
* write to parquet
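
The date handling can be sketched without Spark. The immigration file is a SAS data set, and SAS stores dates as the number of days since 1960-01-01, so `arrdate` has to be offset from that day. The helper names below are hypothetical; the weekday numbering follows Spark's `dayofweek` convention (1 = Sunday).

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS counts dates as days since this day

def sas_to_date(days):
    """Convert a SAS day count (int or float) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + dt.timedelta(days=int(days))

def time_row(days):
    """Build one row of the time dimension from a SAS day count."""
    d = sas_to_date(days)
    return {
        "date": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        # Match Spark's dayofweek convention: 1 = Sunday ... 7 = Saturday.
        "weekday": d.isoweekday() % 7 + 1,
    }

row = time_row(20566.0)  # arrdate of the first record in the sample CSV
print(row)
```

For that record the result is 2016-04-22, which agrees with the `dtadfile` value `20160422` on the same row.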

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [26]:
#function for immigration fact table

def create_immigration_table(df, output_data):
    '''
    Creates the immigration fact table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for immigration table.")
    print("Select and normalize relevant columns.")
    
    immigration_table = df.select("cicid","i94yr","i94mon","i94cit","i94res","i94port","arrdate","i94mode","i94addr","depdate","i94bir","i94visa","visapost","entdepa","entdepu","matflag","gender","visatype")
    
    # write to parquet file
    immigration_output_location = output_data + "immigration.parquet"
    immigration_table.write.parquet(immigration_output_location, mode="overwrite")
    
    print("Wrote immigration table.")

In [27]:
#function for temperature dimension table

def create_temperature_table(df, output_data):
    '''
    Creates the temperature dimension table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for temperature table.")
    print("Cleaning temperature data.")
    
    # drop where AverageTemperature = NaN
    temperature_table = df.dropna(how='any',subset='AverageTemperature')
    #filter where Country = USA
    temperature_table = temperature_table.filter(temperature_table.Country=="United States")
    #filter where dt >= 1914
    temperature_table = temperature_table.filter(temperature_table.dt>="1914-01-01")
    
    print("Select and normalize relevant columns.")
    
    temperature_table = temperature_table.select("dt","City","AverageTemperature","AverageTemperatureUncertainty")
    temperature_table = temperature_table.withColumnRenamed("City","city").withColumnRenamed("AverageTemperature","avgTemp").withColumnRenamed("AverageTemperatureUncertainty","avgTempUncert")
    
    # write to parquet file
    temperature_output_location = output_data + "temperature.parquet"
    temperature_table.write.parquet(temperature_output_location, mode="overwrite")
    
    print("Wrote temperature table.")

In [28]:
#function for demographics dimension table

def create_demographics_table(df, output_data):
    '''
    Creates the demographics dimension table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for demographics table.")
    print("Select and normalize relevant columns.")
    
    demographics_table = df.select("City","State Code","Median Age","Male Population","Female Population","Total Population","Number of Veterans","Foreign-born","Average Household Size","Race","Count")
    demographics_table = demographics_table.withColumnRenamed("City","city").withColumnRenamed("State Code","state").withColumnRenamed("Median Age","medAge").withColumnRenamed("Male Population","malePopulation").withColumnRenamed("Female Population","femalePopulation").withColumnRenamed("Total Population","totPopulation").withColumnRenamed("Number of Veterans","numVeterans").withColumnRenamed("Foreign-born","foreignBorn").withColumnRenamed("Average Household Size","avgHouseholdSize").withColumnRenamed("Race","race").withColumnRenamed("Count","count")

    # write to parquet file
    demographics_output_location = output_data + "demographics.parquet"
    demographics_table.write.parquet(demographics_output_location, mode="overwrite")
    
    print("Wrote demographics table.")

In [29]:
#function for airport dimension table

def create_airport_table(df, output_data):
    '''
    Creates the airport dimension table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for airport table.")
    print("Cleaning airport data.")
    
    # filter where iso_country = US
    airport_table = df.filter(df.iso_country=="US")
    # remove US- from iso_region
    airport_table = airport_table.withColumn("iso_region",expr("substring(iso_region,4,length(iso_region))"))
    
    print("Select and normalize relevant columns.")
    
    airport_table = airport_table.select("ident","type","name","elevation_ft","iso_region","municipality","gps_code","iata_code","local_code")
    airport_table = airport_table.withColumnRenamed("elevation_ft","elevation").withColumnRenamed("iso_region","state").withColumnRenamed("municipality","city")
    
    # write to parquet file
    airport_output_location = output_data + "airport.parquet"
    airport_table.write.parquet(airport_output_location, mode="overwrite")
    
    print("Wrote airport table.")

In [30]:
#function for coordinates dimension table

def create_coordinates_table(df, output_data):
    '''
    Creates the coordinates dimension table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for coordinates table.")
    print("Cleaning coordinates data to create coordinates table.")
    
    # separate coordinates; the raw string is ordered "longitude, latitude"
    coordinates_table = df.withColumn("coordinates",split("coordinates",","))
    coordinates_table = coordinates_table.withColumn("longitude",coordinates_table.coordinates[0]).withColumn("latitude",coordinates_table.coordinates[1])
    coordinates_table = coordinates_table.drop("coordinates")
    # filter where iso_country = US
    coordinates_table = coordinates_table.filter(coordinates_table.iso_country=="US")
    # remove US- from iso_region
    coordinates_table = coordinates_table.withColumn("iso_region",expr("substring(iso_region,4,length(iso_region))"))
    
    print("Select and normalize relevant columns.")
    
    coordinates_table = coordinates_table.select("municipality","latitude","longitude")
    coordinates_table = coordinates_table.withColumnRenamed("municipality","city")
    coordinates_table = coordinates_table.withColumn('latitude',coordinates_table['latitude'].cast("double").alias('latitude'))
    coordinates_table = coordinates_table.withColumn('longitude',coordinates_table['longitude'].cast("double").alias('longitude'))
    
    # write to parquet file
    coordinates_output_location = output_data + "coordinates.parquet"
    coordinates_table.write.parquet(coordinates_output_location, mode="overwrite")
    
    print("Wrote coordinates table.")

In [31]:
#function for time dimension table

def create_time_table(df, output_data):
    '''
    Creates the time dimension table.
            Parameters:
                df: Spark dataframe
                output_data: location where results are stored
            Returns:
                None.
    '''
    
    print("Begin pipeline for time table.")
    print("Select and normalize relevant columns.")
    
    time_table = df.select("arrdate")
    # arrdate is a SAS date: the number of days since 1960-01-01
    int_to_date_udf = udf(lambda x: None if x is None else (dt.date(1960,1,1) + dt.timedelta(days=int(x))).isoformat())
    time_table = time_table.withColumn("arrdate", int_to_date_udf("arrdate"))
    time_table = time_table.withColumn("year",year("arrdate")).withColumn("month",month("arrdate")).withColumn("day",dayofmonth("arrdate")).withColumn("weekday",dayofweek("arrdate")).select("arrdate","year","month","day","weekday").drop_duplicates()  
    time_table = time_table.withColumnRenamed("arrdate","date")
    
    # write to parquet file
    time_output_location = output_data + "time.parquet"
    time_table.write.parquet(time_output_location, mode="overwrite")
    
    print("Wrote time table.")

In [32]:
#read in all data

immigration_fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration = spark.read.format("com.github.saurfang.sas.spark").load(immigration_fname)

temperature_fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature = spark.read.csv(temperature_fname,header=True,inferSchema=True)

demographics_fname = 'us-cities-demographics.csv'
demographics = spark.read.csv(demographics_fname,header=True,sep=';',inferSchema=True)

airport_fname = 'airport-codes_csv.csv'
airport = spark.read.csv(airport_fname,header=True,inferSchema=True)

In [33]:
#run each function using Spark dataframes defined above

output_data = "tables/"

create_immigration_table(immigration, output_data)
create_temperature_table(temperature, output_data)
create_demographics_table(demographics, output_data)
create_airport_table(airport, output_data)
create_coordinates_table(airport, output_data)
create_time_table(immigration, output_data)

Begin pipeline for immigration table.
Select and normalize relevant columns.
Wrote immigration table.
Begin pipeline for temperature table.
Cleaning temperature data.
Select and normalize relevant columns.
Wrote temperature table.
Begin pipeline for demographics table.
Select and normalize relevant columns.
Wrote demographics table.
Begin pipeline for airport table.
Cleaning airport data.
Select and normalize relevant columns.
Wrote airport table.
Begin pipeline for coordinates table.
Cleaning coordinates data to create coordinates table.
Select and normalize relevant columns.
Wrote coordinates table.
Begin pipeline for time table.
Select and normalize relevant columns.
Wrote time table.


#### 4.2 Data Quality Checks
 
Let's run some data quality checks to ensure the pipeline ran as expected.

In [34]:
#function for data quality

def data_quality_checks(table_names):
    '''
    Runs data quality checks on the pipeline.
        Parameters:
            table_names: list of table names
        Returns:
            None.
    '''
    
    print("Begin data quality checks.")
    
    list_of_tables = os.listdir("tables")
    
    for table in table_names:
        file_name = table+".parquet"
        if (file_name in list_of_tables):
            print("Data quality existence check PASSED for "+table)
        else:
            print("Data quality existence check FAILED for "+table)
        
    for table in table_names:
        data = spark.read.parquet("./tables/"+table+".parquet")
        data.createOrReplaceTempView("data")
        # collect the count as a number; a DataFrame object never equals 0
        tableCount = spark.sql("SELECT COUNT(*) FROM data").collect()[0][0]
        if tableCount == 0:
            print("Data quality count check FAILED for "+table)
        else:
            print("Data quality count check PASSED for "+table)
        
    print("Data quality checks complete.")

In [35]:
table_names = ["immigration", "temperature", "demographics", "airport", "coordinates", "time"]
data_quality_checks(table_names)

Begin data quality checks.
Data quality existence check PASSED for immigration
Data quality existence check PASSED for temperature
Data quality existence check PASSED for demographics
Data quality existence check PASSED for airport
Data quality existence check PASSED for coordinates
Data quality existence check PASSED for time
Data quality count check PASSED for immigration
Data quality count check PASSED for temperature
Data quality count check PASSED for demographics
Data quality count check PASSED for airport
Data quality count check PASSED for coordinates
Data quality count check PASSED for time
Data quality checks complete.


#### 4.3 Data dictionary

Below is the data dictionary:

![Data Dictionary](DataDictionary.png)

**Note:** You can also see the data dictionary in the file <code>DataDictionary.png</code>, located in this repository.

### Step 5: Complete Project Write Up

#### Rationale for Choice of Tools/Technologies:

Apache Spark was the main technology used for this project. Spark is ideal for rapid querying, analysis, and transformation of large data sets, as well as simplifying complex data pipelines. Given that the immigration data set was over 3 million rows alone for only one month of information, in combination with the temperature, airport, and demographics data sets, Spark was the best choice.

#### Frequency of Updates:

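The data should be updated at most once a month. Given that we only receive a temperature reading once per month for each city, and the immigration data comes in monthly files, more frequent updates would not add new information.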

#### Scenarios To Address:

Problems constantly arise when dealing with databases. The underlying infrastructure must be equipped to handle such situations without major disruption to the end user. Below are example scenarios, each with an approach that ensures no major issues develop:

**1. Data volume increased by 100x:** Because we are using Spark, we can increase the number of nodes in order to handle the load. <br/><br/>
**2. Pipelines ran daily at 7am:** The use of Apache Airflow would be beneficial here. Apache Airflow is an open-source tool that structures data pipelines as DAGs which can run on a schedule. For this particular pipeline, a user could configure Airflow to run daily at 7am, at which point it would load the data from the source and update the tables accordingly. <br/><br/>
**3. Database needs access by 100+ people:** We can host the database in Amazon Redshift, which supports up to 500 concurrent connections (source: https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html).