# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds an ETL pipeline over the I94 immigration, global land temperature, and US city demographics datasets to form an analytics database on immigration events. One use case for the resulting model is finding patterns in immigration to the United States.

<br>
The project follows these steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from datetime import datetime
import pyspark.sql.functions as F


In [2]:
# Use the full option names so pandas does not have to pattern-match them
pd.set_option('display.max_colwidth', 500)
pd.set_option('display.max_columns', 500)

### Step 1: Scope the Project and Gather Data

### Scope 
- I94 immigration data and US city demographics data are used to build a data pipeline that produces fact and dimension tables for analytics.
### Describe and Gather Data 



#### The following datasets are available to create the analytics database:
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
* World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data/).


### The Datasets used:
| Data Set | Format |
|---|---|
| I94 Immigration Data | SAS |
| U.S. City Demographic Data | CSV |

### The Tools used:
- Pandas: exploratory data analysis on small data set
- PySpark: data processing on large data set
- Python: data processing
- AWS S3: data storage

## Explore the Data
- Pandas and PySpark are used for exploratory analysis of the datasets.
- The datasets are split into dimension tables, and columns are renamed to be easier to understand.
- PySpark is run on one of the SAS datasets to test the ETL pipeline logic.

## Cleaning Steps
- Transform `arrdate` and `depdate` from SAS date format (days since 1960-01-01) into PySpark `DateType`.
- Parse the `I94_SAS_Labels_Descriptions.SAS` file to get the auxiliary dimension tables `country_code`, `city_code`, `state_code`, `visa_code`, and `transportation`.
- Transform `city` and `state` to upper case to match the `city_code` and `state_code` tables.
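
The date conversion in the first cleaning step can be illustrated in plain Python. SAS stores dates as day counts from 1 January 1960, so the sample value `arrdate = 20566.0` corresponds to 2016-04-22, which matches the converted sample shown later in this notebook. The helper name `sas_days_to_date` is hypothetical, and this is a minimal sketch of the arithmetic rather than the pipeline's actual implementation.

```python
from datetime import date, timedelta
import math

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_days_to_date(days):
    """Convert a SAS day count (int or float) to a date; missing values map to None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate and depdate of the first row in the immigration sample
print(sas_days_to_date(20566.0))
print(sas_days_to_date(20573.0))
```

Mapping missing values to `None` matters here because `depdate` is empty for travellers who have not yet departed.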

## Reading Immigration Data Sample 

In [5]:
# Read in the data here
imm_sample = spark.read.format("csv").option("header", True).option("inferSchema", True).load("source_data/immigration_data_sample.csv")

In [5]:
imm_sample.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


## Reading Full Immigration Data 

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()


In [96]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [9]:
%%time
df_spark.count()

CPU times: user 6.01 ms, sys: 0 ns, total: 6.01 ms
Wall time: 37.3 s


3096313

In [10]:
df_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [53]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

## Reading US City Demographics 

In [6]:
city_dem = spark.read.format("csv").option("header", True).option("delimiter", ";").option("inferSchema", True).load("source_data/us-cities-demographics.csv")

In [12]:
city_dem.count()

2891

In [51]:
city_dem.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [43]:
city_dem.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229,62432,118661,6634,7517,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712,41971,80683,4815,8355,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762,43270,85032,5783,3269,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751,58077,109828,5204,16315,2.65,NC,Asian,11060


## Reading Global Land Temperatures By City

In [28]:
%%time
temp_city = spark.read.format("csv").option("header", True).option("inferSchema", True).load("../../data2/GlobalLandTemperaturesByCity.csv")

CPU times: user 21.9 ms, sys: 2.3 ms, total: 24.2 ms
Wall time: 2min 20s


In [14]:
temp_city.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [29]:
%%time
temp_city.count()

CPU times: user 1.93 ms, sys: 335 µs, total: 2.27 ms
Wall time: 7.05 s


8599212

In [25]:
temp_city.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



## Reading Airport Codes

In [15]:
airport_codes = spark.read.format("csv").option("header", True).option("inferSchema", True).load("source_data/airport-codes_csv.csv")

In [16]:
airport_codes.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [31]:
airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [63]:
%%time
#write to parquet
# df_spark.write.parquet("sas_data")
df_imm=spark.read.parquet("sas_data")

CPU times: user 1.77 ms, sys: 311 µs, total: 2.08 ms
Wall time: 371 ms


In [35]:
df_spark.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,57.0,2.0,1.0,20160430,ACK,,G,O,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,66.0,2.0,1.0,20160430,ACK,,G,O,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,41.0,2.0,1.0,20160430,ACK,,G,O,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,27.0,2.0,1.0,20160430,ACK,,G,O,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,26.0,2.0,1.0,20160430,ACK,,G,O,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


### Data Preparation: I94_SAS_Labels_Descriptions.SAS

In [7]:
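def sas_labels_descr_parser(source_file, value, cols):
    """Parse one value block of the SAS labels file into a two-column pandas DataFrame."""
    with open(source_file) as f:
        file_string = f.read()

    # Keep only the block that starts at `value` and ends at the next ';'
    file_string = file_string[file_string.index(value):]
    file_string = file_string[:file_string.index(';')]

    line_list = file_string.split('\n')[1:]
    codes = []
    values = []

    for line in line_list:
        # Skip lines without a "code = value" pair instead of
        # re-appending the previous pair
        if '=' not in line:
            continue

        # Split on the first '=' only, in case the label itself contains one
        code, val = line.split('=', 1)
        code = code.strip()
        val = val.strip()

        # Drop the surrounding single quotes, if present
        if code.startswith("'"):
            code = code[1:-1]
        if val.startswith("'"):
            val = val[1:-1]

        codes.append(code)
        values.append(val)

    return pd.DataFrame(list(zip(codes, values)), columns=cols)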

#     return spark.createDataFrame(list(zip(codes,values))).toDF(*cols)
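
The parsing logic above can be tried on a small in-memory excerpt. The `SAMPLE` text below is an assumed layout, reconstructed from the transportation codes displayed later in this notebook; the real `I94_SAS_Labels_Descriptions.SAS` file may differ in whitespace and comments. The function name `parse_value_block` is hypothetical, and it returns plain tuples instead of a DataFrame so that it runs without pandas.

```python
# Hypothetical excerpt of one value block (assumed format, not copied from the real file)
SAMPLE = """value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
"""


def parse_value_block(text, name):
    """Return (code, label) pairs for the block that starts at `name` and ends at ';'."""
    block = text[text.index(name):]
    block = block[:block.index(';')]

    pairs = []
    for line in block.split('\n')[1:]:
        if '=' not in line:
            continue  # header, comment, or blank line
        code, label = (part.strip() for part in line.split('=', 1))
        pairs.append((code.strip("'"), label.strip("'")))
    return pairs


print(parse_value_block(SAMPLE, 'i94model'))
```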


In [8]:
from pyspark.sql.types import IntegerType
df_country_code = sas_labels_descr_parser('I94_SAS_Labels_Descriptions.SAS', 'i94cntyl', ['code', 'country'])
df_country_code = df_country_code.drop_duplicates()
df_country_code = spark.createDataFrame(df_country_code)
df_country_code = df_country_code.withColumn("code", df_country_code["code"].cast(IntegerType()))
df_country_code.limit(10).toPandas()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no land arrivals)"
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
5,324,ANGOLA
6,529,ANGUILLA
7,518,ANTIGUA-BARBUDA
8,687,ARGENTINA
9,151,ARMENIA


In [9]:
df_city_code = sas_labels_descr_parser('I94_SAS_Labels_Descriptions.SAS', 'i94prtl', ['code', 'city'])
df_city_code = df_city_code.drop_duplicates()
df_city_code.head()

Unnamed: 0,code,city
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [10]:
df_state_code = sas_labels_descr_parser('I94_SAS_Labels_Descriptions.SAS', 'i94addrl', ['code', 'state'])
df_state_code = df_state_code.drop_duplicates()
df_state_code.head()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [11]:
df_visa_code = sas_labels_descr_parser('I94_SAS_Labels_Descriptions.SAS', 'I94VISA', ['code', 'type'])
df_visa_code = df_visa_code.drop_duplicates()
df_visa_code = spark.createDataFrame(df_visa_code)
df_visa_code = df_visa_code.withColumn("code", df_visa_code["code"].cast(IntegerType()))
df_visa_code.limit(10).toPandas()

Unnamed: 0,code,type
0,1,Business
1,2,Pleasure
2,3,Student


In [12]:
df_transportation = sas_labels_descr_parser('I94_SAS_Labels_Descriptions.SAS', 'i94model', ['code', 'mode'])
df_transportation = df_transportation.drop_duplicates()
df_transportation = spark.createDataFrame(df_transportation)
df_transportation = df_transportation.withColumn("code", df_transportation["code"].cast(IntegerType()))
df_transportation.limit(10).toPandas()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


#### Cleaning Steps
Document steps necessary to clean the data

In [75]:
imm_sample.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: integer (nullable = true)
 |-- airline: string (nullable = 

In [19]:
df_fact = imm_sample[['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa', 'airline']]
# new_columns = ['id', 'year', 'month', 'country_code', 'city_code', 'state_code', 'arrival_date', 'departure_date', 'transportation', 'visa']
df_fact = df_fact.toDF('cicid', 'year', 'month', 'country_code', 'city_code', 'state_code', 'arrival_date', 'departure_date', 'transportation', 'visa', 'airline')

In [18]:
df_fact.limit(5).toPandas()

In [23]:
df_fact.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- country_code: double (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- transportation: double (nullable = true)
 |-- visa: double (nullable = true)



In [106]:
dd.count()

113708

In [13]:
df_dim_ident = imm_sample[['cicid', 'i94cit', 'i94res', 'biryear', 'gender']]
df_dim_ident= df_dim_ident.toDF('cicid', 'citizen_country', 'residence_country', 'birth_year', 'gender')
df_dim_ident.limit(5).toPandas()

Unnamed: 0,cicid,citizen_country,residence_country,birth_year,gender
0,4084316.0,209.0,209.0,1955.0,F
1,4422636.0,582.0,582.0,1990.0,M
2,1195600.0,148.0,112.0,1940.0,M
3,5291768.0,297.0,297.0,1991.0,M
4,985523.0,111.0,111.0,1997.0,F


In [14]:
df_dim_flight = imm_sample[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
df_dim_flight = df_dim_flight.toDF('cicid', 'airline', 'admin_num', 'flight_number', 'visa_type')
df_dim_flight.limit(5).toPandas()

Unnamed: 0,cicid,airline,admin_num,flight_number,visa_type
0,4084316.0,JL,56582670000.0,00782,WT
1,4422636.0,*GA,94362000000.0,XBLNG,B2
2,1195600.0,LH,55780470000.0,00464,WT
3,5291768.0,QR,94789700000.0,00739,B2
4,985523.0,,42322570000.0,LAND,WT


In [25]:
df_dim_flight.filter('flight_number is null').count()

8

In [18]:
df_dim_flight.count()

992

In [26]:
df_dim_temp = temp_city.filter("Country == 'United States'")
df_dim_temp = df_dim_temp[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_dim_temp = df_dim_temp.toDF('date', 'avg_temp', 'avg_temp_uncertn', 'city', 'country')
df_dim_temp.limit(5).toPandas()

Unnamed: 0,date,avg_temp,avg_temp_uncertn,city,country
0,1820-01-01,2.101,3.217,Abilene,United States
1,1820-02-01,6.926,2.853,Abilene,United States
2,1820-03-01,10.767,2.395,Abilene,United States
3,1820-04-01,17.989,2.202,Abilene,United States
4,1820-05-01,21.809,2.036,Abilene,United States


In [29]:
dt = df_dim_temp.filter(df_dim_temp.date > '2015-01-01')

In [30]:
%%time
dt.sort('date').show(truncate=False)

+----+--------+----------------+----+-------+
|date|avg_temp|avg_temp_uncertn|city|country|
+----+--------+----------------+----+-------+
+----+--------+----------------+----+-------+

CPU times: user 9.98 ms, sys: 1.52 ms, total: 11.5 ms
Wall time: 1min 13s


In [125]:
df_dim_temp.where(F.col('date').between('2016-01-01','2016-12-31')).show(truncate=False)

+----+--------+----------------+----+-------+
|date|avg_temp|avg_temp_uncertn|city|country|
+----+--------+----------------+----+-------+
+----+--------+----------------+----+-------+



### No temperature data is available for US cities in 2016, so this dataset is left out of the data model

___

In [15]:
df_dim_city_pop = city_dem[['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
df_dim_city_pop= df_dim_city_pop.toDF('city', 'state', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race')
df_dim_city_pop.limit(5).toPandas()

Unnamed: 0,city,state,male_pop,female_pop,num_vetarans,foreign_born,race
0,Silver Spring,Maryland,40601,41862,1562,30908,Hispanic or Latino
1,Quincy,Massachusetts,44129,49500,4147,32935,White
2,Hoover,Alabama,38040,46799,4819,8229,Asian
3,Rancho Cucamonga,California,88127,87105,5821,33878,Black or African-American
4,Newark,New Jersey,138040,143873,5829,86253,White


In [16]:
df_dim_city_stats = city_dem[['City', 'State', 'Median Age', 'Average Household Size']]
df_dim_city_stats= df_dim_city_stats.toDF('city', 'state', 'median_age', 'avg_household_size')
df_dim_city_stats.limit(5).toPandas()

Unnamed: 0,city,state,median_age,avg_household_size
0,Silver Spring,Maryland,33.8,2.6
1,Quincy,Massachusetts,41.0,2.39
2,Hoover,Alabama,38.5,2.58
3,Rancho Cucamonga,California,34.5,3.18
4,Newark,New Jersey,34.6,2.73


In [20]:
def sas_date_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
df_fact = df_fact.toPandas()

In [21]:
df_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 10 columns):
cicid             1000 non-null float64
year              1000 non-null float64
month             1000 non-null float64
country_code      1000 non-null float64
city_code         1000 non-null object
state_code        941 non-null object
arrival_date      1000 non-null float64
departure_date    951 non-null float64
transportation    1000 non-null float64
visa              1000 non-null float64
dtypes: float64(8), object(2)
memory usage: 78.2+ KB


In [21]:
df_fact['arrival_date'] = sas_date_to_datetime(df_fact['arrival_date'])
df_fact['departure_date'] = sas_date_to_datetime(df_fact['departure_date'])

In [12]:
df_fact.head()

Unnamed: 0,cicid,year,month,country_code,city_code,state_code,arrival_date,departure_date,transportation,visa,airline
0,4084316.0,2016.0,4.0,209.0,HHW,HI,2016-04-22,2016-04-29,1.0,2.0,JL
1,4422636.0,2016.0,4.0,582.0,MCA,TX,2016-04-23,2016-04-24,1.0,2.0,*GA
2,1195600.0,2016.0,4.0,148.0,OGG,FL,2016-04-07,2016-04-27,1.0,2.0,LH
3,5291768.0,2016.0,4.0,297.0,LOS,CA,2016-04-28,2016-05-07,1.0,2.0,QR
4,985523.0,2016.0,4.0,111.0,CHM,NY,2016-04-06,2016-04-09,3.0,2.0,


In [22]:
df_dim_city_pop = df_dim_city_pop.withColumn('city', F.upper(F.col('city')))
df_dim_city_pop = df_dim_city_pop.withColumn('state', F.upper(F.col('state')))

# df_dim_city_stats
# df_dim_city_pop.toDF(*[c.upper() for c in df.columns])

In [25]:
df_dim_city_pop.show()

+----------------+--------------+--------+----------+------------+------------+--------------------+
|            city|         state|male_pop|female_pop|num_vetarans|foreign_born|                race|
+----------------+--------------+--------+----------+------------+------------+--------------------+
|   SILVER SPRING|      MARYLAND|   40601|     41862|        1562|       30908|  Hispanic or Latino|
|          QUINCY| MASSACHUSETTS|   44129|     49500|        4147|       32935|               White|
|          HOOVER|       ALABAMA|   38040|     46799|        4819|        8229|               Asian|
|RANCHO CUCAMONGA|    CALIFORNIA|   88127|     87105|        5821|       33878|Black or African-...|
|          NEWARK|    NEW JERSEY|  138040|    143873|        5829|       86253|               White|
|          PEORIA|      ILLINOIS|   56229|     62432|        6634|        7517|American Indian a...|
|        AVONDALE|       ARIZONA|   38712|     41971|        4815|        8355|Black or Afr

In [23]:
df_dim_city_stats = df_dim_city_stats.withColumn('city', F.upper(F.col('city')))
df_dim_city_stats = df_dim_city_stats.withColumn('state', F.upper(F.col('state')))


In [27]:
df_dim_city_stats.show()

+----------------+--------------+----------+------------------+
|            city|         state|median_age|avg_household_size|
+----------------+--------------+----------+------------------+
|   SILVER SPRING|      MARYLAND|      33.8|               2.6|
|          QUINCY| MASSACHUSETTS|      41.0|              2.39|
|          HOOVER|       ALABAMA|      38.5|              2.58|
|RANCHO CUCAMONGA|    CALIFORNIA|      34.5|              3.18|
|          NEWARK|    NEW JERSEY|      34.6|              2.73|
|          PEORIA|      ILLINOIS|      33.1|               2.4|
|        AVONDALE|       ARIZONA|      29.1|              3.18|
|     WEST COVINA|    CALIFORNIA|      39.8|              3.56|
|        O'FALLON|      MISSOURI|      36.0|              2.77|
|      HIGH POINT|NORTH CAROLINA|      35.5|              2.65|
|          FOLSOM|    CALIFORNIA|      40.9|              2.62|
|          FOLSOM|    CALIFORNIA|      40.9|              2.62|
|    PHILADELPHIA|  PENNSYLVANIA|      3

# Sample Query on The Data Model:

In [None]:
df_fact_immi.printSchema()

In [21]:
df_fact_immi=spark.read.parquet("output/df_fact_immi")
df_dim_ident=spark.read.parquet("output/df_dim_ident")
df_dim_flight=spark.read.parquet("output/df_dim_flight")
df_dim_city_pop=spark.read.parquet("output/df_dim_city_pop")
df_dim_city_stats=spark.read.parquet("output/df_dim_city_stats")
df_country_code=spark.read.parquet("output/df_country_code")
df_city_code=spark.read.parquet("output/df_city_code")
df_state_code=spark.read.parquet("output/df_state_code")
df_visa_code=spark.read.parquet('output/df_visa_code')
df_transportation=spark.read.parquet('output/df_transportation')

In [37]:
city_dem.select('city', 'Foreign-born').sort(F.desc('Foreign-born')).show()

+-----------+------------+
|       city|Foreign-born|
+-----------+------------+
|   New York|     3212500|
|   New York|     3212500|
|   New York|     3212500|
|   New York|     3212500|
|   New York|     3212500|
|Los Angeles|     1485425|
|Los Angeles|     1485425|
|Los Angeles|     1485425|
|Los Angeles|     1485425|
|Los Angeles|     1485425|
|    Houston|      696210|
|    Houston|      696210|
|    Houston|      696210|
|    Houston|      696210|
|    Houston|      696210|
|    Chicago|      573463|
|    Chicago|      573463|
|    Chicago|      573463|
|    Chicago|      573463|
|    Chicago|      573463|
+-----------+------------+
only showing top 20 rows



In [38]:
city_born = city_dem.select('City', 'Foreign-born').dropDuplicates().sort(F.desc('Foreign-born'))

In [39]:
city_born = city_born.withColumn('City', F.upper(F.col('City')))

In [40]:
city_immi_nums = df_fact_immi.groupBy('city_code').count().sort(F.desc('count'))

In [41]:
city_nums = df_city_code.join(city_immi_nums, df_city_code.code ==  city_immi_nums.city_code,"left")

In [42]:
city_nums = city_nums.select('city', 'count').sort(F.desc('count'))

In [43]:
city_nums = city_nums.withColumn('city', F.split(city_nums['city'], ',').getItem(0))

In [44]:
city_born.join(city_nums,city_born.City ==  city_nums.city,"inner").show()

+---------------+------------+---------------+------+
|           City|Foreign-born|           city| count|
+---------------+------------+---------------+------+
|       NEW YORK|     3212500|       NEW YORK|485916|
|          MIAMI|      260789|          MIAMI|343941|
|    LOS ANGELES|     1485425|    LOS ANGELES|310163|
|  SAN FRANCISCO|      297199|  SAN FRANCISCO|152586|
|        ORLANDO|       50558|        ORLANDO|149195|
|        CHICAGO|      573463|        CHICAGO|130564|
|        HOUSTON|      696210|        HOUSTON|101481|
|FORT LAUDERDALE|       47582|FORT LAUDERDALE| 95977|
|        ATLANTA|       32016|        ATLANTA| 92579|
|      LAS VEGAS|      127609|      LAS VEGAS| 89280|
|         DALLAS|      326825|         DALLAS| 71809|
|         BOSTON|      190123|         BOSTON| 57354|
|        SEATTLE|      119840|        SEATTLE| 47719|
|        PHOENIX|      300702|        PHOENIX| 38890|
|        DETROIT|       39861|        DETROIT| 37832|
|          TAMPA|       5879

> The query above suggests a positive relationship: the cities with the largest counts of immigrant arrivals also tend to have the largest foreign-born populations.
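
One way to put a number on that relationship is a Pearson correlation coefficient. The sketch below uses only the 15 complete rows visible in the truncated output above, so it is an illustration on a partial sample and not a result for the full join. The helper `pearson` is written out by hand to keep the example dependency-free.

```python
import math

# (foreign-born population, I94 arrival count) copied from the join output above
ROWS = {
    "NEW YORK": (3212500, 485916),
    "MIAMI": (260789, 343941),
    "LOS ANGELES": (1485425, 310163),
    "SAN FRANCISCO": (297199, 152586),
    "ORLANDO": (50558, 149195),
    "CHICAGO": (573463, 130564),
    "HOUSTON": (696210, 101481),
    "FORT LAUDERDALE": (47582, 95977),
    "ATLANTA": (32016, 92579),
    "LAS VEGAS": (127609, 89280),
    "DALLAS": (326825, 71809),
    "BOSTON": (190123, 57354),
    "SEATTLE": (119840, 47719),
    "PHOENIX": (300702, 38890),
    "DETROIT": (39861, 37832),
}


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


foreign_born, arrivals = zip(*ROWS.values())
r = pearson(foreign_born, arrivals)
print(f"Pearson r over {len(ROWS)} cities: {r:.2f}")
```

A coefficient near 1 would support the claim; because New York and Los Angeles dominate both columns, a rank-based measure may be a useful cross-check.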

___

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This project uses a star schema: a central immigration fact table surrounded by dimension tables, a layout suited to analytics queries and BI dashboards.
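
As a rough illustration of how a star schema is queried, the sketch below joins a tiny fact table to one dimension table. It uses SQLite from the Python standard library as a stand-in for the Spark tables, and the table names `fact_immi` and `dim_visa_code` are assumptions that only mirror the `df_fact_immi` and `df_visa_code` DataFrames. The rows are taken from the samples shown earlier in this notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_visa_code (code INTEGER PRIMARY KEY, type TEXT);
    CREATE TABLE fact_immi (
        cicid     INTEGER PRIMARY KEY,
        city_code TEXT,
        visa      INTEGER REFERENCES dim_visa_code(code)
    );
""")

# Dimension rows: the three visa categories from the labels file
conn.executemany(
    "INSERT INTO dim_visa_code VALUES (?, ?)",
    [(1, "Business"), (2, "Pleasure"), (3, "Student")],
)
# Fact rows: a handful of arrivals from the samples above
conn.executemany(
    "INSERT INTO fact_immi VALUES (?, ?, ?)",
    [(4084316, "HHW", 2), (4422636, "MCA", 2), (5748517, "LOS", 1), (7, "ATL", 3)],
)

# The fact table stores only the code; the readable label comes from the dimension
visits_by_type = conn.execute("""
    SELECT d.type, COUNT(*)
    FROM fact_immi AS f
    JOIN dim_visa_code AS d ON d.code = f.visa
    GROUP BY d.type
    ORDER BY d.type
""").fetchall()
print(visits_by_type)
```

Keeping descriptive text in the dimension tables keeps the fact table narrow, which is the main reason this layout suits aggregation-heavy dashboards.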
#### 3.2 Mapping Out Data Pipelines


![Project data model](project%20model.png)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Data Pipeline Steps:

- Assume the source datasets reside in an S3 bucket:
  - **[Source_S3_Bucket]**/18-83510-I94-Data-2016/*.sas7bdat
  - **[Source_S3_Bucket]**/I94_SAS_Labels_Descriptions.SAS
  - **[Source_S3_Bucket]**/us-cities-demographics.csv
- Prepare and clean the datasets.
- Transform the I94 immigration dataset into a fact table, partitioned by `state`, and 2 dimension tables.
- Parse the label description file to get the auxiliary descriptive tables.
- Transform the US City Demographics dataset into 2 dimension tables.
- Populate the analytics tables and store them in the destination S3 bucket.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

#### The data quality checks include:

- All PK columns in the relational data model have no null values


- No table is empty after running the ETL data pipeline


Run `data_quality.py` to start the data quality checks.
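
The contents of `data_quality.py` are not shown in this notebook. The two checks can be sketched as follows, using SQLite from the Python standard library as a stand-in for the Spark tables. The function names, the table names, and the deliberately broken sample rows are all hypothetical.

```python
import sqlite3


def check_not_empty(conn, table):
    """Return True if the table holds at least one row."""
    (rows,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return rows > 0


def check_no_null_keys(conn, table, key):
    """Return True if the key column contains no NULL values."""
    (nulls,) = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {key} IS NULL"
    ).fetchone()
    return nulls == 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_immi (cicid INTEGER, state_code TEXT)")
conn.executemany(
    "INSERT INTO fact_immi VALUES (?, ?)",
    [(4084316, "HI"), (4422636, "TX"), (None, "NY")],  # last row has a NULL key
)
conn.execute("CREATE TABLE dim_flight (cicid INTEGER, airline TEXT)")  # left empty

for table in ("fact_immi", "dim_flight"):
    print(table, "not empty:", check_not_empty(conn, table))
    print(table, "no null keys:", check_no_null_keys(conn, table, "cicid"))
```

In a Spark pipeline the same two conditions would typically be expressed as a row count and a null filter on each output DataFrame, with the job failing when either check returns `False`.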

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### The full source data dictionary:
Refer to [data_dictionary.md](/data_dictionary)
<br>
#### The data model dictionary:
Refer to [model_data_dictionary.md](/model_data_dictionary)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Tools and Technologies:
+ AWS S3 for data storage
+ Pandas for sample data set exploratory data analysis
+ PySpark for large data set data processing to transform staging table to dimensional table

#### How often the data should be updated and why:

- ***The pipeline will be scheduled monthly, because the immigration data, the primary data source, follows a monthly schedule.***

#### Write a description of how you would approach the problem differently under the following scenarios:

#### The data was increased by 100x.
- Spark can handle the increase. If standalone mode cannot, [AWS EMR](https://aws.amazon.com/emr/) is preferable, since it distributes the work across multiple nodes to add computing power.

<br>

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
- [Apache Airflow](https://airflow.apache.org/) will be used to schedule and run data pipelines.

<br>

#### The database needed to be accessed by 100+ people.
- We would move the analytics database into [Amazon Redshift](https://aws.amazon.com/redshift/), as it can handle up to 500 concurrent connections per cluster.