# Immigration data pipeline
### Data Engineering Capstone Project

#### Project Summary
This project builds an ETL pipeline for data scientists and analysts. To do so, we explored, cleaned, and checked the data at a basic level. We have included 6 different data sources here. We have data on airports, cities, temperature, and immigration, as well as a port of entry dataset that I uploaded.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
import psycopg2

import os

from sqlalchemy import create_engine

from pyspark.sql import SparkSession

from pyspark.sql.functions import udf, col
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
from functions import cleaning_task, quality_check

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

1. I wanted to model the data with a star schema. With the given data alone this wasn't possible, but after finding a dataset called 'port of entry' and applying a little transformation, I was able to create one.
2. I wanted to check whether any data is missing in any of the datasets.
3. I wanted to transform the larger data into parquet format, so that it is easier for data scientists to use for their ad hoc needs.
4. I wanted to run basic data quality checks on each dataset by inspecting its memory usage and column information.

#### Describe and Gather Data 
Describe the datasets you're using. Where did they come from? What type of information is included?

The temperature data contains average temperatures for most major cities in the world; the sample rows below are monthly readings.
The US cities demographics data contains demographic information on most major cities in the US.
The airports data contains detailed information on individual airports.
The immigration data is a comprehensive time series of arrival records at ports of entry for a given time frame.

I also gathered port of entry data, which lists the codes for the ports of entry in the United States.

In [2]:
# Read in the data here
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp = pd.read_csv(fname)
temp.head(n=2)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E


In [4]:
cities = pd.read_csv('us-cities-demographics.csv')
cities.head(n=2)

Unnamed: 0,City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count
0,Silver Spring;Maryland;33.8;40601;41862;82463;...
1,Quincy;Massachusetts;41.0;44129;49500;93629;41...


In [5]:
air = pd.read_csv('airport-codes_csv.csv')
air.head(n=2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [7]:

spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()
immig_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
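# One-time conversion of the SAS data to parquet (commented out after the first run)
#immig_spark.write.parquet("sas_data_back")
# Read the parquet copy back in
immig_spark = spark.read.parquet("sas_data_back")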
immig_spark.show(2)
print ('done reading')


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

### Step 2: Explore and Assess the Data

#### Cleaning Steps
Preliminary exploration showed that only a small percentage of each dataset is missing, so I decided to drop duplicates first and then any rows with missing data.

The port data had leading and trailing whitespace, so I trimmed it.
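
The `cleaning_task` helper is imported from `functions` and its source is not shown in this notebook. As a rough sketch only, assuming it drops duplicates, drops rows with missing values, and prints the remaining fraction of missing values per column (which would match the all-zero output below), it might look like this. The name `cleaning_task_sketch` and the toy frame are hypothetical.

```python
import pandas as pd


def cleaning_task_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for cleaning_task: drop duplicates, then rows with missing values."""
    cleaned = df.drop_duplicates().dropna()
    # Fraction of missing values per column after cleaning (0.0 everywhere by construction)
    print(cleaned.isnull().mean())
    return cleaned


# Toy frame modelled on the temperature sample shown earlier; the third row is a deliberate duplicate
toy = pd.DataFrame(
    {
        "dt": ["1743-11-01", "1743-12-01", "1743-11-01"],
        "AverageTemperature": [6.068, None, 6.068],
        "City": ["Århus", "Århus", "Århus"],
    }
)
cleaned = cleaning_task_sketch(toy)
```

Both steps keep the original index labels, so gaps in the index after cleaning are expected.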

In [8]:
# Performing cleaning tasks here

temp = cleaning_task(temp)

dt                               0.0
AverageTemperature               0.0
AverageTemperatureUncertainty    0.0
City                             0.0
Country                          0.0
Latitude                         0.0
Longitude                        0.0
dtype: float64

In [9]:
air = cleaning_task(air)

ident           0.0
type            0.0
name            0.0
elevation_ft    0.0
continent       0.0
iso_country     0.0
iso_region      0.0
municipality    0.0
gps_code        0.0
iata_code       0.0
local_code      0.0
coordinates     0.0
dtype: float64

In [10]:
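# read_csv used the default ',' separator, so the whole ';'-delimited file landed in one column.
# Split that column on ';' (passing sep=';' to pd.read_csv would avoid this step).
cols = ''.join(cities.columns.tolist())
cities = cities[cols].str.split(";", expand=True)
cities.columns = cols.split(';')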

cities = cleaning_task(cities)

City                      0.0
State                     0.0
Median Age                0.0
Male Population           0.0
Female Population         0.0
Total Population          0.0
Number of Veterans        0.0
Foreign-born              0.0
Average Household Size    0.0
State Code                0.0
Race                      0.0
Count                     0.0
dtype: float64

In [11]:
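print(type(immig_spark))

immig_spark = immig_spark.dropDuplicates()

# Count null values per column; F.sum (imported above) avoids shadowing Python's built-in sum
immig_spark.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in immig_spark.columns)).show()

# Preview rows without any nulls; the result is not assigned, so immig_spark keeps its nulls
immig_spark.na.drop().show()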


<class 'pyspark.sql.dataframe.DataFrame'>
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+---

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model: 

The datasets can be connected through the following channels:
  1. The coordinates column of the airports data can be connected to the longitude and latitude columns of the temperature data.
  2. The city column of the temperature data can be connected to the city column of the cities data.
  3. The i94port column of the immigration dataset and the iata_code column of the airports dataset can both be connected to the code column of the port dataset.
  4. The location column of the port data can be further exploited by extracting the city into its own column, which connects to the cities and temperature data.
  
  To summarize, we have a star schema in which the port dataset is the center, or fact table, and all the other datasets are dimension tables.
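
As an illustration of channel 3, the sketch below joins toy versions of the tables on the port code. The port rows echo the samples in this notebook; the immigration rows and the airport name are invented for the example, and the per-port arrival count is an assumption about how the immigration data might be attached, not part of the pipeline above.

```python
import pandas as pd

# Toy tables: port rows echo this notebook's samples, the rest is invented for illustration
port_toy = pd.DataFrame(
    {"Code": ["ABE", "ABQ"], "City": ["Aberdeen", "Albuquerque"], "State": ["WA", "NM"]}
)
immigration_toy = pd.DataFrame({"cicid": [1.0, 2.0, 3.0], "i94port": ["ABQ", "ABQ", "XXX"]})
airports_toy = pd.DataFrame({"iata_code": ["ABQ"], "name": ["Hypothetical Airport"]})

# Summarise immigration records per port code
arrivals = immigration_toy.groupby("i94port").size().rename("arrivals").reset_index()

# Left joins keep every port, even when no airport or arrival record matches
model = (
    port_toy
    .merge(airports_toy, left_on="Code", right_on="iata_code", how="left")
    .merge(arrivals, left_on="Code", right_on="i94port", how="left")
)
print(model[["Code", "City", "State", "name", "arrivals"]])
```

Ports without a match keep missing values in the joined columns, which makes unmatched codes easy to spot.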



#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model: 

The 'Port of Entry' CSV file came with four columns: the code and location columns appeared twice, side by side. I transformed the dataset so that all the codes are in one column and all the locations are in another.
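
A minimal sketch of that reshaping on a toy frame follows. The column names `Code.1` and `Location.1` and the placement of rows in the left or right half are assumptions for illustration; the real file layout may differ.

```python
import pandas as pd

# Toy layout: two (Code, Location) pairs side by side; names and row placement are assumed
wide = pd.DataFrame(
    {
        "Code": ["ABE", "ABQ"],
        "Location": ["Aberdeen, WA", "Albuquerque, NM"],
        "Code.1": ["ADT ", "ALP"],
        "Location.1": ["Amistad Dam, TX", "Alpena, MI"],
    }
)

left = wide.iloc[:, :2]
right = wide.iloc[:, 2:].copy()
right.columns = ["Code", "Location"]

# Stack the two halves and renumber the rows so the index stays unique
stacked = pd.concat([left, right], ignore_index=True)

# Split "City, ST" on the first comma only, then trim whitespace in every column
stacked[["City", "State"]] = stacked["Location"].str.split(",", n=1, expand=True)
stacked = stacked.drop(columns="Location").apply(lambda s: s.str.strip())
print(stacked)
```

Using `ignore_index=True` avoids the duplicate index labels that a plain `pd.concat` leaves behind.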


 


In [12]:
temp.head(n=2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E


In [13]:
air.head(n=2)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
10376,ADC,small_airport,Andakombe Airport,3600.0,OC,PG,PG-EHG,Andekombe,AYAN,ADC,ADK,"145.744722222, -7.13722222222"


In [14]:
air[air.iata_code=='UTK']

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"


In [15]:
cities.head(n=2)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723


In [9]:
pd.set_option('display.max_columns', 2000)

immig_df = immig_spark.limit(10).toPandas()
immig_df.head(n=10)


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,57.0,2.0,1.0,20160430,ACK,,G,O,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,66.0,2.0,1.0,20160430,ACK,,G,O,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,41.0,2.0,1.0,20160430,ACK,,G,O,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,27.0,2.0,1.0,20160430,ACK,,G,O,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,26.0,2.0,1.0,20160430,ACK,,G,O,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [17]:
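port = pd.read_csv('port_of_entry.csv')
# The file holds two (Code, Location) pairs side by side; stack them into one pair.
# Note: .head() keeps only the first 5 rows of each half, so port ends up with 10 rows.
port1 = port.iloc[:, :2].head()
port2 = port.iloc[:, 2:].head()
port2.columns = ['Code','Location']
port = pd.concat([port1, port2])

port.head()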

Unnamed: 0,Code,Location
0,ABE,"Aberdeen, WA"
1,ABQ,"Albuquerque, NM"
2,ADT,"Amistad Dam, TX"
3,ALP,"Alpena, MI"
4,AGN,"Algonac, MI"


In [18]:
port[['City','State']] =port.Location.str.split(",",expand=True)
port.drop(['Location'], axis = 1,inplace = True)
port.head()



Unnamed: 0,Code,City,State
0,ABE,Aberdeen,WA
1,ABQ,Albuquerque,NM
2,ADT,Amistad Dam,TX
3,ALP,Alpena,MI
4,AGN,Algonac,MI


In [19]:

port['State'] = port['State'].str.strip()
port['Code'] = port['Code'].str.strip()
port['City'] = port['City'].str.strip()

port.head()

Unnamed: 0,Code,City,State
0,ABE,Aberdeen,WA
1,ABQ,Albuquerque,NM
2,ADT,Amistad Dam,TX
3,ALP,Alpena,MI
4,AGN,Algonac,MI


### Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [29]:
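# Write the immigration data as parquet, partitioned by i94mode
immig_spark.write.partitionBy("i94mode").mode("overwrite").parquet("immigration_table.parquet")

# Write the smaller pandas tables as JSON
cities.to_json(r'cities.json')
air.to_json(r'air.json')
temp.to_json(r'temp.json')
# pd.concat left duplicate index labels on port; reset them before writing
port = port.reset_index(drop=True)
port.to_json(r'port.json')

print('done')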

done

#### 4.2 Data Quality Checks and Data dictionary
For the data quality checks, I applied the pandas info() function to each table to obtain general integrity information: column data types and sizes, which also serves as a data dictionary. All the data was provided by Udacity except the port data, which came from the customs agency at the following link:
https://help.cbp.gov/app/answers/list/search/1/kw/list%20of%20Port%20codes/suggested/1
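
The `quality_check` helper is also imported from `functions` and not shown here. The sketch below is not that function; it is a hypothetical example of checks that go one step beyond `info()`, counting rows, null cells, and duplicate keys. The name `quality_check_sketch`, the choice of key column, and the toy rows are all assumptions.

```python
import pandas as pd


def quality_check_sketch(df: pd.DataFrame, key: str) -> dict:
    """Hypothetical checks: row count, total null cells, and duplicated key values."""
    report = {
        "rows": len(df),
        "null_cells": int(df.isnull().sum().sum()),
        "duplicate_keys": int(df[key].duplicated().sum()),
    }
    # An empty table almost certainly means an upstream step failed
    if report["rows"] == 0:
        raise ValueError("table is empty")
    return report


# Toy rows with one missing city and one repeated code, to show what the report flags
port_check = pd.DataFrame(
    {"Code": ["ABE", "ABQ", "ABQ"], "City": ["Aberdeen", "Albuquerque", None]}
)
report = quality_check_sketch(port_check, key="Code")
print(report)
```

Returning a dictionary rather than printing makes it possible to assert on the numbers in an automated run.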
 
Run Quality Checks

In [2]:
# Perform quality checks here
cities = pd.read_json(r'cities.json')
quality_check(cities)

print ('done')

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2891 entries, 0 to 999
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2891 non-null object
Female Population         2891 non-null object
Total Population          2891 non-null int64
Number of Veterans        2891 non-null object
Foreign-born              2891 non-null object
Average Household Size    2891 non-null object
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(1), int64(2), object(9)
memory usage: 293.6+ KB
None
done


In [5]:
air = pd.read_json(r'air.json')
quality_check(air)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 678 entries, 10376 to 54777
Data columns (total 12 columns):
ident           678 non-null object
type            678 non-null object
name            678 non-null object
elevation_ft    678 non-null int64
continent       678 non-null object
iso_country     678 non-null object
iso_region      678 non-null object
municipality    678 non-null object
gps_code        678 non-null object
iata_code       678 non-null object
local_code      678 non-null object
coordinates     678 non-null object
dtypes: int64(1), object(11)
memory usage: 68.9+ KB
None


In [3]:
temp = pd.read_json(r'temp.json')

quality_check(temp)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB
None


In [4]:
port = pd.read_json(r'port.json')

quality_check(port)


<class 'pandas.core.frame.DataFrame'>
Int64Index: 10 entries, 0 to 9
Data columns (total 3 columns):
Code     10 non-null object
City     10 non-null object
State    10 non-null object
dtypes: object(3)
memory usage: 320.0+ bytes
None
done


In [10]:
immig_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 28 columns):
cicid       10 non-null float64
i94yr       10 non-null float64
i94mon      10 non-null float64
i94cit      10 non-null float64
i94res      10 non-null float64
i94port     10 non-null object
arrdate     10 non-null float64
i94mode     10 non-null float64
i94addr     10 non-null object
depdate     10 non-null float64
i94bir      10 non-null float64
i94visa     10 non-null float64
count       10 non-null float64
dtadfile    10 non-null object
visapost    10 non-null object
occup       0 non-null object
entdepa     10 non-null object
entdepd     10 non-null object
entdepu     0 non-null object
matflag     10 non-null object
biryear     10 non-null float64
dtaddto     10 non-null object
gender      10 non-null object
insnum      0 non-null object
airline     10 non-null object
admnum      10 non-null float64
fltno       10 non-null object
visatype    10 non-null object
dtypes: float64(13),

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
I used pandas for the smaller datasets and PySpark for the immigration data, which was too large for pandas to handle.
* Propose how often the data should be updated and why.
The data should be updated every month, since customs information is released monthly while the other datasets are essentially static.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 I would move the data to AWS and store it in an S3 bucket, since AWS offers ample storage and scalable compute at a reasonable price.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 In this case, I would use a pipeline tool such as Airflow or Luigi to take advantage of its automated workflow and scheduling features.
 * The database needed to be accessed by 100+ people.
 I would use AWS Redshift for this purpose, as it makes the data accessible to many users from anywhere.