# US IMMIGRATIONS ARRIVALS ANALYSIS
### Data Engineering Capstone Project
#### Project Summary
Analysing inbound travellers to the US for the month of April 2016.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Doing all the Imports here
import psycopg2
import pandas as pd
from sql_queries import *
import os
import glob
import psycopg2.extras as extras

In [2]:
#connecting to immigrations postgres database
conn = psycopg2.connect("host=127.0.0.1 dbname=immigrationsdb user=student password=student")
cur = conn.cursor()

### Step 1: Scope the Project and Gathering Data

#### Scope 
The purpose of this database is to provide a well-designed schema for the I94 arrivals data, which is supplied as SAS files. Storing the dataset as Postgres tables lets us analyse travellers coming to the US and produce reports on traveller behaviour. A second purpose is to provide an ETL pipeline that inserts new data into the tables, so that they reflect the latest inbound traffic to the US each time the pipeline runs.


#### Describing and Gathering Data 
The data sets come from the following sources:

1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office.
2. World Temperature Data: This dataset comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
3. U.S. City Demographic Data: This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
4. Ports Data: This data comes from [fam.state.gov](https://fam.state.gov/fam/09FAM/09FAM010205.html)
5. Airlines Data: This data comes from [data.world](https://data.world/tylerudite/airports-airlines-and-routes)

In [3]:
# Read in the data here
demo_df = pd.read_csv('data_sets/us-cities-demographics.csv', delimiter = ';')
port_df = pd.read_csv('data_sets/port_codes.csv')
airlines_df = pd.read_csv('data_sets/airlines.csv')

In [4]:
#reading global temperature csv file from the attached disk
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname)

In [1]:
# Creating parquet file of immigration data for faster read to data frame

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

# Reading immigration data in spark data frame
df_spark_apr = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
# Write the Spark data frame to parquet files
df_spark_apr.write.parquet("sas_apr_16")

AnalysisException: 'path file:/home/workspace/sas_apr_16 already exists.;'

In [5]:
# Reading Immigration data
#pip install -U fastparquet to install the engine

label = pd.read_parquet('sas_apr_16', engine='fastparquet')

In [6]:
label.count()

cicid       3096313
i94yr       3096313
i94mon      3096313
i94cit      3096313
i94res      3096313
i94port     3096313
arrdate     3096313
i94mode     3096074
i94addr     2943721
depdate     2953856
i94bir      3095511
i94visa     3096313
count       3096313
dtadfile    3096312
visapost    1215063
occup          8126
entdepa     3096075
entdepd     2957884
entdepu         392
matflag     2957884
biryear     3095511
dtaddto     3095836
gender      2682044
insnum       113708
airline     3012686
admnum      3096313
fltno       3076764
visatype    3096313
dtype: int64

### Step 2: Exploring and Assessing the Data
#### Exploring the Data 
Identifying data quality issues, like missing values, duplicate data, etc.
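
As a minimal sketch of how such issues could be surfaced before cleaning, the helper below counts missing values per column and fully duplicated rows. The function name `quality_report` and the sample rows are hypothetical and are not part of the notebook.

```python
import pandas as pd

def quality_report(df):
    """Summarise missing values per column and count fully duplicated rows."""
    nulls = df.isnull().sum()
    return {
        "rows": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        # Only list the columns that actually contain missing values
        "null_counts": {col: int(n) for col, n in nulls.items() if n > 0},
    }

# Hypothetical sample: one repeated row and one missing airline code
sample = pd.DataFrame({
    "i94port": ["NYC", "NYC", "ATL"],
    "airline": ["AA", "AA", None],
})
print(quality_report(sample))
```

Running the same helper on each data frame gives a quick view of which columns need the cleaning steps described below.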

#### Cleaning Steps
Documenting steps necessary to clean the data

### #1: `Ports` Data frame
#### Extracting data from the ports data file, to be used for the immigration table later.
- Selecting the rows of the data frame where `Code` is not null
- Splitting the `Location` column values at the delimiter ',' to have city and state code in separate columns
- Adding the separated columns to the main `port_df` data frame as city and state
- Deleting the original `Location` column
- Renaming the `Code` column to `i94port` for later use

In [7]:
# Displaying first row of the data
port_df.head(1)

Unnamed: 0,Code,Location
0,ABE,"Aberdeen, WA"


In [8]:
port_df.dropna(subset =['Code'],inplace=True)

df = port_df['Location'].str.split(',', n=1, expand=True)

# Strip the whitespace left around each part by the split on ','
port_df["City"] = df[0].str.strip()

port_df["State"] = df[1].str.strip()

port_df.drop(columns=["Location"],inplace=True)

port_df.rename(columns={'Code': 'i94port', 'State': 'state_code', 'City': 'city'}, inplace=True)

port_df.head(1)

Unnamed: 0,i94port,city,state_code
0,ABE,Aberdeen,WA


### #2: `Race` Table
#### Cleaning and Extracting Data for Race Table


In [9]:
#Displaying data
demo_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


#### Removing duplicates by choosing unique values
Aggregating the data frame on State, State Code and Race, and summing the Count column to get the total count of each race per state as the Race Count column.

In [10]:
agg_df=(demo_df.groupby(['State','State Code','Race'], as_index=False)
    .agg({'Count':'sum'})
    .rename(columns={'Count':'Race Count'})
    )

agg_df.head(1)

Unnamed: 0,State,State Code,Race,Race Count
0,Alabama,AL,American Indian and Alaska Native,8084


#### Cleaning Race column data
Using `df.pivot_table`, each unique value of Race becomes a column holding the corresponding race count for each state.

In [11]:
pt = agg_df.pivot_table(values='Race Count', columns='Race', 
                    index=['State','State Code',])

pt.columns.name=None

race_df = pt.reset_index()

race_df.rename(columns={'State Code': 'state_code', 'American Indian and Alaska Native': 'americanindian_and_alaskanative', 'Black or African-American': 'black_or_africanamerican', 'Hispanic or Latino':'hispanic_or_latino'}, inplace=True)

race_df.head(1)

Unnamed: 0,State,state_code,americanindian_and_alaskanative,Asian,black_or_africanamerican,hispanic_or_latino,White
0,Alabama,AL,8084.0,28769.0,521068.0,39313.0,498920.0


### #3: `Demographics` Table
#### Cleaning and Extracting Data for Demographics Table


In [12]:
# Displaying the data

demo_df.head(1)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924


#### Cleaning data and dropping duplicates
1. Removing the Race and Count columns as they are not needed in this table
2. Sorting the data by City
3. Removing the duplicate rows that remain once those columns are dropped
4. Adding a unique column for indexing and naming it `city_id`

In [13]:
cdemo_df = demo_df.drop(['Race', 'Count'], axis=1)

cdemo_df.sort_values("City",inplace=True)

cdemo_df.drop_duplicates(inplace=True)

cdemo_df.reset_index(level=0, inplace=True)

cdemo_df.rename(columns={'index':'city_id', 'Median Age':'median_age', 'Male Population':'male_population','Female Population':'female_population', 'Total Population':'total_population' ,'Number of Veterans':'number_of_veterans' ,'Foreign-born':'foreign_born', 'Average Household Size':'average_household_size', 'State Code':'state_code'}, inplace=True )

cdemo_df.head(1)

Unnamed: 0,city_id,City,State,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code
0,2727,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX


### #4: `Airlines` Table
#### Cleaning and Extracting Data for Airlines Table


In [14]:
# Displaying the airlines data

airlines_df.head(6)

Unnamed: 0,Airline ID,Name,Alias,IATA,ICAO,Callsign,Country,Active
0,-1,Unknown,\N,-,,\N,\N,Y
1,1,Private flight,\N,-,,,,Y
2,2,135 Airways,\N,,GNL,GENERAL,United States,N
3,3,1Time Airline,\N,1T,RNX,NEXTIME,South Africa,Y
4,4,2 Sqn No 1 Elementary Flying Training School,\N,,WYT,,United Kingdom,N
5,5,213 Flight Unit,\N,,TFU,,Russia,N


#### Cleaning Data
1. Selecting only the rows where the IATA column is not null
2. Selecting the IATA, Airline ID, Name, Country and Active columns for our table
3. Filtering the data for only the active airlines, i.e. those with active flag 'Y'
4. Dropping all rows with a duplicate IATA code to avoid any discrepancies in our data.
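
Step 4 relies on `keep=False`, which behaves differently from the default. The sketch below, run on a made-up sample rather than the real airlines file, shows that `keep=False` discards every row whose IATA code is repeated, while `keep="first"` would retain one row per code.

```python
import pandas as pd

# Hypothetical sample: two carriers share the code "AA", one code is unique
sample = pd.DataFrame({
    "IATA": ["AA", "AA", "1T"],
    "Name": ["Carrier one", "Carrier two", "1Time Airline"],
})

# keep=False discards every row whose IATA code occurs more than once
strict = sample.drop_duplicates(subset="IATA", keep=False)
# keep="first" retains the first row of each duplicated group
lenient = sample.drop_duplicates(subset="IATA", keep="first")

print(strict["IATA"].tolist())   # ['1T']
print(lenient["IATA"].tolist())  # ['AA', '1T']
```

The strict variant loses airlines with ambiguous codes, but it guarantees that each IATA code maps to exactly one row.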

In [15]:
ana_df = airlines_df[airlines_df['IATA'].notna()]

iata_df = ana_df[['IATA','Airline ID','Name','Country','Active']]

af_df = iata_df.loc[iata_df['Active'] == 'Y']

# Drop duplicates from the active airlines (af_df), so that the 'Y' filter above is actually applied
air_df = af_df.drop_duplicates(subset='IATA', keep=False)

# rename() without inplace returns a new data frame, which avoids the SettingWithCopyWarning
air_df = air_df.rename(columns={'Airline ID':'airline_id'})


air_df.head(1)


Unnamed: 0,IATA,airline_id,Name,Country,Active
3,1T,3,1Time Airline,South Africa,Y


### #5: `Temperature` Table
#### Cleaning and Extracting Data for Temperature Table

In [16]:
# Displaying data
temp_df.head(1)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E


#### Cleaning the temperature data
1. Filtering the data to keep only the United States
2. Selecting the rows with non-null values of Average Temperature
3. Checking for any null values in the new data frame.
4. Dropping the Latitude and Longitude columns so that the data can be aggregated in the next step


In [17]:
ust_df = temp_df.loc[temp_df['Country'] == 'United States']
temp_c = ust_df[['dt','AverageTemperature','AverageTemperatureUncertainty','City']]

# Build the mask from temp_c itself rather than from ust_df
avgt = temp_c[temp_c['AverageTemperature'].notna()]

avgt.isnull().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
dtype: int64

In [18]:
# Displaying data frame
avgt.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City
47555,1820-01-01,2.101,3.217,Abilene
47556,1820-02-01,6.926,2.853,Abilene
47557,1820-03-01,10.767,2.395,Abilene
47558,1820-04-01,17.989,2.202,Abilene
47559,1820-05-01,21.809,2.036,Abilene


#### Removing duplicates for temperature table
After dropping the coordinates, some (dt, City) pairs appear more than once. Aggregating on dt and City and taking the mean of the temperature and of the temperature uncertainty leaves one row per pair.

In [19]:
gtemp_df=(avgt.groupby(['dt','City'], as_index=False)
    .agg({'AverageTemperature':'mean','AverageTemperatureUncertainty':'mean'})
    )

In [20]:
gtemp_df.rename(columns={'AverageTemperature':'average_temperature','AverageTemperatureUncertainty':'average_temperature_uncertainty'}, inplace=True )
gtemp_df.head(1)

Unnamed: 0,dt,City,average_temperature,average_temperature_uncertainty
0,1743-11-01,Akron,3.209,1.961


In [21]:
gtemp_df.count()

dt                                 639649
City                               639649
average_temperature                639649
average_temperature_uncertainty    639649
dtype: int64

### #6: `Immigrations` Table
#### Cleaning and Extracting Data for Immigrations Table

#### Cleaning data for Immigration data frame
1. Selecting the columns needed by the table.
2. Checking for any null data
3. Merging the data frame with the ports data frame to add a new column for city, and dropping the state column.

In [22]:
labelf = label [['cicid','i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode' , 'i94addr', 'i94visa', 'airline']]

In [23]:
labelf.isnull().sum()

cicid           0
i94cit          0
i94res          0
i94port         0
arrdate         0
i94mode       239
i94addr    152592
i94visa         0
airline     83627
dtype: int64

In [24]:
# Replacing NA values in i94mode with code 9, which is "Not Reported".
# The fill value is the number 9.0 (not the string "9.0") so the column stays numeric,
# and assign() returns a new data frame, which avoids the SettingWithCopyWarning.

labelf = labelf.assign(i94mode=labelf["i94mode"].fillna(9.0))


In [25]:
labelf.isnull().sum()

cicid           0
i94cit          0
i94res          0
i94port         0
arrdate         0
i94mode         0
i94addr    152592
i94visa         0
airline     83627
dtype: int64

In [26]:
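# Left join on the port code so that arrivals with an unmatched port are kept
merged = labelf.merge(port_df, how='left', on='i94port')
immig_df = merged.drop(columns=["state_code"])
it_df = immig_df[['cicid','i94cit','i94res','i94port','city','i94addr','arrdate','i94mode','airline']]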


#### Converting the format of `arrdate` from SAS format to date format

In [27]:
# SAS dates count days from 1960-01-01; assign() returns a new data frame, which avoids the SettingWithCopyWarning
it_df = it_df.assign(arrdate=pd.to_timedelta(it_df["arrdate"], unit='D') + pd.Timestamp('1960-1-1'))


In [28]:
it_df.head(7)

Unnamed: 0,cicid,i94cit,i94res,i94port,city,i94addr,arrdate,i94mode,airline
0,6.0,692.0,692.0,XXX,,,2016-04-29,9.0,
1,7.0,254.0,276.0,ATL,Atlanta,AL,2016-04-07,1.0,
2,15.0,101.0,101.0,WAS,Washington,MI,2016-04-01,1.0,OS
3,16.0,101.0,101.0,NYC,New York,MA,2016-04-01,1.0,AA
4,17.0,101.0,101.0,NYC,New York,MA,2016-04-01,1.0,AA
5,18.0,101.0,101.0,NYC,New York,MI,2016-04-01,1.0,AZ
6,19.0,101.0,101.0,NYC,New York,NJ,2016-04-01,1.0,AZ
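
The SAS date conversion above can be checked by hand on a single value. The sketch below uses only the standard library; the helper name `sas_to_date` is hypothetical, and the two input values are chosen to land on dates that appear in the output above.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1 January 1960
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29
```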


### Step 3: Defining the Data Model
#### 3.1 Conceptual Data Model
- Below is the data model, a snowflake schema designed for our Postgres database. Storing the dataset as Postgres tables lets us analyse travel activity at the different ports of entry to the US, and the ETL pipeline inserts new data into the tables so that we can run analytical queries.
- In the centre is the **immigrations** table, the fact table. The rest are dimension tables.
- The `golden key icon` indicates the primary key of the table.
- The `pointed arrows` show the relations between the fact and dimension tables.

<img src="data_sets/ERD.PNG">
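
To make the fact and dimension relationship concrete, here is a minimal sketch that joins the fact table to one dimension. It is an illustration under several assumptions: the real table definitions live in `sql_queries.py` and are not shown here, so the column types, the join key (`immigrations.airline` to `airlines.iata`) and the sample rows are all hypothetical, and an in-memory SQLite database stands in for Postgres.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Dimension table (column types are assumptions)
CREATE TABLE airlines (
    iata       TEXT PRIMARY KEY,
    airline_id INTEGER,
    name       TEXT,
    country    TEXT,
    active     TEXT
);
-- Fact table, reduced to a few columns for the example
CREATE TABLE immigrations (
    cicid    REAL PRIMARY KEY,
    i94port  TEXT,
    city     TEXT,
    arrdate  TEXT,
    i94mode  REAL,
    airline  TEXT REFERENCES airlines (iata)
);
-- Illustrative rows only
INSERT INTO airlines VALUES ('AA', 1, 'Example Airline', 'United States', 'Y');
INSERT INTO immigrations VALUES (16.0, 'NYC', 'New York', '2016-04-01', 1.0, 'AA');
INSERT INTO immigrations VALUES (17.0, 'NYC', 'New York', '2016-04-01', 1.0, 'AA');
""")

# Count arrivals per city and airline by joining the fact table to the dimension
rows = conn.execute("""
    SELECT i.city, a.name, COUNT(*) AS arrivals
    FROM immigrations AS i
    JOIN airlines AS a ON a.iata = i.airline
    GROUP BY i.city, a.name
""").fetchall()
print(rows)  # [('New York', 'Example Airline', 2)]
```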


### Step 4: Running Pipelines to Model the Data 
#### 4.1 Creating the data model

Building the data pipelines to create the data model.
The function below loads all the rows of a data frame into a SQL table.

We then run this function on each data frame for its respective table.

In [29]:
def execute_values(conn, df, table):
    """
    Using psycopg2.extras.execute_values() to insert the dataframe
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()
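
One thing the loader above does not address is missing values. pandas represents them as float NaN, whereas psycopg2 maps Python `None` to SQL `NULL`, so NaN values may reach the database as something other than `NULL`. The sketch below shows one possible way to convert the tuples before inserting; the helper name `nan_to_none` is hypothetical, and it only handles float NaN (not `pd.NaT`).

```python
import math

def nan_to_none(row):
    """Return a tuple with every float NaN replaced by None."""
    return tuple(
        None if isinstance(value, float) and math.isnan(value) else value
        for value in row
    )

# Hypothetical rows shaped like the tuples built inside execute_values()
rows = [(6.0, "XXX", float("nan")), (15.0, "WAS", "OS")]
clean_rows = [nan_to_none(r) for r in rows]
print(clean_rows)  # [(6.0, 'XXX', None), (15.0, 'WAS', 'OS')]
```

Applied to the `tuples` list, this would make the null checks in the database reflect the missing values seen in the data frames.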


In [30]:
# adding gtemp_df data values to postgres sql table "temperature"

execute_values(conn, gtemp_df, 'temperature')

execute_values() done


In [31]:
# adding race_df data values to postgres sql table "race"

execute_values(conn, race_df, 'race')

execute_values() done


In [32]:
# adding air_df data values to postgres sql table "airlines"

execute_values(conn, air_df, 'airlines')

execute_values() done


In [33]:
# adding cdemo_df data values to postgres sql table "demographics"

execute_values(conn, cdemo_df, 'demographics')

execute_values() done


In [34]:
# adding it_df data values to postgres sql table "immigrations"

execute_values(conn, it_df, 'immigrations')

execute_values() done


#### 4.2 Data Quality Checks
Running quality checks on the immigrations fact table to verify the following:
* Integrity constraints on the relational database, checking for any null values
* The values of i94mode are valid mode codes
* The cicid primary key is unique, checked by looking for duplicate values
 


In [59]:
#connecting to the database

%load_ext sql
%sql postgresql://student:student@127.0.0.1/immigrationsdb

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


'Connected: student@immigrationsdb'

#### Integrity constraints on the relational database to check for any null Values

In [60]:
%%sql 

SELECT cicid
FROM immigrations
WHERE cicid IS NULL 


 * postgresql://student:***@127.0.0.1/immigrationsdb
0 rows affected.


cicid


#### Checking the values for mode of transport, which should be 1, 2, 3 or 9

In [61]:
%%sql

SELECT i94mode
FROM immigrations
WHERE not i94mode in (1.0, 2.0, 3.0, 9.0)

 * postgresql://student:***@127.0.0.1/immigrationsdb
0 rows affected.


i94mode


#### Duplicate value checks to ensure correctness

In [62]:
%%sql

SELECT cicid
FROM (SELECT cicid, SUM(1) AS count 
      FROM immigrations 
      GROUP BY 1) AS test
WHERE count > 1

 * postgresql://student:***@127.0.0.1/immigrationsdb
0 rows affected.


cicid
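
The three checks above share one pattern: each query should return zero rows when the data is healthy. As a rough sketch, they could be run in a loop so that the pipeline reports failures automatically. The names `QUALITY_CHECKS` and `run_quality_checks` are hypothetical, and the demo uses an in-memory SQLite table as a stand-in for the Postgres `immigrations` table.

```python
import sqlite3

# Each check is a query that should return zero rows when the data is healthy
QUALITY_CHECKS = {
    "null cicid": "SELECT cicid FROM immigrations WHERE cicid IS NULL",
    "invalid i94mode": "SELECT i94mode FROM immigrations "
                       "WHERE i94mode NOT IN (1.0, 2.0, 3.0, 9.0)",
    "duplicate cicid": "SELECT cicid FROM immigrations "
                       "GROUP BY cicid HAVING COUNT(*) > 1",
}

def run_quality_checks(conn, checks=QUALITY_CHECKS):
    """Return {check name: number of offending rows} for every check."""
    results = {}
    for name, query in checks.items():
        cur = conn.cursor()
        cur.execute(query)
        results[name] = len(cur.fetchall())
        cur.close()
    return results

# Demo on an in-memory SQLite table with a few illustrative rows
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE immigrations (cicid REAL, i94mode REAL)")
demo.executemany(
    "INSERT INTO immigrations VALUES (?, ?)",
    [(6.0, 9.0), (7.0, 1.0), (15.0, 1.0)],
)
failures = {name: n for name, n in run_quality_checks(demo).items() if n > 0}
print(failures or "all checks passed")
```

The function only uses `cursor()`, `execute()` and `fetchall()`, so the same loop should also work with the psycopg2 connection opened at the top of the notebook.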


#### 4.3 Data dictionary 
A data dictionary for our data model. For each field, it gives a brief description of what the data is and where it came from. A data dictionary for the immigration data is included in a separate file named **`I94_dictionary.txt`**.

#### FACT TABLE - Immigrations

**I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.

**Ports Codes Data**: The port-of-entry codes used to add the city column to the immigrations table come from the [US Department of State website](https://fam.state.gov/fam/09FAM/09FAM010205.html)

**Immigrations Table Columns**

Columns:
- `cicid` Unique identifier for every arrival to the US, as per the US National Tourism and Trade Office.
- `i94cit` Country of citizenship
- `i94res` Country of residency
- `i94port` Port of entry
- `city` Name of the city of the port
- `i94addr` Code of the state
- `arrdate` Date of arrival
- `i94mode` Mode of travel (sea, air or land)
- `airline` IATA code of the airline used to travel



#### DIMENSION TABLES- Race, Demographics, Airlines and Temperature

**Demographics Data** - This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). Using the US demographics data we have created 2 tables: `Race` and `Demographics`.

**Race Table Columns**

- `State` US State name.
- `state_code` 2-letter code for the state
- `americanindian_and_alaskanative` American Indian and Alaska Native population of the state.
- `asian` Asian Population of the state.
- `black_or_africanamerican` Black or African American Population of the state.
- `hispanic_or_latino` Hispanic or Latino Population of the state.
- `white` White Population of the state.

**Demographics Table Columns**
- `city_id` Unique identifier for every city. *(Primary Key)*
- `city` Name of the city
- `state` Name of the state
- `median_age` Median age of the total population of the city.
- `male_population` Total male population of the city.
- `female_population` Total female population of the city.
- `total_population` Total population of the city.
- `number_of_veterans` Total number of veterans in the city.
- `foreign_born` Total foreign-born population of the city.
- `average_household_size` Average household size in the city.
- `state_code` Code of the state of the city.

**Airline Data** - This data comes from [data.world](https://data.world/tylerudite/airports-airlines-and-routes)

**Airlines Table Columns**
- `iata` IATA code of the airline
- `airline_id` Airline ID, another unique identifier for the airline
- `name` Name of the airline
- `country` Country of the owner of the airline
- `active` Status of the airline, active or not.

**Global Temperature Data** - This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

**Temperature Table Columns**
- `dt` Date when the temperature was recorded
- `city` Name of the city
- `average_temperature` Average temperature of the city for that date.
- `average_temperature_uncertainty` Average of the temperature uncertainty for that date.



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

The project write-up is in a Markdown file called **`Project Write-up.md`**