# US Immigration
### Data Engineering Capstone Project

#### Project Summary
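This project builds a data warehouse for analyzing US immigration. The April 2016 I94 immigration records are combined with world city temperatures, US city demographics, and airport codes. Each source is cleaned with pandas and exported to CSV. The files are then loaded from S3 into a star schema in Amazon Redshift, with an immigration fact table and lookup dimension tables.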

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
import psycopg2
import boto3
from sql_queries import create_table_queries, drop_table_queries, copy_table_queries

In [2]:
# Read in the data here
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
global_temp_df = pd.read_csv(fname)

In [3]:
global_temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [2]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
airport_codes_df = pd.read_csv('data/airport-codes_csv.csv')
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [6]:
us_demo = pd.read_csv('data/us-cities-demographics.csv')
us_demo.head()

Unnamed: 0.1,Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign-born,Average_household_size,State_code,Race,Count
0,0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [7]:
us_demo = us_demo.drop('Unnamed: 0', axis = 1)

In [8]:
us_demo.head()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign-born,Average_household_size,State_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [9]:
from pyspark.sql import SparkSession
# The spark-sas7bdat package lets Spark read the SAS file directly
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [None]:
# Write to Parquet (overwriting output from an earlier run) and read it back
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

In [4]:
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
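Each data set below is checked for missing values (isnull, count), duplicate rows and keys (drop_duplicates, nunique), and implausible codes such as the placeholder port "XXX" in the immigration data.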

#### Cleaning Steps
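* Convert the code lookups in I94_SAS_Labels_Descriptions.SAS into CSV mapping tables (country, port, mode, state, visa).
* Immigration: drop rows whose i94port is "XXX", keep 18 of the 28 columns, and drop rows with any missing value.
* Global temperatures: drop rows without a temperature reading, then drop duplicates.
* US demographics: drop the extra index column, duplicates, and rows with missing values.
* Airport codes: split coordinates into rounded Longitude and Latitude columns, fill missing continent and iata_code values, then drop nulls and duplicates.
* Export each cleaned table to CSV.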

In [10]:
# Performing cleaning tasks here
# converting Label Descriptions SAS file to CSVs
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [11]:
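def code_mapper(file, idx):
    """Parse the SAS 'value' block named idx in the text file into a {code: label} dict."""
    # Start at the block name and stop at the ';' that closes the block
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    # Drop the quotes around codes and labels
    f_content2 = [i.replace("'", "") for i in f_content2]
    # Skip the line holding the block name, then split each "code = label" pair
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic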

In [12]:
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1': 'Business', '2': 'Pleasure', '3': 'Student'}
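The comment above says the label file is converted to CSVs, but the cell that writes them is not shown. A minimal sketch, assuming each CSV simply holds a code column plus one label column (the label names country, port, mode, addr and type match the CSVs read below); mapping_to_frame is a hypothetical helper:

```python
import pandas as pd


def mapping_to_frame(mapping, value_col):
    """Turn a {code: label} dict into a two-column DataFrame: code, <value_col>."""
    return pd.DataFrame(list(mapping.items()), columns=['code', value_col])


# The visa-category mapping defined above
i94visa_map = {'1': 'Business', '2': 'Pleasure', '3': 'Student'}
i94visa_frame = mapping_to_frame(i94visa_map, 'type')

# Write it where the notebook later reads it (pandas quotes labels that contain commas)
# i94visa_frame.to_csv('data/i94visa.csv', index=False)
```

The same call would cover the other dictionaries, e.g. mapping_to_frame(i94port, 'port') for data/i94port.csv.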

In [10]:
i94cit_res_df = pd.read_csv('data/i94cit_res.csv')
i94cit_res_df.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [11]:
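# Keep the first row per country code (assigning a de-duplicated column back would only insert NaN)
i94cit_res_df = i94cit_res_df.drop_duplicates(subset='code')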

In [12]:
i94port_df = pd.read_csv('data/i94port.csv')
i94port_df.head()

Unnamed: 0,code,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [13]:
i94port_df.code.count()

660

In [14]:
i94port_df.code.nunique()

660

In [15]:
i94mode_df = pd.read_csv('data/i94mode.csv')
i94mode_df.head()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [16]:
i94addr_df = pd.read_csv('data/i94addr.csv')
i94addr_df.head()

Unnamed: 0,code,addr
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [20]:
i94visa = pd.read_csv('data/i94visa.csv')

In [21]:
i94visa.head()

Unnamed: 0,code,type
0,1,Business
1,2,Pleasure
2,3,Student


#### immigration_df clean ####

In [23]:
immigration_df.isnull().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,False,False,False,False,False,False,False,True,True,True,...,False,True,False,False,True,True,True,False,True,False
1,False,False,False,False,False,False,False,False,False,True,...,False,True,False,False,False,True,True,False,False,False
2,False,False,False,False,False,False,False,False,False,False,...,True,False,False,False,False,True,False,False,False,False
3,False,False,False,False,False,False,False,False,False,False,...,True,False,False,False,True,True,False,False,False,False
4,False,False,False,False,False,False,False,False,False,False,...,True,False,False,False,True,True,False,False,False,False
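isnull().head() only shows the first five rows. A per-column summary gives the full picture before deciding what to drop. A minimal sketch; missing_summary is a hypothetical helper and the sample frame is made up:

```python
import numpy as np
import pandas as pd


def missing_summary(df):
    """Count and percentage of missing values per column, most-missing first."""
    counts = df.isnull().sum()
    summary = pd.DataFrame({
        'missing': counts,
        'pct_missing': (counts / len(df) * 100).round(2),
    })
    return summary.sort_values('missing', ascending=False)


sample = pd.DataFrame({
    'i94addr': ['AL', None, 'MA', 'MA'],
    'depdate': [np.nan, np.nan, 20691.0, 20567.0],
    'gender': ['M', 'M', None, None],
    'i94port': ['ATL', 'WAS', 'NYC', 'NYC'],
})
summary = missing_summary(sample)
print(summary)
```

On the real data this would be missing_summary(immigration_df).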


In [24]:
immigration_df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [25]:
immigration_df.shape

(3096313, 28)

In [26]:
# Exploring immigration_df data to ensure no outliers
immigration_df.i94cit.unique()

array([ 692.,  254.,  101.,  102.,  103.,  104.,  105.,  107.,  108.,
        109.,  110.,  111.,  112.,  113.,  114.,  115.,  116.,  117.,
        118.,  119.,  121.,  122.,  123.,  124.,  126.,  127.,  128.,
        129.,  130.,  131.,  133.,  135.,  140.,  141.,  145.,  147.,
        148.,  151.,  152.,  153.,  154.,  155.,  156.,  157.,  159.,
        162.,  163.,  164.,  165.,  166.,  167.,  201.,  203.,  204.,
        206.,  207.,  209.,  213.,  218.,  220.,  236.,  242.,  244.,
        245.,  249.,  250.,  251.,  253.,  255.,  256.,  257.,  258.,
        260.,  261.,  262.,  263.,  266.,  267.,  268.,  272.,  273.,
        274.,  296.,  297.,  298.,  299.,  301.,  304.,  310.,  311.,
        315.,  316.,  320.,  323.,  324.,  326.,  329.,  332.,  336.,
        338.,  339.,  340.,  342.,  343.,  344.,  345.,  348.,  350.,
        352.,  368.,  369.,  370.,  371.,  372.,  373.,  375.,  376.,
        382.,  383.,  384.,  385.,  386.,  387.,  388.,  389.,  390.,
        391.,  392.,

In [27]:
immigration_df.i94res.unique()

array([ 692.,  276.,  101.,  110.,  117.,  112.,  251.,  102.,  103.,
        104.,  111.,  119.,  123.,  124.,  131.,  135.,  260.,  296.,
        297.,  343.,  438.,  526.,  579.,  582.,  688.,  689.,  691.,
        108.,  121.,  129.,  272.,  373.,  514.,  581.,  105.,  107.,
        116.,  209.,  113.,  118.,  130.,  245.,  321.,  322.,  368.,
        464.,  109.,  340.,  122.,  206.,  207.,  255.,  261.,  316.,
        409.,  411.,  504.,  507.,  508.,  509.,  511.,  512.,  585.,
        586.,  601.,  604.,  687.,  690.,  694.,  696.,  736.,  749.,
        154.,  114.,  127.,  315.,  145.,  115.,  249.,  264.,  214.,
        350.,  369.,  528.,  575.,  577.,  693.,  695.,  745.,  236.,
        273.,  332.,  532.,  602.,  721.,  218.,  126.,  324.,  128.,
        516.,  584.,  157.,  143.,  213.,  263.,  339.,  344.,  387.,
        513.,  525.,  527.,  529.,  140.,  141.,  167.,  576.,  151.,
        152.,  153.,  162.,  603.,  155.,  156.,  159.,  163.,  164.,
        165.,  166.,

In [3]:
immigration_df.i94port.unique()

array(['XXX', 'ATL', 'WAS', 'NYC', 'TOR', 'BOS', 'HOU', 'MIA', 'CHI',
       'LOS', 'CLT', 'DEN', 'DAL', 'DET', 'NEW', 'FTL', 'LVG', 'ORL',
       'NOL', 'PIT', 'SFR', 'SPM', 'POO', 'PHI', 'SEA', 'SLC', 'TAM',
       'HAM', 'NAS', 'VCV', 'MAA', 'AUS', 'HHW', 'OGG', 'PHO', 'SDP',
       'SFB', 'EDA', 'MON', 'CLG', 'DUB', 'FMY', 'YGF', 'SAJ', 'CIN',
       'BAL', 'RDU', 'WPB', 'STT', 'OAK', 'NSV', 'SNA', 'OTT', 'X96',
       '5KE', 'CLE', 'HAR', 'PSP', 'CHR', 'HAL', 'SAA', 'KOA', 'SHA',
       'WIN', 'BGM', 'NCA', 'OPF', 'SAI', 'JFA', 'AGA', 'ONT', 'CLM',
       'STL', 'W55', 'CHS', 'SNJ', 'SRQ', 'ANC', 'LNB', 'LIH', 'MIL',
       'INP', 'KAN', 'ROC', 'SAC', 'BRO', 'LAR', 'RNO', 'SGR', 'ELP',
       'MCA', 'MDT', 'SPE', 'FPR', 'SYR', 'ICT', 'MLB', 'ADS', 'TUC',
       'DLR', 'CAE', 'CHA', 'HSV', 'WIL', 'HPN', 'HEF', 'BRG', 'BED',
       'DAB', 'JAC', 'FRB', 'SWF', 'KEY', 'PTK', 'MWH', 'X44', 'MYR',
       'APF', 'ATW', 'PVD', 'BUF', 'PIE', 'MHT', 'BDL', 'NYL', 'VNY',
       '5T6', 'LEX',

In [4]:
# Dropping rows with "XXX" in i94port
immigration_df.drop(immigration_df[immigration_df['i94port'] == 'XXX'].index, inplace = True)

In [5]:
# Checking if XXX is dropped
immigration_df.i94port.unique()

array(['ATL', 'WAS', 'NYC', 'TOR', 'BOS', 'HOU', 'MIA', 'CHI', 'LOS',
       'CLT', 'DEN', 'DAL', 'DET', 'NEW', 'FTL', 'LVG', 'ORL', 'NOL',
       'PIT', 'SFR', 'SPM', 'POO', 'PHI', 'SEA', 'SLC', 'TAM', 'HAM',
       'NAS', 'VCV', 'MAA', 'AUS', 'HHW', 'OGG', 'PHO', 'SDP', 'SFB',
       'EDA', 'MON', 'CLG', 'DUB', 'FMY', 'YGF', 'SAJ', 'CIN', 'BAL',
       'RDU', 'WPB', 'STT', 'OAK', 'NSV', 'SNA', 'OTT', 'X96', '5KE',
       'CLE', 'HAR', 'PSP', 'CHR', 'HAL', 'SAA', 'KOA', 'SHA', 'WIN',
       'BGM', 'NCA', 'OPF', 'SAI', 'JFA', 'AGA', 'ONT', 'CLM', 'STL',
       'W55', 'CHS', 'SNJ', 'SRQ', 'ANC', 'LNB', 'LIH', 'MIL', 'INP',
       'KAN', 'ROC', 'SAC', 'BRO', 'LAR', 'RNO', 'SGR', 'ELP', 'MCA',
       'MDT', 'SPE', 'FPR', 'SYR', 'ICT', 'MLB', 'ADS', 'TUC', 'DLR',
       'CAE', 'CHA', 'HSV', 'WIL', 'HPN', 'HEF', 'BRG', 'BED', 'DAB',
       'JAC', 'FRB', 'SWF', 'KEY', 'PTK', 'MWH', 'X44', 'MYR', 'APF',
       'ATW', 'PVD', 'BUF', 'PIE', 'MHT', 'BDL', 'NYL', 'VNY', '5T6',
       'LEX', 'NOR',

In [31]:
immigration_df.i94bir.unique()

array([  25.,   55.,   28.,    4.,   57.,   63.,   46.,   48.,   52.,
         33.,   58.,   56.,   62.,   49.,   43.,   53.,   74.,   37.,
         65.,   35.,   32.,   38.,   68.,   61.,   41.,   45.,   54.,
         29.,   42.,   34.,   47.,   64.,   27.,   59.,   60.,   66.,
         51.,   22.,   39.,   20.,   50.,   44.,   40.,   31.,   23.,
         36.,    2.,    0.,   70.,   26.,   30.,   16.,   14.,   21.,
         24.,    1.,   77.,   73.,   71.,    6.,   72.,    5.,   76.,
         69.,   67.,    3.,   10.,   18.,   19.,   11.,   17.,    9.,
          8.,   12.,   75.,    7.,   13.,   15.,   82.,   84.,   78.,
         81.,   87.,   79.,   80.,   83.,   91.,   85.,   86.,   88.,
         90.,   89.,   97.,   96.,   93.,   92.,  100.,   95.,   98.,
         94.,   99.,   nan,  109.,  108.,  107.,  101.,  105.,  102.,
        103.,   -3.,  114.,  110.,  111.])
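The i94bir values include a negative age (-3) and ages above 110, which the cleaning below does not remove. A minimal sketch for flagging them; the bounds 0 and 110 are assumptions, not thresholds from the source:

```python
import pandas as pd


def flag_implausible_ages(ages, low=0, high=110):
    """Return a boolean mask of ages below low or above high (NaN is not flagged)."""
    # Comparisons with NaN are False, so missing ages are left for dropna() to handle
    return (ages < low) | (ages > high)


sample_ages = pd.Series([25.0, -3.0, 114.0, float('nan'), 55.0])
mask = flag_implausible_ages(sample_ages)
print(int(mask.sum()))  # 2
```

Applying it as immigration_df[~flag_implausible_ages(immigration_df.i94bir)] would change the row counts used in the quality checks.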

In [6]:
immigration_df = immigration_df[['cicid','i94yr','i94mon','i94cit','i94res','i94port','arrdate','i94mode','i94addr','depdate','i94bir','i94visa','count','visapost','biryear','gender','airline','visatype']]
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,visapost,biryear,gender,airline,visatype
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,SEO,1991.0,M,,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,,1961.0,M,OS,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,,1988.0,,AA,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,,2012.0,,AA,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,,1959.0,,AZ,B1


In [7]:
immigration_df = immigration_df.dropna()
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,visapost,biryear,gender,airline,visatype
12,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,58.0,1.0,1.0,TIA,1958.0,M,LH,B1
13,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,56.0,1.0,1.0,TIA,1960.0,F,LH,B1
14,29.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20561.0,62.0,2.0,1.0,TIA,1954.0,M,AZ,B2
15,30.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NJ,20578.0,49.0,2.0,1.0,TIA,1967.0,M,OS,B2
16,31.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NY,20611.0,43.0,2.0,1.0,TIA,1973.0,M,OS,B2


In [8]:
#cicid is primary key
immigration_df.cicid.drop_duplicates()

12              27.0
13              28.0
14              29.0
15              30.0
16              31.0
17              33.0
20              36.0
21              37.0
22              38.0
23              39.0
24              40.0
25              41.0
26              42.0
27              47.0
28              48.0
29              49.0
30              50.0
31              51.0
32              52.0
34              54.0
539            609.0
540            610.0
541            612.0
542            613.0
543            620.0
544            621.0
545            624.0
546            625.0
547            626.0
548            627.0
             ...    
3028561    6063608.0
3028562    6063609.0
3028563    6063610.0
3028564    6063611.0
3028565    6063612.0
3028566    6063613.0
3028567    6063614.0
3028568    6063615.0
3028569    6063616.0
3028570    6063617.0
3028571    6063618.0
3028572    6063619.0
3028573    6063620.0
3028574    6063621.0
3028575    6063622.0
3028576    6063623.0
3028577    60

In [35]:
# Final Immigration Table
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,visapost,biryear,gender,airline,visatype
12,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,58.0,1.0,1.0,TIA,1958.0,M,LH,B1
13,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,56.0,1.0,1.0,TIA,1960.0,F,LH,B1
14,29.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20561.0,62.0,2.0,1.0,TIA,1954.0,M,AZ,B2
15,30.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NJ,20578.0,49.0,2.0,1.0,TIA,1967.0,M,OS,B2
16,31.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NY,20611.0,43.0,2.0,1.0,TIA,1973.0,M,OS,B2
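arrdate and depdate are stored as raw numbers such as 20545.0. Assuming these are SAS numeric dates (days since 1960-01-01), which is how the I94 label file describes them as far as I know, they can be converted with pandas. As a check, 20545 maps to 2016-04-01, which agrees with i94yr = 2016 and i94mon = 4 in the same rows:

```python
import pandas as pd


def sas_to_datetime(days):
    """Convert SAS numeric dates (days since 1960-01-01) to pandas Timestamps."""
    # NaN values (e.g. a missing depdate) become NaT
    return pd.to_datetime(days, unit='D', origin='1960-01-01')


arrivals = sas_to_datetime(pd.Series([20545.0, 20551.0, None]))
print(arrivals)
```

For example, immigration_df['arrdate'] = sas_to_datetime(immigration_df['arrdate']) before export; the Redshift column type would then need to be a date or timestamp.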


In [9]:
# Exporting cleaned data to .csv file
immigration_df.to_csv('immigration_data_cleaned.csv', index = False)

#### global_temp clean ####

In [37]:
global_temp_df.count()

dt                               8599212
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8599212
Country                          8599212
Latitude                         8599212
Longitude                        8599212
dtype: int64

In [38]:
global_temp_df = global_temp_df.dropna()

In [39]:
global_temp_df.count()

dt                               8235082
AverageTemperature               8235082
AverageTemperatureUncertainty    8235082
City                             8235082
Country                          8235082
Latitude                         8235082
Longitude                        8235082
dtype: int64

In [40]:
global_temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E


In [41]:
global_temp_df.AverageTemperature.nunique()

111994

In [42]:
print(global_temp_df.AverageTemperature.min())
print(global_temp_df.AverageTemperature.max())

-42.704
39.651


In [43]:
global_temp_df = global_temp_df.drop_duplicates()

In [44]:
#Exporting cleaned data to .csv file
global_temp_df.to_csv('global_temp_clean.csv', index = False)
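Latitude and Longitude here are strings such as "57.05N" and "10.33E", while the airport coordinates are signed numbers rounded to two decimals. If the two need to be compared, a minimal sketch (hypothetical helper) converts the hemisphere letter into a sign:

```python
import pandas as pd


def to_signed_degrees(values):
    """Turn strings like '57.05N' or '10.33W' into signed floats (S and W are negative)."""
    s = values.astype(str).str.strip()
    number = s.str[:-1].astype(float)
    sign = s.str[-1].str.upper().map({'N': 1, 'E': 1, 'S': -1, 'W': -1})
    # Unknown hemisphere letters map to NaN, so the result is NaN for those rows
    return number * sign


coords = to_signed_degrees(pd.Series(['57.05N', '10.33E', '34.56S', '118.70W']))
print(coords.tolist())
```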

#### us_demo clean ####

In [45]:
us_demo.shape

(2875, 12)

In [46]:
us_demo = us_demo.drop_duplicates()
us_demo = us_demo.dropna()

In [47]:
us_demo.head()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign-born,Average_household_size,State_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [48]:
us_demo.shape

(2875, 12)

In [49]:
us_demo.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2875 entries, 0 to 2874
Data columns (total 12 columns):
City                      2875 non-null object
State                     2875 non-null object
Median_Age                2875 non-null float64
Male_Population           2875 non-null float64
Female_Population         2875 non-null float64
Total_Population          2875 non-null int64
Number_of_Veterans        2875 non-null float64
Foreign-born              2875 non-null float64
Average_household_size    2875 non-null float64
State_code                2875 non-null object
Race                      2875 non-null object
Count                     2875 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 292.0+ KB


In [50]:
# Exporting cleaned data to .csv file
us_demo.to_csv('us_demo_clean.csv', index = False)

#### airport_codes_df clean ####

In [51]:
airport_codes_df.shape

(55075, 12)

In [52]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [53]:
# Parse coordinates column into longitude and latitude
airport_codes_df['Longitude'] = airport_codes_df['coordinates'].apply(lambda x: x.split(',')[0])
airport_codes_df['Latitude'] = airport_codes_df['coordinates'].apply(lambda x: x.split(',')[1])
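The two apply calls split every string twice. A vectorized alternative splits once with str.split(expand=True) and converts to float in the same step. A sketch with a hypothetical helper:

```python
import pandas as pd


def split_coordinates(coords):
    """Split 'longitude, latitude' strings into two float columns (index preserved)."""
    parts = coords.str.split(',', expand=True)
    return pd.DataFrame({
        'Longitude': parts[0].str.strip().astype(float),
        'Latitude': parts[1].str.strip().astype(float),
    })


sample = pd.Series(["-74.93360137939453, 40.07080078125", "-101.473911, 38.704022"])
out = split_coordinates(sample)
print(out)
```

Because the index is preserved, the result can be attached with airport_codes_df.join(split_coordinates(airport_codes_df['coordinates'])).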

In [54]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,Longitude,Latitude
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",-74.93360137939453,40.07080078125
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",-151.695999146,59.94919968
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",-86.77030181884766,34.86479949951172
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087",-91.254898,35.6087


In [55]:
airport_codes_df = airport_codes_df[['ident','type','name','elevation_ft','continent','iso_country','iso_region','municipality','gps_code','iata_code','local_code','Longitude','Latitude']]
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,Longitude,Latitude
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,-74.93360137939453,40.07080078125
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,-151.695999146,59.94919968
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,-86.77030181884766,34.86479949951172
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,-91.254898,35.6087


In [56]:
airport_codes_df.Longitude = airport_codes_df.Longitude.astype(float).round(2)
airport_codes_df.Latitude = airport_codes_df.Latitude.astype(float).round(2)

In [57]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,Longitude,Latitude
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,-74.93,40.07
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,-101.47,38.7
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,-151.7,59.95
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,-86.77,34.86
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,-91.25,35.61


In [58]:
# pandas reads the continent code "NA" (North America) as NaN by default, so restore it as "NorAm".
# A NaN iata_code means the airport has no IATA code; fill it with the same placeholder so dropna() keeps those rows.
airport_codes_df.continent = airport_codes_df.continent.fillna('NorAm')
airport_codes_df.iata_code = airport_codes_df.iata_code.fillna('NorAm')
airport_codes_df = airport_codes_df.dropna()
airport_codes_df.elevation_ft = airport_codes_df.elevation_ft.astype(object)
airport_codes_df = airport_codes_df.drop_duplicates()
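An alternative is to stop pandas from turning "NA" into NaN when the file is read, so continent keeps its real value. A sketch using read_csv's keep_default_na and na_values options on two illustrative rows:

```python
import io
import pandas as pd

# Two illustrative rows in the layout of airport-codes_csv.csv
csv_text = "ident,continent,iata_code\n00A,NA,\nEGLL,EU,LHR\n"

default = pd.read_csv(io.StringIO(csv_text))
# Only empty fields count as missing; the string "NA" is kept
strict = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[''])

print(default.loc[0, 'continent'])  # nan
print(strict.loc[0, 'continent'])   # NA
```

Genuinely empty iata_code fields still come back as NaN, so they would still need a placeholder before dropna().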

In [59]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,Longitude,Latitude
0,00A,heliport,Total Rf Heliport,11,NorAm,US,US-PA,Bensalem,00A,NorAm,00A,-74.93,40.07
1,00AA,small_airport,Aero B Ranch Airport,3435,NorAm,US,US-KS,Leoti,00AA,NorAm,00AA,-101.47,38.7
2,00AK,small_airport,Lowell Field,450,NorAm,US,US-AK,Anchor Point,00AK,NorAm,00AK,-151.7,59.95
3,00AL,small_airport,Epps Airpark,820,NorAm,US,US-AL,Harvest,00AL,NorAm,00AL,-86.77,34.86
5,00AS,small_airport,Fulton Airport,1100,NorAm,US,US-OK,Alex,00AS,NorAm,00AS,-97.82,34.94


In [60]:
airport_codes_df.isnull().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,Longitude,Latitude
0,False,False,False,False,False,False,False,False,False,False,False,False,False
1,False,False,False,False,False,False,False,False,False,False,False,False,False
2,False,False,False,False,False,False,False,False,False,False,False,False,False
3,False,False,False,False,False,False,False,False,False,False,False,False,False
5,False,False,False,False,False,False,False,False,False,False,False,False,False


In [61]:
airport_codes_df.shape

(24387, 13)

In [62]:
airport_codes_df.Longitude = airport_codes_df.Longitude.astype(object)
airport_codes_df.Latitude = airport_codes_df.Latitude.astype(object)
airport_codes_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 24387 entries, 0 to 54896
Data columns (total 13 columns):
ident           24387 non-null object
type            24387 non-null object
name            24387 non-null object
elevation_ft    24387 non-null object
continent       24387 non-null object
iso_country     24387 non-null object
iso_region      24387 non-null object
municipality    24387 non-null object
gps_code        24387 non-null object
iata_code       24387 non-null object
local_code      24387 non-null object
Longitude       24387 non-null object
Latitude        24387 non-null object
dtypes: object(13)
memory usage: 2.6+ MB


In [63]:
#primary key
airport_codes_df.ident.nunique()

24387

In [64]:
# Exporting cleaned data to .csv file
airport_codes_df.to_csv('airport_data_cleaned.csv', index = False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
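The data is modeled as a star schema (see the diagram below). The cleaned immigration records form the fact table, with cicid as its primary key. The lookup tables derived from the I94 label file (i94cit_res, i94port, i94mode, i94addr, i94visa), together with us_demo, airport_code and global_temp_city, serve as dimension tables. A star schema keeps the fact table narrow and lets analysts answer questions such as arrivals per port, state or visa type with simple joins on the code columns.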

#### 3.2 Mapping Out Data Pipelines
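The pipeline runs in these steps:
* Clean each source with pandas and export it to CSV (Step 2).
* Upload the CSV files to Amazon S3.
* Drop and recreate the Redshift tables with drop_table_queries and create_table_queries from sql_queries.py.
* Load each table from S3 with copy_table_queries.
* Verify the load with row-count and primary-key checks (Step 4.2).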
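boto3 is imported in the first cell but never used, and the notebook does not show how the cleaned CSV files reach S3 before the COPY step. A minimal sketch, assuming the files are uploaded with the S3 client's upload_file(Filename, Bucket, Key) method; the helper, bucket name and key prefix are hypothetical:

```python
import os


def upload_files(s3_client, bucket, paths, prefix=''):
    """Upload local files to s3://<bucket>/<prefix><basename> and return the keys used."""
    keys = []
    for path in paths:
        key = prefix + os.path.basename(path)
        s3_client.upload_file(path, bucket, key)
        keys.append(key)
    return keys


# Usage (bucket and prefix are placeholders):
# import boto3
# s3 = boto3.client('s3')
# upload_files(s3, 'my-capstone-bucket',
#              ['immigration_data_cleaned.csv', 'global_temp_clean.csv',
#               'us_demo_clean.csv', 'airport_data_cleaned.csv'],
#              prefix='capstone/')
```

Passing the client in, rather than creating it inside the function, keeps credentials and region configuration in one place and makes the helper easy to test with a stub.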

#### Fact Table ####

In [65]:
immigration_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,visapost,biryear,gender,airline,visatype
12,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,58.0,1.0,1.0,TIA,1958.0,M,LH,B1
13,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,56.0,1.0,1.0,TIA,1960.0,F,LH,B1
14,29.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20561.0,62.0,2.0,1.0,TIA,1954.0,M,AZ,B2
15,30.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NJ,20578.0,49.0,2.0,1.0,TIA,1967.0,M,OS,B2
16,31.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NY,20611.0,43.0,2.0,1.0,TIA,1973.0,M,OS,B2


#### Dimension Tables ####

In [67]:
i94port_df.head()

Unnamed: 0,code,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [64]:
i94mode_df.head()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [65]:
i94addr_df.head()

Unnamed: 0,code,addr
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [66]:
us_demo.head()

Unnamed: 0,City,State,Median_Age,Male_Population,Female_Population,Total_Population,Number_of_Veterans,Foreign-born,Average_household_size,State_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [67]:
i94visa

Unnamed: 0,code,type
0,1,Business
1,2,Pleasure
2,3,Student


In [68]:
i94cit_res_df.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [69]:
i94cit_res_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 289 entries, 0 to 288
Data columns (total 2 columns):
code       289 non-null int64
country    289 non-null object
dtypes: int64(1), object(1)
memory usage: 4.6+ KB


![Data Model](F&Dcapstone.PNG)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [2]:
# Create the necessary tables (existing ones are dropped first)
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('capstone.cfg')

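    # The [CLUSTER] values must appear in this order in capstone.cfg: host, dbname, user, password, port
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))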
    cur = conn.cursor()

    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()
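sql_queries.py is imported but not shown. A hypothetical sketch of the shape it might have, covering only the i94addr table (column names follow data/i94addr.csv; the column types are assumptions). Redshift treats PRIMARY KEY as informational and does not enforce it, which is why uniqueness still has to be checked separately:

```python
# Hypothetical sketch of sql_queries.py, covering only the i94addr table.
# Table and column names follow the notebook; the column types are assumptions.

i94addr_table_drop = "DROP TABLE IF EXISTS i94addr;"

i94addr_table_create = """
CREATE TABLE IF NOT EXISTS i94addr (
    code VARCHAR PRIMARY KEY,  -- state code, e.g. 'AL'
    addr VARCHAR               -- state name, e.g. 'ALABAMA'
);
"""

# The notebook iterates over these lists in order
drop_table_queries = [i94addr_table_drop]
create_table_queries = [i94addr_table_create]
```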

In [3]:
# Load the Redshift tables from the files in S3
def load_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('capstone.cfg')

    # The [CLUSTER] values must appear in this order in capstone.cfg: host, dbname, user, password, port
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()

    load_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()
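The COPY statements in sql_queries.py are not shown. Every CSV written with to_csv starts with a header row, so a COPY that does not skip it would load that header as an extra data row. A sketch of a COPY statement for such a file, using Redshift's CSV and IGNOREHEADER options; the helper, bucket, role ARN and region are placeholders:

```python
def copy_csv_query(table, s3_path, iam_role, region):
    """Build a Redshift COPY statement for a CSV file whose first line is a header."""
    return (
        f"COPY {table} FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role}' "
        "CSV IGNOREHEADER 1 "  # skip the header row written by DataFrame.to_csv
        f"REGION '{region}';"
    )


# Bucket, key, role ARN and region below are placeholders
query = copy_csv_query(
    'i94addr',
    's3://my-capstone-bucket/i94addr.csv',
    'arn:aws:iam::123456789012:role/myRedshiftRole',
    'us-west-2',
)
print(query)
```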

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

### Perform quality checks here
- To verify completeness in the Amazon Redshift database, the row count of each table should match the row count of the corresponding cleaned DataFrame (and therefore of its CSV file).

#### Airport_codes_df count

In [69]:
airport_codes_df.shape

(24387, 13)

#### Global_temp count

In [70]:
global_temp_df.shape

(8235082, 7)

#### immigration_df count

In [71]:
immigration_df.shape

(1058572, 18)

#### us_demographics count

In [72]:
us_demo.shape

(2875, 12)

#### i94addr, i94cit_res, i94mode, i94port, i94visa count

In [78]:
print('shape of i94addr: ' + str(i94addr_df.shape))
print('shape of i94cit_res: ' + str(i94cit_res_df.shape))
print('shape of i94mode: ' + str(i94mode_df.shape))
print('shape of i94port: ' + str(i94port_df.shape))
print('shape of i94visa: ' + str(i94visa.shape))

shape of i94addr: (55, 2)
shape of i94cit_res: (289, 2)
shape of i94mode: (4, 2)
shape of i94port: (660, 2)
shape of i94visa: (3, 2)


### Checking Redshift DB for corresponding counts

In [4]:
redshift_endpoint = ""
redshift_user = ""
redshift_pass = ""
port = 5439
dbname = ""

In [5]:
from sqlalchemy import create_engine
from sqlalchemy import text
engine_string = "postgresql+psycopg2://%s:%s@%s:%d/%s" \
% (redshift_user, redshift_pass, redshift_endpoint, port, dbname)
engine = create_engine(engine_string)

In [6]:
sql = """SELECT count(*) from airport_code"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,24388


In [7]:
sql = """SELECT count(*) from global_temp_city"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,8235083


In [8]:
sql = """SELECT count(*) from i94_immigration_table"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,1058573


In [9]:
sql = """SELECT count(*) from us_demo"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,2876


In [10]:
sql = """SELECT count(*) from i94port"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,661


In [11]:
sql = """SELECT count(*) from i94addr"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,56


In [12]:
sql = """SELECT count(*) from i94cit_res"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,290


In [13]:
sql = """SELECT count(*) from i94mode"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,5


In [14]:
sql = """SELECT count(*) from i94visa"""
pd.read_sql_query(text(sql), engine)

Unnamed: 0,count
0,4


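Every table is present in Amazon Redshift, but each Redshift count is exactly one row higher than the corresponding DataFrame row count. A consistent off-by-one like this most likely means the CSV header row was loaded as a data row (for example, a COPY command without IGNOREHEADER 1), so the loads should be checked before the counts are treated as a match.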
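The nine count cells above can be collapsed into a single comparison. This sketch hard-codes the DataFrame row counts and the Redshift counts exactly as printed in this notebook, so it reproduces the check without a live connection. The helper name `count_mismatches` is hypothetical, and `fetch_count` stands in for whatever actually runs `SELECT count(*)`.

```python
from typing import Callable, Dict

# Row counts of the source DataFrames, as printed by .shape above
expected_counts = {
    "airport_code": 24387,             # airport_codes_df
    "global_temp_city": 8235082,       # global_temp_df
    "i94_immigration_table": 1058572,  # immigration_df
    "us_demo": 2875,                   # us_demo
    "i94port": 660,                    # i94port_df
    "i94addr": 55,                     # i94addr_df
    "i94cit_res": 289,                 # i94cit_res_df
    "i94mode": 4,                      # i94mode_df
    "i94visa": 3,                      # i94visa
}

# Counts returned by the SELECT count(*) cells above
redshift_counts = {
    "airport_code": 24388,
    "global_temp_city": 8235083,
    "i94_immigration_table": 1058573,
    "us_demo": 2876,
    "i94port": 661,
    "i94addr": 56,
    "i94cit_res": 290,
    "i94mode": 5,
    "i94visa": 4,
}


def count_mismatches(expected: Dict[str, int],
                     fetch_count: Callable[[str], int]) -> Dict[str, int]:
    """Return {table: actual - expected} for every table whose count differs."""
    diffs = {}
    for table, expected_rows in expected.items():
        actual_rows = fetch_count(table)
        if actual_rows != expected_rows:
            diffs[table] = actual_rows - expected_rows
    return diffs


mismatches = count_mismatches(expected_counts, redshift_counts.__getitem__)
print(mismatches)  # every table is off by +1

# Against the live cluster, fetch_count could query Redshift instead, e.g.:
# fetch = lambda t: int(pd.read_sql_query(text(f"SELECT count(*) FROM {t}"), engine).iloc[0, 0])
# count_mismatches(expected_counts, fetch)
```

Returning the signed difference per table, rather than a single pass/fail flag, makes a systematic pattern such as a uniform +1 easy to spot.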

### Data Integrity check

- To ensure data integrity, each table must have a column that uniquely identifies a row. In other words, each table must have a PRIMARY KEY so that every row in the table is unique. Amazon Redshift treats PRIMARY KEY constraints as informational only and does not enforce them, so uniqueness has to be verified explicitly.
- A column qualifies as a primary key when the number of distinct values in it equals the number of rows in the table. The checks below compare the two for each table.
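Because Redshift will accept duplicate keys without complaint, the same check can also be run inside the warehouse after loading. The sketch below builds two plain SQL queries (NULL keys and duplicated keys) that should run on Redshift as well as on SQLite; it is demonstrated on an in-memory SQLite table so it runs without a cluster. The helper name `key_check_queries` and the `description` column are hypothetical; the `i94mode` table name, its `code` column, and the codes 1, 2, 3, 9 come from this project.

```python
import sqlite3


def key_check_queries(table: str, key: str) -> dict:
    """SQL that counts NULL and duplicated values of `key` in `table`.

    Identifiers are interpolated directly, so pass only trusted table/column names.
    """
    return {
        "null_keys": f"SELECT COUNT(*) FROM {table} WHERE {key} IS NULL",
        # Number of distinct key values that appear more than once
        "duplicate_keys": (
            f"SELECT COUNT(*) FROM ("
            f"SELECT {key} FROM {table} GROUP BY {key} HAVING COUNT(*) > 1) AS d"
        ),
    }


# Demonstration: an i94mode-like table where code 3 was loaded twice
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94mode (code INTEGER, description TEXT)")
conn.executemany(
    "INSERT INTO i94mode VALUES (?, ?)",
    [(1, "Air"), (2, "Sea"), (3, "Land"), (9, "Not reported"), (3, "Land")],
)
results = {
    name: conn.execute(sql).fetchone()[0]
    for name, sql in key_check_queries("i94mode", "code").items()
}
print(results)  # {'null_keys': 0, 'duplicate_keys': 1}
conn.close()
```

Against Redshift, each query string could be passed to `pd.read_sql_query(text(sql), engine)` in the same way as the count queries above; a table passes when both numbers are zero.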

#### Fact Table

In [25]:
row_count = immigration_df.cicid.count()
primary_key_count = immigration_df.cicid.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)


#### Dimension tables

In [27]:
row_count = i94addr_df.code.count()
primary_key_count = i94addr_df.code.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)


In [28]:
row_count = i94cit_res_df.code.count()
primary_key_count = i94cit_res_df.code.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)


In [29]:
row_count = i94mode_df.code.count()
primary_key_count = i94mode_df.code.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)


In [30]:
row_count = i94port_df.code.count()
primary_key_count = i94port_df.code.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)


In [31]:
row_count = i94visa.code.count()
primary_key_count = i94visa.code.nunique()

if row_count == primary_key_count:
    print('Pass Data Integrity Check with correct constraints (Primary key)')
else:
    print('Does not pass Data Integrity Check')

Pass Data Integrity Check with correct constraints (Primary key)
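The integrity cells above repeat one pattern, and that pattern has a blind spot: `Series.count()` and `Series.nunique()` both skip missing values by default, so a key column containing NaN can still pass. A minimal sketch of a reusable helper that checks nulls and duplicates separately (the name `check_primary_key` is hypothetical):

```python
import pandas as pd


def check_primary_key(df: pd.DataFrame, column: str) -> bool:
    """True if `column` has no missing values and no duplicates."""
    null_count = int(df[column].isna().sum())
    duplicate_count = int(df[column].duplicated().sum())
    ok = null_count == 0 and duplicate_count == 0
    status = "Pass" if ok else "Fail"
    print(f"{status}: {column} has {len(df)} rows, "
          f"{null_count} nulls, {duplicate_count} duplicates")
    return ok


good = pd.DataFrame({"code": [1, 2, 3, 9]})
bad = pd.DataFrame({"code": [1.0, 2.0, float("nan")]})

# The count()/nunique() comparison used above does not see the missing key
original_style_passes = bool(bad["code"].count() == bad["code"].nunique())
print(original_style_passes)            # True
print(check_primary_key(good, "code"))  # True
print(check_primary_key(bad, "code"))   # False

# Applied to the notebook's tables, this could replace the six cells above:
# for frame, key in [(immigration_df, "cicid"), (i94addr_df, "code"),
#                    (i94cit_res_df, "code"), (i94mode_df, "code"),
#                    (i94port_df, "code"), (i94visa, "code")]:
#     check_primary_key(frame, key)
```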


### Data Quality Checks Complete

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Dimension Tables

Each dimension table is a two-column lookup (a code column and its description) for one of the coded fields in the I94 data:

I94ADDR : US state code of the respondent's address, mapped to the state name

I94CIT_RES : Country code, mapped to the country name; used for both country of citizenship (I94CIT) and country of residence (I94RES)

I94MODE : Mode of arrival: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported

I94VISA : Visa category: 1 = Business, 2 = Pleasure, 3 = Student

I94PORT : Port of entry code, mapped to the city/state or country location of the port
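A dimension is used by joining its code column to the matching fact column. The pandas sketch below decodes `i94mode` for the first three sample rows shown at the top of the notebook and counts arrivals per mode. The lookup's `description` column name is an assumption; only the `code` column name is confirmed by the integrity checks above. In Redshift the same idea would be a LEFT JOIN from the fact table to `i94mode` followed by a GROUP BY.

```python
import pandas as pd

# Lookup shaped like i94mode_df; the column name "description" is an assumption
i94mode_dim = pd.DataFrame({
    "code": [1, 2, 3, 9],
    "description": ["Air", "Sea", "Land", "Not reported"],
})

# cicid / i94mode values from the first sample rows of immigration_df
# (i94mode is a float column in the raw data and can be NaN)
fact = pd.DataFrame({
    "cicid": [6.0, 7.0, 15.0],
    "i94mode": [float("nan"), 1.0, 1.0],
})

# Left join keeps fact rows whose code has no match (here the NaN row)
decoded = fact.merge(i94mode_dim, how="left", left_on="i94mode", right_on="code")
arrivals_by_mode = decoded["description"].fillna("Missing").value_counts()
print(arrivals_by_mode)
```

A left join is used so that rows with a missing mode are still counted instead of silently dropped, which matters here because I94MODE has missing values.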


#### Fact Table

The fact table, fact_immigration (loaded into Redshift as i94_immigration_table), supports queries in line with the intended purpose of the project. Its columns are:

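CICID : Unique record identifier (primary key)

I94YR : 4-digit year

I94MON : Numeric month

I94CIT : Country of citizenship (numeric code; see the I94CIT_RES dimension)

I94RES : Country of residence (numeric code; see the I94CIT_RES dimension)

I94PORT : Port of entry (3-character code; see the I94PORT dimension)

ARRDATE : Arrival date, stored as a SAS date number (days since 1960-01-01)

I94MODE : Mode of arrival (see the I94MODE dimension); some values are missing, and 9 means not reported

I94ADDR : US state of the respondent's address (state code; see the I94ADDR dimension)

DEPDATE : Departure date, stored as a SAS date number (days since 1960-01-01)

I94BIR : Age of respondent in years

I94VISA : Visa category code (see the I94VISA dimension)

VISAPOST : Department of State office where the visa was issued

BIRYEAR : 4-digit birth year

GENDER : Gender (sex)

AIRLINE : Airline used to arrive in the US

VISATYPE : Class of admission legally admitting the non-immigrant to temporarily stay in the US (e.g. B2, F1)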
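ARRDATE and DEPDATE arrive as plain numbers. They are SAS date values, i.e. days since 1960-01-01; for example, 20545 in the sample rows is 2016-04-01, which agrees with I94YR = 2016 and I94MON = 4. A minimal conversion sketch with `pandas.to_datetime` (the helper name is hypothetical):

```python
import pandas as pd


def sas_date_to_datetime(values: pd.Series) -> pd.Series:
    """Convert SAS date numbers (days since 1960-01-01) to datetimes; NaN becomes NaT."""
    return pd.to_datetime(values, unit="D", origin="1960-01-01")


# arrdate/depdate values taken from the sample rows of immigration_df
sample = pd.Series([20573.0, 20551.0, 20545.0, 20691.0, float("nan")])
print(sas_date_to_datetime(sample))
```

Missing departure dates (travellers who have not left yet) come through as NaT rather than raising, so the function can be applied to whole columns, e.g. `immigration_df["arrdate"]`.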



#### Step 5: Complete Project Write Up
* What's the goal? What queries will you want to run? How would Spark or Airflow be incorporated? Why did you choose the model you chose?
    - The project combines the I94 immigration data with US demographic and airport data. The source data files are staged in an AWS S3 bucket and loaded into a snowflake-schema database in Amazon Redshift. The ETL processing is performed with Apache Spark in local mode. The database is intended to help analysts working on immigration, for example by relating arrivals to ports of entry, states, and visa categories. A snowflake schema suits this data because some dimensions link to further tables rather than only to the fact table: the airport information, for instance, is connected to the fact table through the port information.


* Clearly state the rationale for the choice of tools and technologies for the project.
    - Python pandas was used to explore the data because its API is intuitive and easy to use. Amazon Web Services (AWS) provided storage and the warehouse: S3 buckets hold the data files so they can be retrieved easily when needed, and an Amazon Redshift data warehouse stores the modeled tables. Apache Spark was chosen to build the ETL pipeline because it handles large amounts of data and can scale to more data by adding compute resources.
  
  
* Propose how often the data should be updated and why:
    - The pipeline will be scheduled monthly, because the immigration data is the primary data source and has monthly granularity.
    
    
* Post your write-up and final data model
    - This project applies a snowflake schema. The main entry information is held in a fact table, and the respondent information can be extracted from the original I94 dataset. Because the address information is closely tied to the visitors, it is clearer to include it in the respondent table. The address information is also the key field for linking to the US demographics table (dataset).
    
    
* Write a description of how you would approach the problem differently under the following scenarios:

    * The data was increased by 100x:
        - The DAG will have to use partitioning, and the Spark processing might need to move from local mode to a cloud service such as AWS EMR.

    * The data populates a dashboard that must be updated on a daily basis by 7am every day:
        - The DAG schedule will need to be changed to run daily, finishing early enough that the data needed for the dashboard is ready by 7am.

    * The database needed to be accessed by 100+ people:
        - The more people access the database, the more CPU resources are needed to keep queries fast. A distributed database allows replication and partitioning to be tuned so that each user still gets fast query results.
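With a monthly schedule, each run processes one month of I94 data. The sketch below derives the expected input file name for a given year and month, assuming every month follows the naming pattern of the `i94_apr16_sub.sas7bdat` file loaded at the start of the notebook; that pattern for other months, and the helper name `i94_monthly_file`, are assumptions. A scheduled run (for instance an Airflow DAG task) would call it with the year and month being processed.

```python
# Lowercase month abbreviations, hard-coded to avoid locale-dependent names
MONTH_ABBR = ["jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec"]


def i94_monthly_file(year: int, month: int) -> str:
    """Expected I94 file name for a year/month, assuming the i94_apr16_sub pattern."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be between 1 and 12, got {month}")
    # Two-digit year, e.g. 2016 -> "16"
    return f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_monthly_file(2016, 4))  # i94_apr16_sub.sas7bdat
```

Keeping the file-name logic in one function means the schedule only has to supply a date, and the same function can be reused to backfill earlier months.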
