# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project aims to study patterns in the behavior of immigrants entering the United States of America. Which countries are these people coming from? Are they disproportinately male or female? Is there is a preferred point of entry in the US for the immigrants? Last, but not the least is there any seasonality in the data and is immigration driven by the seasons of the country the immigrants are coming from?

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Importing Necessary libraries 

In [3]:
import re
import os
import boto3
import configparser

import numpy as np
import pandas as pd
import psycopg2

### Configuration

In [5]:
config = configparser.ConfigParser()
config = configparser.ConfigParser()
config.read('dl.cfg')
#aws_access_key = config['AWS']['AWS_ACCESS_KEY_ID']
#aws_secret_key = config['AWS']['AWS_SECRET_ACCESS_KEY']

['dl.cfg']

In [6]:
#s3_creds = {'region_name':"us-west-2",
#            'aws_access_key_id': aws_access_key,
#            'aws_secret_access_key': aws_secret_key}
                          
client = boto3.client('s3')#, **s3_creds)
resource = boto3.resource('s3')#, **s3_creds)
bucket = resource.Bucket(config['S3']['BUCKET'])

### Step 1: Scope the Project and Gather Data

#### Scope 
This project deals with engineering a data pipeline to create a Data Warehouse for the immigration data of USA. The immigration data has been enriched with information on various airports across the globe, U.S. city demographic data and world temperature data.

For the purpose of this project, the data is first loaded into S3. From S3 the data is cleaned and transformed in Python following which it is loaded into a staging areas on S3. From S3 data is dumped to a data warehouse on Reshift in the form of a star schema. The last step is checking if the tables were properly loaded into Redshift.


#### Describe and Gather Data 

The following datasets are included in this project.

* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. This data contains information on Immigrants arrival and departure from US, the airline used, the type of visa immigrants had etc.
* World Temperature Data: This data contains global land temperature by city. This data set comes from Kaggle.
* U.S. City Demographic Data: This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from OpenSoft.
* Airport Code Table: This is a simple table of airport codes and corresponding cities. This data comes from https://datahub.io/core/airport-codes#data.

### Creating the lookup tables from .SAS files (One-time exercise)

In [6]:
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [7]:
def code_mapper(file, idx):
    """
    Extracts required information from the .SAS file read in python
    """
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

In [8]:
## Create lookup tables

i94cit_res = pd.DataFrame.from_dict(data=code_mapper(f_content, "i94cntyl"), orient='index').reset_index().drop_duplicates()
i94port = pd.DataFrame.from_dict(data=code_mapper(f_content, "i94prtl"), orient='index').reset_index().drop_duplicates()
i94mode = pd.DataFrame.from_dict(data=code_mapper(f_content, "i94model"), orient='index').reset_index().drop_duplicates()
i94addr = pd.DataFrame.from_dict(data=code_mapper(f_content, "i94addrl"), orient='index').reset_index().drop_duplicates()
i94visa = pd.DataFrame.from_dict(data={'1':'Business',
'2': 'Pleasure',
'3' : 'Student'}, orient='index').reset_index().drop_duplicates()


In [9]:
## Setting column names for Lookup tables
i94cit_res.columns = ['id', 'country']
i94port.columns = ['id', 'port']
i94mode.columns = ['id', 'mode']
i94addr.columns = ['id', 'state']
i94visa.columns = ['id', 'visa']

In [10]:
## Get size of lookup files
x = {}
x['country_data'] = i94cit_res.shape
x['port_data'] = i94port.shape
x['mode_data'] = i94mode.shape
x['state_data'] = i94addr.shape
x['visa_data'] = i94visa.shape

In [11]:
for key, value in x.items():
    print(f"The size of {key} is {value[0]} rows and {value[1]} columns")

The size of country_data is 289 rows and 2 columns
The size of port_data is 660 rows and 2 columns
The size of mode_data is 4 rows and 2 columns
The size of state_data is 55 rows and 2 columns
The size of visa_data is 3 rows and 2 columns


In [12]:
# Cleaning data and writing to .csv files
i94cit_res.to_csv('lookup/country.csv', header=True)
i94port.to_csv('lookup/port.csv', header=True)
i94mode.to_csv('lookup/mode.csv', header=True)
i94addr.to_csv('lookup/state.csv', header=True)
i94visa.to_csv('lookup/visa.csv', header=True)

### Gather all data in S3 bucket

In [603]:
def uploadDirectory(path, bucketname, prefix):
    """
    This function uploads a folder to S3 bucket
    """
    for root,dirs,files in os.walk(path):
        for file in files:
            client.upload_file(os.path.join(root,file), bucketname, prefix + '/' + file)

In [30]:
## Upload lookup tables
uploadDirectory('lookup', bucket.name, 'udacity-capstone/rawdata/lookup')

In [604]:
## Upload immigration data
uploadDirectory('sas_data', bucket.name, 'udacity-capstone/rawdata/immigration')

In [32]:
## Upload Demographics data
response = client.upload_file('airport-codes_csv.csv', bucket.name , 'udacity-capstone/rawdata/airport/airport-codes.csv')

In [33]:
## Upload Airpot data
response = client.upload_file('us-cities-demographics.csv', bucket.name, 'udacity-capstone/rawdata/demogs/demogs.csv')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [34]:
def count_nulls(df):
    """
    Counts number of columns having NA values and return the list of those columns as a dataframe if there is any

    Parameters
    ----------
    df: pd.DataFrame
        Data

    Returns
    ----------
    pd.DataFrame
        Number of NA/Null values in each column unless there is not any.
    """
    # check missing values, including infinites, for numeric columns
    na_df = pd.DataFrame(df.select_dtypes(include=[np.number]).replace([np.inf, -np.inf], np.nan).isnull().sum(),
                         columns=["Null_Count"])  # Detect NaN in numeric arrays, None/NaN in object arrays
    na_df = na_df.append(pd.DataFrame(df.select_dtypes(include=['object']).isin(["", None, "null", "Null", np.nan,
                                                                               "None", "none", "nan", "NaN",pd.NaT,"NaT",
                                                                               np.inf]).sum(), columns=["Null_Count"]))

    na_df = na_df.append(pd.DataFrame(df.select_dtypes(include=['datetime64']).isin(["", None,
                                                                                     np.nan,pd.NaT,"NaT"]).sum(),
                                      columns=["Null_Count"]))

    na_df = na_df[na_df.Null_Count != 0].sort_values("Null_Count", ascending=False)

    if len(na_df) != 0:
        print("Number of columns with NA/Null values: ", len(na_df))
        return na_df
    else:
        print("There is no any column with NA/Null values!")
        return None

### Explore Immigration data

In [613]:
## Get list of immigration files
files = []
keyword = config['S3']['rawdata'] + '/' + config['RAWDATA']['immigration_raw'] + '/'
keyword = r'%s' % keyword
for my_bucket in bucket.objects.all():
    string = re.compile(keyword) 
    check = string.search(my_bucket.key) 
    if check is not None:
        string = re.compile(r'.crc')
        remove = string.search(my_bucket.key) ## .crc files were created in S3 bucket. Thus removing those 
        if remove is None:
            files.append(my_bucket.key)

In [614]:
df = list()

for i in files:
    tmp = pd.read_parquet('s3://' + bucket.name + '/' + i)
    df.append(tmp)

In [615]:
df = pd.concat(df)

In [616]:
df.shape

(3096313, 28)

In [490]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [491]:
df.columns.tolist()

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'arrdate',
 'i94mode',
 'i94addr',
 'depdate',
 'i94bir',
 'i94visa',
 'count',
 'dtadfile',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'biryear',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype']

In [492]:
count_nulls(df)/df.shape[0]

Number of columns with NA/Null values:  17


Unnamed: 0,Null_Count
entdepu,0.999873
occup,0.997376
insnum,0.963276
visapost,0.607577
gender,0.133794
i94addr,0.0492818
depdate,0.0460086
entdepd,0.0447077
matflag,0.0447077
airline,0.0270086


Columns 'entdepu' 'occup' 'insnum' are >95% nulls and thus will be dropped

Further, since the purpose of this project is to study immigration patterns, the following columns can be dropped: dtaddto, count, entdepa, entdepd, matflag, admnum

In [493]:
drop_null_columns = ['entdepu', 'occup', 'insnum']
drop_useless_columns = ['dtaddto' ,'count', 'entdepa', 'entdepd', 'matflag', 'admnum', 'dtadfile']

In [494]:
df = df.drop(drop_null_columns, axis = 1)

In [495]:
df = df.drop(drop_useless_columns, axis = 1)

In [496]:
## Check duplicates in data
df.drop_duplicates().shape

(3096313, 18)

No Duplicates found in the Immigration data

#### Handling data formats

In [497]:
df.dtypes

cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port      object
arrdate     float64
i94mode     float64
i94addr      object
depdate     float64
i94bir      float64
i94visa     float64
visapost     object
biryear     float64
gender       object
airline      object
fltno        object
visatype     object
dtype: object

cicid , i94cit, i94res, i94mode, i94bir, i94visa, i94yr, i94mon and biryear columns can be converted to integer fields.

In [498]:
## Date and alike fields
df[['arrdate', 'depdate']].head()

Unnamed: 0,arrdate,depdate
0,20573.0,
1,20551.0,
2,20545.0,20691.0
3,20545.0,20567.0
4,20545.0,20567.0


arrdate, depdate are SAS dates formats. These will be converted to date formats

The data type conversion will be done after handling null values in each columns

#### Handling missing values

In [499]:
x = count_nulls(df)

Number of columns with NA/Null values:  9


In [500]:
x

Unnamed: 0,Null_Count
visapost,1881250
gender,414269
i94addr,152592
depdate,142457
airline,83627
fltno,19549
i94bir,802
biryear,802
i94mode,239


In [501]:
df[x.index].head()

Unnamed: 0,visapost,gender,i94addr,depdate,airline,fltno,i94bir,biryear,i94mode
0,,,,,,,37.0,1979.0,
1,SEO,M,AL,,,296.0,25.0,1991.0,1.0
2,,M,MI,20691.0,OS,93.0,55.0,1961.0,1.0
3,,,MA,20567.0,AA,199.0,28.0,1988.0,1.0
4,,,MA,20567.0,AA,199.0,4.0,2012.0,1.0


Replacing NULLS in i94bir, biryear and i94mode with -10 as default value.

None present in other columns seems fine. No changes will be made here

In [505]:
remove_na_cols = ['i94bir', 'biryear', 'i94mode']
to_int_cols = ["cicid" , "i94cit", "i94res", "i94mode", "i94bir", "i94visa", "i94yr", "i94mon", "biryear"]

In [503]:
df.loc[:,remove_na_cols] = df.loc[:,remove_na_cols].fillna(-10)

In [506]:
df.loc[:,to_int_cols] = df.loc[:,to_int_cols].astype(int)

In [507]:
# Convert SAS dates
df.loc[:,'arrdate'] = pd.to_timedelta(df.loc[:,'arrdate'], unit='D') + pd.Timestamp('1960-1-1')
df.loc[:,'depdate'] = pd.to_timedelta(df.loc[:,'depdate'], unit='D') + pd.Timestamp('1960-1-1')

In [543]:
df.loc[:,'arrdate'] = df.loc[:,'arrdate'].dt.date
df.loc[:,'depdate'] = df.loc[:,'depdate'].dt.date

In [177]:
df.dtypes

cicid                int64
i94yr                int64
i94mon               int64
i94cit               int64
i94res               int64
i94port             object
arrdate     datetime64[ns]
i94mode              int64
i94addr             object
depdate     datetime64[ns]
i94bir               int64
i94visa              int64
dtadfile             int64
visapost            object
biryear              int64
gender              object
airline             object
fltno               object
visatype            object
dtype: object

In [178]:
count_nulls(df)

Number of columns with NA/Null values:  6


Unnamed: 0,Null_Count
visapost,1881249
gender,414268
i94addr,152591
depdate,142456
airline,83626
fltno,19548


Data cleaning and transformations for immigration data completed. Immigration table will be our Fact table.

Unique dates will be extracted from arrdate & depdate to form time dimension table

#### Create time dimension table

In [565]:
time = df['arrdate'].unique()

In [566]:
time = np.append(time, df['depdate'].unique())

In [567]:
time = pd.DataFrame(time)

In [568]:
time.columns = ['date']

In [569]:
time['date'] = pd.to_datetime(time['date'], errors = 'coerce')

In [574]:
time = time.dropna()

In [575]:
time['day'] = time['date'].dt.day.astype(int)
time['month'] = time['date'].dt.month.astype(int)
time['year'] = time['date'].dt.year.astype(int)
time['weekofyear'] = time['date'].dt.week.astype(int)
time['dayofweek'] = time['date'].dt.dayofweek.astype(int)

In [576]:
time.head()

Unnamed: 0,date,day,month,year,weekofyear,dayofweek
0,2016-04-29,29,4,2016,17,4
1,2016-04-07,7,4,2016,14,3
2,2016-04-01,1,4,2016,13,4
3,2016-04-02,2,4,2016,13,5
4,2016-04-03,3,4,2016,13,6


### Explore Airport codes

In [180]:
df = pd.read_csv('s3://' + bucket.name + '/udacity-capstone/rawdata/airport/airport-codes.csv')

In [185]:
df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [186]:
df.shape

(55075, 12)

In [187]:
count_nulls(df)

Number of columns with NA/Null values:  7


Unnamed: 0,Null_Count
iata_code,45886
continent,27719
local_code,26389
gps_code,14045
elevation_ft,7006
municipality,5676
iso_country,247


In [184]:
df.drop_duplicates().shape

(55075, 12)

No duplicates found in Airport codes

In [206]:
# Split latitude and longtitude into separate columns
coordinates = df['coordinates'].str.split(",", expand = True)
df['latitude'] = coordinates[0]
df['longtitude'] = coordinates[1]

In [217]:
## Loading Airport lookup table from SAS description
port = pd.read_csv("s3://" + bucket.name + '/udacity-capstone/rawdata/lookup/port.csv')

In [215]:
port.shape

(660, 3)

In [216]:
port.head()

Unnamed: 0.1,Unnamed: 0,id,port
0,0,ALC,"ALCAN, AK"
1,1,ANC,"ANCHORAGE, AK"
2,2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,3,DAC,"DALTONS CACHE, AK"
4,4,PIZ,"DEW STATION PT LAY DEW, AK"


Port file has 660 rows vs ~55k in Airport codes. Mapping the port names in lookup table vs Airport codes table

In [219]:
coordinates = port['port'].str.split(",", expand = True)
port['port'] = coordinates[0]
port['state'] = coordinates[1]

In [223]:
df[['name']].isin(port['port']).sum()

name    0
dtype: int64

No match between the two tables. Thus, dropping Airport codes table and making lookup table as a Dimension Table

In [228]:
port = port.drop(['Unnamed: 0'], axis = 1)

### Explore US Demographics data

In [230]:
bucket.name

'kashish-ups-uw'

In [333]:
df = pd.read_csv('s3://' + bucket.name + '/udacity-capstone/rawdata/demogs/demogs.csv', sep = ";")

In [318]:
df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [233]:
df.shape

(2891, 12)

In [234]:
count_nulls(df)

Number of columns with NA/Null values:  5


Unnamed: 0,Null_Count
Average Household Size,16
Number of Veterans,13
Foreign-born,13
Male Population,3
Female Population,3


Low missing value count in Demographics data

In [235]:
df.drop_duplicates().shape

(2891, 12)

No row duplicates found but 2891 seems too high for 50 odd US states. The data seems to be at City level. Need to check if city is duplicated.

In [238]:
df.groupby('City').agg({'Race': 'count'})

Unnamed: 0_level_0,Race
City,Unnamed: 1_level_1
Abilene,5
Akron,5
Alafaya,4
Alameda,5
Albany,10
...,...
Yonkers,5
Yorba Linda,5
Youngs,5
Yuba City,5


In [291]:
df[df['City'] == 'Albany']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
1165,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,American Indian and Alaska Native,445
1260,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,White,17160
1470,Albany,New York,32.8,47627.0,50825.0,98452,3643.0,11948.0,2.08,NY,American Indian and Alaska Native,1611
1616,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,Asian,650
1809,Albany,New York,32.8,47627.0,50825.0,98452,3643.0,11948.0,2.08,NY,Hispanic or Latino,9368
2000,Albany,New York,32.8,47627.0,50825.0,98452,3643.0,11948.0,2.08,NY,White,58368
2050,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,Black or African-American,53440
2278,Albany,New York,32.8,47627.0,50825.0,98452,3643.0,11948.0,2.08,NY,Black or African-American,31303
2472,Albany,New York,32.8,47627.0,50825.0,98452,3643.0,11948.0,2.08,NY,Asian,8090
2552,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,Hispanic or Latino,1783


In [320]:
df[df['State'] == 'Arkansas']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
38,North Little Rock,Arkansas,33.6,31671.0,34835.0,66506,4130.0,2787.0,2.62,AR,Hispanic or Latino,4860
51,Jonesboro,Arkansas,32.6,35666.0,38240.0,73906,3682.0,3222.0,2.44,AR,Black or African-American,14599
82,Springdale,Arkansas,31.8,36840.0,43614.0,80454,3397.0,19969.0,3.04,AR,American Indian and Alaska Native,547
141,Fort Smith,Arkansas,34.9,43346.0,44849.0,88195,3408.0,13177.0,2.44,AR,White,66004
302,Fort Smith,Arkansas,34.9,43346.0,44849.0,88195,3408.0,13177.0,2.44,AR,Black or African-American,9851
318,Fayetteville,Arkansas,27.1,41959.0,40873.0,82832,4744.0,6313.0,2.28,AR,White,68830
367,North Little Rock,Arkansas,33.6,31671.0,34835.0,66506,4130.0,2787.0,2.62,AR,White,34118
455,North Little Rock,Arkansas,33.6,31671.0,34835.0,66506,4130.0,2787.0,2.62,AR,Black or African-American,30766
468,Little Rock,Arkansas,36.6,96997.0,100989.0,197986,12343.0,16640.0,2.36,AR,White,102312
651,Jonesboro,Arkansas,32.6,35666.0,38240.0,73906,3682.0,3222.0,2.44,AR,White,56626


In [342]:
df[df['State'] == 'New York']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
48,Yonkers,New York,38.0,96580.0,104538.0,201118,4801.0,61247.0,2.8,NY,Hispanic or Latino,73608
106,Buffalo,New York,33.1,124537.0,133529.0,258066,11231.0,24630.0,2.27,NY,Asian,14518
217,New Rochelle,New York,40.6,38871.0,40967.0,79838,2780.0,26960.0,2.85,NY,White,44435
222,Syracuse,New York,30.3,69462.0,74690.0,144152,5845.0,17733.0,2.39,NY,Asian,9386
225,Buffalo,New York,33.1,124537.0,133529.0,258066,11231.0,24630.0,2.27,NY,Hispanic or Latino,29656
392,Syracuse,New York,30.3,69462.0,74690.0,144152,5845.0,17733.0,2.39,NY,White,88679
401,Cheektowaga,New York,40.7,37476.0,38599.0,76075,5408.0,3532.0,2.3,NY,White,63541
409,Brentwood,New York,34.2,31395.0,32397.0,63792,1492.0,27058.0,4.98,NY,American Indian and Alaska Native,4242
494,Yonkers,New York,38.0,96580.0,104538.0,201118,4801.0,61247.0,2.8,NY,Black or African-American,38731
537,Mount Vernon,New York,38.5,31876.0,36745.0,68621,2064.0,23777.0,2.85,NY,American Indian and Alaska Native,356


For duplicate city rows all columns are same except for the last column 'Count' which is the # of people of that race

Exploring data at Race and State level

In [239]:
df.groupby('Race').agg({'Race': 'count'})

Unnamed: 0_level_0,Race
Race,Unnamed: 1_level_1
American Indian and Alaska Native,539
Asian,583
Black or African-American,584
Hispanic or Latino,596
White,589


All races have nearly equal entries in the data

In [249]:
df.groupby('State').agg({"Race": "nunique"}).reset_index().groupby('Race').agg({'State': 'count'})

Unnamed: 0_level_0,State
Race,Unnamed: 1_level_1
3,1
5,48


Since our Fact table is at State level. Demographics table will be summarized at group level and merged with State lookup table

In [394]:
df.columns = df.columns.str.lower().str.replace(' ', '_')

In [407]:
tmp = pd.pivot_table(df, values = 'count', columns = ['race'] , index = 'state_code')

In [408]:
tmp.columns = tmp.columns.str.lower().str.replace(' ', '_')

In [422]:
tmp.columns = ['american_indian_and_alaska_native', 'asian',
       'black_or_african-american', 'hispanic_or_latino', 'white']

In [425]:
tmp = tmp.reset_index()

In [426]:
type(tmp)
#tmp = tmp.drop('race', axis = 1)

pandas.core.frame.DataFrame

In [430]:
## Summarizing Demographics table at State level
df = df.groupby(['state_code', 'state', 'city']).agg({'median_age': np.mean,
                                                 'male_population': np.mean,
                                                 'female_population': np.mean,
                                                 'total_population': np.mean,
                                                 'number_of_veterans': np.mean,
                                                 'foreign-born': np.mean,
                                                 'average_household_size': np.mean                                                }).reset_index(). \
                                                groupby(['state_code', 'state']). \
                                                agg({'median_age': np.mean,
                                                 'male_population': np.sum,
                                                 'female_population': np.sum,
                                                 'total_population': np.sum,
                                                 'number_of_veterans': np.sum,
                                                 'foreign-born': np.sum,
                                                 'average_household_size': np.mean}).reset_index()

In [438]:
tmp.index = tmp['state_code']

In [439]:
df.index = df['state_code']

In [440]:
tmp.head()

Unnamed: 0_level_0,state_code,american_indian_and_alaska_native,asian,black_or_african-american,hispanic_or_latino,white
state_code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
AK,AK,36339.0,36825.0,23107.0,27261.0,212696.0
AL,AL,1347.333333,4109.857143,74438.285714,5616.142857,71274.285714
AR,AR,1563.5,4412.4,24934.666667,12968.833333,64122.166667
AZ,AZ,8106.75,14323.9375,18513.875,94259.8125,224475.6875
CA,CA,3087.584615,33409.779412,15051.536765,71944.992701,108796.562044


In [441]:
df.head()

Unnamed: 0_level_0,state_code,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign-born,average_household_size
state_code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
AK,AK,Alaska,32.2,152945.0,145750.0,298695,27492.0,33258.0,2.77
AL,AL,Alabama,36.228571,497248.0,552381.0,1049629,71543.0,52154.0,2.434286
AR,AR,Arkansas,32.766667,286479.0,303400.0,589879,31704.0,62108.0,2.53
AZ,AZ,Arizona,35.0375,2227455.0,2272087.0,4499542,264505.0,682313.0,2.774375
CA,CA,California,36.182482,12278281.0,12544179.0,24822460,928270.0,7448257.0,3.100949


In [443]:
df = df.drop('state_code', axis = 1)
tmp = tmp.drop('state_code', axis = 1)

In [444]:
df = df.join(tmp)

In [447]:
df = df.reset_index()

In [453]:
df = df.fillna(0)

Merge state data with loopup table

In [250]:
state = pd.read_csv('s3://' + bucket.name + '/udacity-capstone/rawdata/lookup/state.csv')

In [251]:
state.shape

(55, 3)

In [252]:
state.head()

Unnamed: 0.1,Unnamed: 0,id,state
0,0,AL,ALABAMA
1,1,AK,ALASKA
2,2,AZ,ARIZONA
3,3,AR,ARKANSAS
4,4,CA,CALIFORNIA


In [448]:
state = state.drop('Unnamed: 0', axis = 1)

In [449]:
df = df.merge(state, left_on = 'state_code', right_on = 'id', how = "right")

In [454]:
df = df.fillna(-10)

#### Explore Land Temperature data

This data was not present in the workspace folder given for the project. Hence, it is being skipped for the purpose of this data model.

#### Explore Country/Mode/Visa lookup tables

##### Country data

In [455]:
country = pd.read_csv("s3://" + bucket.name + '/udacity-capstone/rawdata/lookup/country.csv')

In [456]:
country.shape

(289, 3)

In [457]:
country.head()

Unnamed: 0.1,Unnamed: 0,id,country
0,0,582,Mexico
1,1,236,AFGHANISTAN
2,2,101,ALBANIA
3,3,316,ALGERIA
4,4,102,ANDORRA


In [459]:
count_nulls(country)

There is no any column with NA/Null values!


In [460]:
country.drop_duplicates().shape

(289, 3)

In [462]:
country = country.drop('Unnamed: 0', axis = 1)

In [463]:
country.head()

Unnamed: 0,id,country
0,582,Mexico
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


##### Mode data

In [464]:
mode = pd.read_csv("s3://" + bucket.name + '/udacity-capstone/rawdata/lookup/mode.csv')

In [465]:
mode.shape

(4, 3)

In [466]:
mode.head()

Unnamed: 0.1,Unnamed: 0,id,mode
0,0,1,Air
1,1,2,Sea
2,2,3,Land
3,3,9,Not reported


In [467]:
mode = mode.drop('Unnamed: 0', axis = 1)

##### Visa data

In [468]:
visa = pd.read_csv("s3://" + bucket.name + '/udacity-capstone/rawdata/lookup/visa.csv')

In [469]:
visa.shape

(3, 3)

In [470]:
visa.head()

Unnamed: 0.1,Unnamed: 0,id,visa
0,0,1,Business
1,1,2,Pleasure
2,2,3,Student


In [471]:
visa = visa.drop('Unnamed: 0', axis = 1)

The above 3 tables will form a part of our Dimensions table in the data model

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The data model chosen for this project is the Star schema. The following tables will be part of the final data model:

* Fact Table: We want to study immigration data to the U.S. Thus, the immigration data provided was chosen as the Fact table. Further, each row in the data corresponds to one immigration instance. This table will join to the below dimension tables.

* Dimension Tables:
    * Port data: This table has the details on the port of entry . Joins with Fact table on i94port
    * U.S. Demographics data: This table has the demographic details of each state in the U.S. Joins with fact table on i94addr.
    * Country data: This table has country level data. Joins with Fact table on i94cit and i94res
    * Mode data: Mode of transportaton used for arriving into U.S.
    * Visa data: Visa type used for entry into U.S.
    * Time data: Information on day/week/month etc. for all unique dates in Immigration data

The above star schema will be hosted in a data warehouse on Redshift. 

#### 3.2 Mapping Out Data Pipelines

The following steps need toe be followed to complete the data pipeline:

* Load Immigration data from S3
* Clean and Transform the data, thereby, creating our Fact table
* Select unique dates from arrival date and departure dates columns
* Extract day/month/year etc. from above dates to creare our Time dimension table
* Dump above two tables to S3 for staging
* Load and clean the remaining data i.e. country, mode, visa, port and U.S. demographics data
* Dump above data to S3 staging
* Load the data into the necessary tables on Redshift
* Run data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [1]:
import etl

In [2]:
etl.main()

Loading Immigration data
Immigration data loaded.
Cleaning of Immigration data completed. Writing to S3
Data quality check: Immigration data has 3096313 rows and 18 columns
Immigration data written to S3
Creating time dimension table
Data quality check: Time data has 235 rows and 6 columns
Time dimension table written to S3
Loading Demogs data and state lookup table
Data loaded
Cleaning Demographics data
Data Cleaning completed
Data quality check: Demogs data has 55 rows and 14 columns
Demogs dimension table written to S3
Reading data
Data loaded
Data cleaned
Data quality check: Country data has 289 rows and 2 columns
Data quality check: Visa data has 3 rows and 2 columns
Data quality check: Mode data has 4 rows and 2 columns
Writing data to S3
Data written to S3
Loading immigration table to Redshift
Table immigration loaded to Redshift
Data Quality check: Loading ml_analytics.fact_immigration table to Redshift took 0 minutes
Loading time table to Redshift
Table time loaded to Redshift

#### 4.2 Data Quality Checks

The data quality checks that are incoporated into this pipeline are:
* Checking data being stored to staging doesn't have 0 rows
* Checking time taken to load the files from S3 to Redshift
* Checking if the tables in Redshift are not empty
 
Run Quality Checks

The first two data quality checks have been incorporated as part of the pipeline. Running the last one below

In [36]:
## Connect to DB
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['REDSHIFT'].values()))
cur = conn.cursor()

In [37]:
tables = ['fact_immigration', 'dim_time', 'dim_demogs', 'dim_country', 'dim_mode', 'dim_visa']

In [38]:
schema = "ml_analytics"
command = f"""select count(1) from {{}}.{{}}"""
for i in tables:
    cur.execute(command.format(schema, i))
    conn.commit()
    x = cur.fetchall()
    print(f"Table {i} contains {x[0][0]} rows")

Table fact_immigration contains 12385252 rows
Table dim_time contains 940 rows
Table dim_demogs contains 110 rows
Table dim_country contains 289 rows
Table dim_mode contains 16 rows
Table dim_visa contains 3 rows


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Note: Nulls in integer columns are coded as -10 and those in character variables as None.

#### Immigration Fact Table

This data comes from the immigration data of the U.S. government. Following are the fields in this table:

* cicid : Unique Identifier (Primary Key)
* i94yr : Year of incident
* i94mon : Month of incident
* i94cit : 3 digit code of the country the person was born in. Foreign Key to the country dimension table
* i94res :  3 digit code of the country the person resides in. Foreign Key to the country dimension table
* i94port : Port of Entry into U.S. Foreign Key to port table
* arrdate : Arrival date of the person into U.S. Foreign Key to the time dimension table
* i94mode : Mode of arrival into U.S. Foreign Key to the mode dimension table
* i94addr : State of arrival into U.S. Foreign Key to the state level
* depdate : Date of departure from U.S. Foreign Key to the time dimension table
* i94bir : Age of Person in Years
* i94visa : Type of visa issued. Foreign key to the visa dimension table
* visapost : Department of State where where Visa was issued           
* biryear : Year of birth of the person
* gender : Gender of the person
* airline : Airline the person used
* fltno : Flight No. the person used
* visatype : Class of admission legally admitting the non-immigrant to temporarily stay in U.S.

#### Demogs dimension Table:

This data comes from the U.S. Demographics data and SAS description file provided for the project. Following are the fields in this table:

* id : State Code. Foreign Key to Fact table. Also the primary key
* state : Name of the State
* median_age : Average of the median age at city level
* male_population : Total male population
* female_population : Total female population
* total_population : Total population
* number_of_veterans : # of veterans residing in the state
* foreign_born : # of foreign-born person residing in the state
* average_household_size : Average of the average household size at city level
* american_indian_and_alaska_native : # of people residing who are of race American India/Native Alaskan
* asian: # of people residing who are of race American India/Native Alaskan
* black_or_african : # of people residing who are of race Black/African-American
* hispanic_or_latino : # of people residing who are of race Hispanic/Latino
* white : # of people residing who are of race White

#### Port dimension Table:

This data comes from the SAS description file provided for the project. Following are the fields in this table:

* id : Foreign Key to the Fact table. Also the Primary Key
* port : Port of Entry into U.S.
* state : State of Entry into U.S.

#### Country dimension Table:

This data comes from the SAS description file provided for the project. Following are the fields in this table:

* id : Foreign Key to the Fact table. Also the Primary Key
* country : Country Name

#### Mode dimension Table:

This data comes from the SAS description file provided for the project. Following are the fields in this table:

* id : Foreign Key to the Fact table. Also the Primary Key
* mode : Mode of Transport used

#### Visa dimension Table:

This data comes from the SAS description file provided for the project. Following are the fields in this table:

* id : Foreign Key to the Fact table
* visa : Type of visa issued

#### Time dimension Table:

This data comes from the unique dates coming from the immigration data. Following are the fields in this table:

* date : Date in the format YYYY-MM-DD. Also the Primary Key.
* day : Day part
* month : Month part
* year : Year part
* weekofyear : Week of the year
* dayofweek : Day of the week


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

We will be using AWS for creating and managing our data-warehouse. The prime objective for this is to host everything on the cloud since AWS provides economic options for processing and storing of data. The following tools will be used for creating the pipeline:
* S3: For storing raw and staging data. With S3 we can be worry-free that our data is secure and properly backed-up. Further, AWS provides great tools to interface with data stored in S3.
* Redshift: For storing our data-warehouse. Redshift has a simple SQL like structure so will be easy for analytics team to work on this database. Further, being an MPP database we can secured that our database can handle possible increases in data size in future. 
* EC2: for running the python code which will clean and transform the data for staging. It is assumed that AWS credentials are configured on the EC2 server. If not, credentials need to be configured in either os environment variables through a config file or stored in ~/.aws/credentials/. This, provides an easy to use interface where in the credentials can be configured in EC2 without exposing the credentials to the user.

* Propose how often the data should be updated and why.

The update of Immigration data depends on the plan purchasesd (monthly, quarterly, annual etc.). The immigration fact table and correspondingly the time dimension table will be updated as frequently as we get newer data from U.S. immigration bureau. Along with this, we will need to update port, mode and visa dimension tables at the same frequency.

Demographics data update depends on how often demographics data is updated by the U.S. Government (annually, bi-annually etc.)

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.

If the data was increased to 100x, we will need to change the data pipeline to consume only newer data. This can be achieved easily in Airlow which will also help us manage and monitor the pipeline at scale.

Further, we would need to either add more EC2 clusters or use Spark for processing of data (depending on size of the refreshed data).

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

In this case, we can configure our pipelines to update every night. Airflow would be implemented here to schedule and monitor the health of the pipeline. Retries can be added in the airflow module to re-run the pipeline in case of failure.

 * The database needed to be accessed by 100+ people.

Redshift is easily-scalable and thus, we can add more nodes to the Redshift cluster to handle all the extra load. If user base grows even further, it might make sense to move to a Data lake structure to avoid the high-prices of Redhisft cluster to handly say 1000+ users.