# Data Pipeline for Analyzing US Immigration Data
### Data Engineering Capstone Project

#### Project Summary

Build a data pipeline to analyze US immigration data and provide insights into immigration patterns.
A few of the many questions that can be answered are:
 1. Most preferred airline
 2. Percentage of people traveling in each visa category
 3. Average duration of stay in each visa category
 4. Busiest airport
 5. Number of people potentially overstaying the period allowed by their visa category
 6. Weather in which travel is at its maximum and minimum
 7. Most traveled destination
 8. Top country of origin for tourists
 

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Import the required libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import types as T

### Step 1: Scope the Project and Gather Data

#### Scope 
Analyze the data to understand the relationships between entities and their attributes.

#### Describe and Gather Data 
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office.  
   A data dictionary is included in the workspace. The data comes from https://travel.trade.gov/.  
2. World Temperature Data: This dataset comes from Kaggle. The URL for the data source is https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
3. U.S. City Demographic Data: This data comes from OpenDataSoft.  
   The URL for this data source is https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
4. Airport Code Table: This is a simple table of airport codes and corresponding cities.  
   The URL for the data source is https://datahub.io/core/airport-codes#data

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [2]:
spark = SparkSession.builder.\
        config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

In [3]:
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [4]:
df_spark=spark.read.parquet("sas_data")

In [5]:
df_spark.show()

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [6]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
initial_num_records = df_spark.count()

In [8]:
print("Initial number of records in i94 dataset from Apr 16 is : {}".format(initial_num_records))

Initial number of records in i94 dataset from Apr 16 is : 3096313


In [9]:
#define function to count missing values in each column
#Code referenced from https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
def count_missings(spark_df, sort=True):
    """
    Counts the number of nulls and NaNs in each column.
    """
    # The aggregation returns a single row holding one missing-value count per column
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c, c_type) in spark_df.dtypes]).toPandas()

    # len(df) is always 1 here, so test the counts themselves
    if (df.iloc[0] == 0).all():
        print("There are no missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count", ascending=False)

    return df

In [10]:
count_missings(df_spark)

Unnamed: 0,count
entdepu,3095921
occup,3088187
insnum,2982605
visapost,1881250
gender,414269
i94addr,152592
depdate,142457
matflag,138429
entdepd,138429
airline,83627


In [11]:
# create datetime column from original timestamp column
df_spark_new = df_spark.withColumn('depdate', F.date_format(df_spark.depdate.cast(dataType=T.TimestampType()), "yyyy-MM-dd"))\
                       .withColumn('dtaddto', F.date_format(df_spark.dtaddto.cast(dataType=T.TimestampType()), "yyyy-MM-dd"))\
                       .withColumn('biryear', df_spark.biryear.cast(dataType=T.IntegerType()))\
                       .withColumn('i94yr', df_spark.i94yr.cast(dataType=T.IntegerType()))\
                       .withColumn('i94mon', df_spark.i94mon.cast(dataType=T.IntegerType()))\
                       .withColumn('arrdate', F.date_format(df_spark.arrdate.cast(dataType=T.TimestampType()), "yyyy-MM-dd"))\
                       .withColumn('dtadfile', F.to_date('dtadfile','yyyyMMdd'))

In [12]:
df_spark_new.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable =

In [13]:
df_spark_new.show()

+---------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+--------------+-----+--------+
|    cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+--------------+-----+--------+
|5748517.0| 2016|     4| 245.0| 438.0|    LOS|1970-01-01|    1.0|     CA|1970-01-01|  40.0|    1.0|  1.0|2016-04-30|     SYD| null|      G|      O|   null|      M|   1976|   null|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0| 2016|     4| 245.0| 438.0|    LOS|1970-01-01|    1.0|     NV

In [14]:
#show min and max dates
print("min arrdate i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'arrdate': 'min'}).collect()[0][0]))
print("max arrdate i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'arrdate': 'max'}).collect()[0][0]))

min arrdate i94 dataset from Apr 16 is : 1970-01-01
max arrdate i94 dataset from Apr 16 is : 1970-01-01


In [15]:
#show min and max dates
print("min depdate i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'depdate': 'min'}).collect()[0][0]))
print("max depdate i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'depdate': 'max'}).collect()[0][0]))

min depdate i94 dataset from Apr 16 is : 1970-01-01
max depdate i94 dataset from Apr 16 is : 1970-01-01
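
Every `arrdate` and `depdate` above collapses to 1970-01-01, and `dtaddto` becomes null in the earlier `show()` output. That is what one would expect if these columns are not Unix timestamps. The following is a minimal sketch, not the notebook's method, and it rests on two assumptions: that `arrdate` and `depdate` are SAS date values (SAS counts days from 1960-01-01), and that `dtaddto` is an `MMddyyyy` string, as the sample value `10292016` suggests. The helper names are hypothetical.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as day counts from this day


def sas_days_to_date(days):
    """Convert a SAS day count (e.g. 20574.0) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def parse_mmddyyyy(text):
    """Parse an MMddyyyy string (assumed layout of dtaddto); None if it does not parse."""
    try:
        return datetime.strptime(text, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None


print(sas_days_to_date(20574.0))   # arrdate of the first sample row -> 2016-04-30
print(parse_mmddyyyy("10292016"))  # dtaddto of the first sample row -> 2016-10-29
```

Under the same assumption, the converted `arrdate` of the first row agrees with its `dtadfile` value of 20160430. In Spark the same day-count conversion could be written as a SQL expression such as `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`.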


In [16]:
#show min and max year
print("min i94yr i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'i94yr': 'min'}).collect()[0][0]))
print("max i94yr i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'i94yr': 'max'}).collect()[0][0]))

min i94yr i94 dataset from Apr 16 is : 2016
max i94yr i94 dataset from Apr 16 is : 2016


In [17]:
#show min and max dates
print("min dtadfile i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'dtadfile': 'min'}).collect()[0][0]))
print("max dtadfile i94 dataset from Apr 16 is : {}".format(df_spark_new.agg({'dtadfile': 'max'}).collect()[0][0]))

min dtadfile i94 dataset from Apr 16 is : 2013-08-11
max dtadfile i94 dataset from Apr 16 is : 2016-09-19


In [18]:
df_spark_new.groupBy("i94port").agg(count("*").alias('Count')).sort('count', ascending=False).show()

+-------+------+
|i94port| Count|
+-------+------+
|    NYC|485916|
|    MIA|343941|
|    LOS|310163|
|    SFR|152586|
|    ORL|149195|
|    HHW|142720|
|    NEW|136122|
|    CHI|130564|
|    HOU|101481|
|    FTL| 95977|
|    ATL| 92579|
|    LVG| 89280|
|    AGA| 80919|
|    WAS| 74835|
|    DAL| 71809|
|    BOS| 57354|
|    SEA| 47719|
|    PHO| 38890|
|    DET| 37832|
|    TAM| 25632|
+-------+------+
only showing top 20 rows



In [19]:
df_spark_new.groupBy("visatype").agg(count("*").alias('Count')).sort('count', ascending=False).show()

+--------+-------+
|visatype|  Count|
+--------+-------+
|      WT|1309059|
|      B2|1117897|
|      WB| 282983|
|      B1| 212410|
|     GMT|  89133|
|      F1|  39016|
|      E2|  19383|
|      CP|  14758|
|      E1|   3743|
|       I|   3176|
|      F2|   2984|
|      M1|   1317|
|      I1|    234|
|     GMB|    150|
|      M2|     49|
|     SBP|     11|
|     CPL|     10|
+--------+-------+



### From the above data, the top 5 visa categories under which people are travelling are
  1. WT - waiver-tourist
  2. B2 - visitor visa
  3. WB - waiver-business
  4. B1 - business visa
  5. GMT - Guam-CNMI Visa Waiver Program, tourist
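
The counts above also answer the "percentage of people in each visa category" question from the project summary. A minimal sketch in plain Python, with the counts copied from the `visatype` output; the function name is hypothetical.

```python
# Counts copied from the visatype output above
visa_counts = {
    "WT": 1309059, "B2": 1117897, "WB": 282983, "B1": 212410, "GMT": 89133,
    "F1": 39016, "E2": 19383, "CP": 14758, "E1": 3743, "I": 3176,
    "F2": 2984, "M1": 1317, "I1": 234, "GMB": 150, "M2": 49, "SBP": 11, "CPL": 10,
}


def percentage_shares(counts):
    """Return each category's share of the total, in percent."""
    total = sum(counts.values())
    return {name: 100.0 * n / total for name, n in counts.items()}


shares = percentage_shares(visa_counts)
for name in ["WT", "B2", "WB", "B1", "GMT"]:
    print(f"{name}: {shares[name]:.1f}%")
```

The counts sum to 3096313, the record count reported earlier, so no row has a missing `visatype`.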

## Read demographic data

In [20]:
#assign us cities demographic data csv file to a variable
fname_demo = 'us-cities-demographics.csv'

In [21]:
df_demo = spark.read.format('csv').options(header='true', inferschema='true', sep=';').load(fname_demo)

In [22]:
df_demo.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

In [23]:
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [24]:
print("Initial number of records in US cities demographic dataset  is : {}".format(df_demo.count()))

Initial number of records in US cities demographic dataset  is : 2891


In [25]:
#count number of nulls in each of the columns
count_missings(df_demo)

Unnamed: 0,count
Average Household Size,16
Number of Veterans,13
Foreign-born,13
Male Population,3
Female Population,3
City,0
State,0
Median Age,0
Total Population,0
State Code,0


In [26]:
df_demo.groupBy("city","Total Population").agg(F.sum("Male Population").alias('MaleCount'),F.sum("Female Population").alias('FemaleCount')).sort('Total Population', ascending=False).show()

+-------------+----------------+---------+-----------+
|         city|Total Population|MaleCount|FemaleCount|
+-------------+----------------+---------+-----------+
|     New York|         8550405| 20408490|   22343535|
|  Los Angeles|         3971896|  9794990|   10064490|
|      Chicago|         2720556|  6600075|    7002705|
|      Houston|         2298628|  5748430|    5744710|
| Philadelphia|         1567442|  3706350|    4130860|
|      Phoenix|         1563001|  3934165|    3880840|
|  San Antonio|         1469824|  3607025|    3742095|
|    San Diego|         1394907|  3469130|    3505405|
|       Dallas|         1300082|  3195095|    3305315|
|     San Jose|         1026919|  2591585|    2543010|
|       Austin|          931840|  2378590|    2280610|
| Jacksonville|          868031|  2096015|    2244140|
|San Francisco|          864816|  2198760|    2125320|
|     Columbus|          849067|  2069905|    2175430|
| Indianapolis|          848423|  2053075|    2189040|
|   Fort W
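
The sums above exceed `Total Population`: for New York, 20408490 + 22343535 = 42752025, which is exactly 5 × 8550405. That is consistent with the dataset holding one row per race for each city, so city-level columns are summed once per race row. A minimal sketch in plain Python of taking the city-level figures once per city; the row layout, the `race_N` labels, and the function names are illustrative assumptions, and the male and female values are the New York sums above divided by 5.

```python
# Illustrative rows: one per (city, state, race); city-level columns repeat on each row.
# Assumption: all five race rows carry the same city-level figures.
rows = [
    {"city": "New York", "state": "New York", "race": f"race_{i}",
     "male": 4081698, "female": 4468707, "total": 8550405}
    for i in range(5)
]


def naive_male_sum(rows):
    """Sum male population over every row, repeats included."""
    return sum(r["male"] for r in rows)


def city_level(rows):
    """Keep the city-level figures once per (city, state)."""
    seen = {}
    for r in rows:
        # setdefault keeps the first row seen for a city and ignores the repeats
        seen.setdefault((r["city"], r["state"]), (r["male"], r["female"], r["total"]))
    return seen


print(naive_male_sum(rows))  # inflated by the number of race rows
print(city_level(rows))      # one entry per city
```

In Spark, a comparable effect could be had by selecting only the city-level columns and calling `dropDuplicates()` before aggregating.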

In [27]:
df_demo.groupBy("city","Race","Total Population").agg(count("*")).sort('Total Population', ascending=False).drop("count(1)","Total Population").show()

+-----------+--------------------+
|       city|                Race|
+-----------+--------------------+
|   New York|               White|
|   New York|Black or African-...|
|   New York|               Asian|
|   New York|  Hispanic or Latino|
|   New York|American Indian a...|
|Los Angeles|  Hispanic or Latino|
|Los Angeles|Black or African-...|
|Los Angeles|               Asian|
|Los Angeles|American Indian a...|
|Los Angeles|               White|
|    Chicago|               White|
|    Chicago|Black or African-...|
|    Chicago|American Indian a...|
|    Chicago|               Asian|
|    Chicago|  Hispanic or Latino|
|    Houston|               White|
|    Houston|               Asian|
|    Houston|Black or African-...|
|    Houston|  Hispanic or Latino|
|    Houston|American Indian a...|
+-----------+--------------------+
only showing top 20 rows



## Read Global Temperature Data

In [28]:
fname_temp = '../../data2/GlobalLandTemperaturesByCity.csv'

In [29]:
df_temp = spark.read.format('csv').options(header='true', inferschema='true', sep=',').load(fname_temp)

In [30]:
df_temp.show()

+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|                 dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01 00:00:00| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01 00:00:00|             10.644|           1.2

In [31]:
df_temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [32]:
#count_missings applies isnan(), which is not defined for timestamp columns, so cast dt to string first
changedTypedf = df_temp.withColumn("dt", col("dt").cast("string"))

In [33]:
#count number of nulls in each of the columns
count_missings(changedTypedf)

Unnamed: 0,count
AverageTemperature,364130
AverageTemperatureUncertainty,364130
dt,0
City,0
Country,0
Latitude,0
Longitude,0


In [34]:
print("Initial number of records in global temperature dataset is : {}".format(df_temp.count()))

Initial number of records in global temperature dataset is : 8599212


## Read Airport Codes

In [35]:
fname_airport = 'airport-codes_csv.csv'

In [36]:
df_airport = spark.read.format('csv').options(header='true', inferschema='true', sep=',').load(fname_airport)

In [37]:
df_airport.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [38]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [39]:
#count number of nulls in each of the columns
count_missings(df_airport)

Unnamed: 0,count
iata_code,45886
local_code,26389
gps_code,14045
elevation_ft,7006
municipality,5676
ident,0
type,0
name,0
continent,0
iso_country,0


## Read i94port.txt

In [40]:
fname_port = 'i94port.txt'

In [41]:
df_port = spark.read.format('csv').options(header='false', inferschema='false', sep='=').load(fname_port)

## Clean the data by removing the enclosing quotes

In [42]:
df_port = df_port.withColumn('port_code',F.regexp_replace('_c0', "'", ''))\
       .withColumn('port_name',F.regexp_replace('_c1', "'", ''))

## Drop the default columns

In [43]:
df_port = df_port.drop('_c0','_c1')

In [44]:
df_port.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port_name: string (nullable = true)



In [45]:
df_port.show(truncate =False)

+---------+----------------------------------+
|port_code|port_name                         |
+---------+----------------------------------+
|   ALC	  |	ALCAN, AK                        |
|   ANC	  |	ANCHORAGE, AK                    |
|   BAR	  |	BAKER AAF - BAKER ISLAND, AK     |
|   DAC	  |	DALTONS CACHE, AK                |
|   PIZ	  |	DEW STATION PT LAY DEW, AK       |
|   DTH	  |	DUTCH HARBOR, AK                 |
|   EGL	  |	EAGLE, AK                        |
|   FRB	  |	FAIRBANKS, AK                    |
|   HOM	  |	HOMER, AK                        |
|   HYD	  |	HYDER, AK                        |
|   JUN	  |	JUNEAU, AK                       |
|   5KE	  |	KETCHIKAN, AK                    |
|   KET	  |	KETCHIKAN, AK                    |
|   MOS	  |	MOSES POINT INTERMEDIATE, AK     |
|   NIK	  |	NIKISKI, AK                      |
|   NOM	  |	NOM, AK                          |
|   PKC	  |	POKER CREEK, AK                  |
|   ORI	  |	PORT LIONS SPB, AK               |
|   SKA	  |	S
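
The output above shows that removing the quotes leaves the codes and names padded with spaces and tabs, and that most names follow a `CITY, ST` pattern. A minimal sketch in plain Python of trimming one line and splitting the name. It assumes each line of `i94port.txt` looks like `'ALC' = 'ALCAN, AK'` with arbitrary whitespace around the parts; the function name is hypothetical.

```python
import re

# Assumed line layout: optional whitespace, 'CODE', '=', 'NAME'
LINE_RE = re.compile(r"^\s*'(?P<code>[^']*)'\s*=\s*'(?P<name>[^']*)'\s*$")


def parse_port_line(line):
    """Return (code, city, state) for one line, or None if the line does not match."""
    m = LINE_RE.match(line)
    if m is None:
        return None
    code = m.group("code").strip()
    name = m.group("name").strip()
    # Split "CITY, ST" on the last comma; names without a comma keep state=None
    city, sep, state = name.rpartition(",")
    if not sep:
        return code, name, None
    return code, city.strip(), state.strip()


print(parse_port_line("   'ALC'\t=\t'ALCAN, AK             '"))
print(parse_port_line("   'BAR'\t=\t'BAKER AAF - BAKER ISLAND, AK'"))
```

Inside Spark, the padding could be removed with `F.regexp_replace('port_code', r'^\s+|\s+$', '')`, which also strips tabs.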

## Read i94cntyl.txt

In [46]:
fname_cntyl = 'i94cntyl.txt'

In [47]:
df_cntyl = spark.read.format('csv').options(header='false', inferschema='false', sep='=').load(fname_cntyl)

In [48]:
df_cntyl = df_cntyl.withColumn('code',F.regexp_replace('_c0', "'", ''))\
       .withColumn('country',F.regexp_replace('_c1', "'", ''))

In [49]:
df_cntyl = df_cntyl.drop('_c0','_c1')

In [50]:
df_cntyl.printSchema()

root
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)



In [51]:
df_cntyl.show(truncate =False)

+-------+-----------------------------------------------------------+
|code   |country                                                    |
+-------+-----------------------------------------------------------+
|   582 |  MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|   236 |  AFGHANISTAN                                              |
|   101 |  ALBANIA                                                  |
|   316 |  ALGERIA                                                  |
|   102 |  ANDORRA                                                  |
|   324 |  ANGOLA                                                   |
|   529 |  ANGUILLA                                                 |
|   518 |  ANTIGUA-BARBUDA                                          |
|   687 |  ARGENTINA                                                |
|   151 |  ARMENIA                                                  |
|   532 |  ARUBA                                                    |
|   438 |  AUSTRALIA

#### Cleaning Steps
Remove insignificant columns and drop records with null or NaN values.
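
One way to make "insignificant" concrete is the share of missing values per column. A minimal sketch using the I94 counts reported by `count_missings` and the 3096313 record total; the 50% threshold and the function name are illustrative assumptions, not choices stated in the original.

```python
TOTAL_ROWS = 3096313  # record count reported for the April 2016 I94 data

# Missing-value counts as reported by count_missings(df_spark)
missing_counts = {
    "entdepu": 3095921, "occup": 3088187, "insnum": 2982605, "visapost": 1881250,
    "gender": 414269, "i94addr": 152592, "depdate": 142457, "matflag": 138429,
    "entdepd": 138429, "airline": 83627,
}


def columns_to_drop(counts, total_rows, threshold=0.5):
    """Return the columns whose missing share exceeds the (assumed) threshold."""
    return [name for name, n in counts.items() if n / total_rows > threshold]


drop_cols = columns_to_drop(missing_counts, TOTAL_ROWS)
print(drop_cols)  # ['entdepu', 'occup', 'insnum', 'visapost']
```

The resulting list could then be passed to Spark as `df_spark_new.drop(*drop_cols)`.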

## Create dimension table dim_countries

### Filter out the records where the country value is Invalid, Collapsed, or No Country Code

In [52]:
# rlike matches anywhere in the string, so negate a positive match to drop the unwanted labels
df_cntyl.filter(~df_cntyl["country"].rlike("INVALID:|Collapsed|No Country Code")).createOrReplaceTempView("dim_countries")
dim_countries = spark.sql("""
                          SELECT CODE
                                ,COUNTRY
                          FROM DIM_COUNTRIES                  
                         """)

In [53]:
dim_countries.write.mode('overwrite').parquet('dim_countries.parquet')

In [54]:
dim_countries.show(truncate = False)

+-------+-----------------------------------------------------------+
|CODE   |COUNTRY                                                    |
+-------+-----------------------------------------------------------+
|   582 |  MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|   236 |  AFGHANISTAN                                              |
|   101 |  ALBANIA                                                  |
|   316 |  ALGERIA                                                  |
|   102 |  ANDORRA                                                  |
|   324 |  ANGOLA                                                   |
|   529 |  ANGUILLA                                                 |
|   518 |  ANTIGUA-BARBUDA                                          |
|   687 |  ARGENTINA                                                |
|   151 |  ARMENIA                                                  |
|   532 |  ARUBA                                                    |
|   438 |  AUSTRALIA

In [55]:
dim_countries.printSchema()

root
 |-- CODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
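
Spark's `rlike` uses find semantics: the pattern may match anywhere in the string. This matters for exclusion filters. A pattern made only of OR-ed negative lookaheads, such as `(?!INVALID:)|(?!Collapsed)|(?!No Country Code)`, can match an empty position in any string, so it keeps every row. Negating a positive match excludes rows more directly. The check below uses Python's `re.search`, which also has find semantics; the sample labels are illustrative assumptions about what the file contains.

```python
import re

# Alternation of negative lookaheads: some alternative succeeds at every position
lookahead_only = re.compile(r"(?!INVALID:)|(?!Collapsed)|(?!No Country Code)")
# Positive pattern for the unwanted labels, to be negated by the caller
unwanted = re.compile(r"INVALID:|Collapsed|No Country Code")

samples = [
    "AUSTRALIA",
    "INVALID: STATELESS",
    "Collapsed (example)",
    "No Country Code (example)",
]

kept_by_lookahead = [s for s in samples if lookahead_only.search(s)]
kept_by_negation = [s for s in samples if not unwanted.search(s)]

print(kept_by_lookahead)  # all four samples survive
print(kept_by_negation)   # ['AUSTRALIA']
```

Comparing the row count before and after such a filter is a quick way to confirm that it removed anything at all.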



## Create dimension table dim_ports

### Filter out the records for which either port code is not available or its value is Collapsed

In [56]:
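# Negate a positive match; check both columns because the placeholder text may sit in either one
unwanted_port = "Collapsed|No PORT Code"
df_port.filter(~(col("port_code").rlike(unwanted_port) | col("port_name").rlike(unwanted_port))).createOrReplaceTempView("dim_ports")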
dim_ports = spark.sql("""
                          SELECT port_code
                                ,port_name
                          FROM DIM_PORTS                  
                         """)

In [57]:
dim_ports.write.mode('overwrite').parquet('dim_ports.parquet')

In [58]:
dim_ports.show(truncate = False)

+---------+----------------------------------+
|port_code|port_name                         |
+---------+----------------------------------+
|   ALC	  |	ALCAN, AK                        |
|   ANC	  |	ANCHORAGE, AK                    |
|   BAR	  |	BAKER AAF - BAKER ISLAND, AK     |
|   DAC	  |	DALTONS CACHE, AK                |
|   PIZ	  |	DEW STATION PT LAY DEW, AK       |
|   DTH	  |	DUTCH HARBOR, AK                 |
|   EGL	  |	EAGLE, AK                        |
|   FRB	  |	FAIRBANKS, AK                    |
|   HOM	  |	HOMER, AK                        |
|   HYD	  |	HYDER, AK                        |
|   JUN	  |	JUNEAU, AK                       |
|   5KE	  |	KETCHIKAN, AK                    |
|   KET	  |	KETCHIKAN, AK                    |
|   MOS	  |	MOSES POINT INTERMEDIATE, AK     |
|   NIK	  |	NIKISKI, AK                      |
|   NOM	  |	NOM, AK                          |
|   PKC	  |	POKER CREEK, AK                  |
|   ORI	  |	PORT LIONS SPB, AK               |
|   SKA	  |	S

In [59]:
dim_ports.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port_name: string (nullable = true)



## Create dimension dim_temparature

### Remove records where AverageTemperature is null

In [60]:
#Remove null values from global temperature dataset

df_temp_cleaned = df_temp.filter(df_temp.AverageTemperature.isNotNull())

In [61]:
print("Number of records in global temperature dataset after removing nulls is : {}".format(df_temp_cleaned.count()))

Number of records in global temperature dataset after removing nulls is : 8235082


In [62]:
df_temp_cleaned.createOrReplaceTempView("dim_temparature")
dim_temparature = spark.sql("""
                                SELECT dt as date
                                      ,Country
                                      ,City
                                      ,AverageTemperature
                                      ,AverageTemperatureUncertainty
                                      ,Latitude
                                      ,Longitude
                                FROM DIM_TEMPARATURE                  
                            """)

In [63]:
dim_temparature.write.mode('overwrite').parquet('dim_temparature.parquet')

In [64]:
dim_temparature.show(truncate = False)

+-------------------+-------+-----+-------------------+-----------------------------+--------+---------+
|date               |Country|City |AverageTemperature |AverageTemperatureUncertainty|Latitude|Longitude|
+-------------------+-------+-----+-------------------+-----------------------------+--------+---------+
|1743-11-01 00:00:00|Denmark|Århus|6.068              |1.7369999999999999           |57.05N  |10.33E   |
|1744-04-01 00:00:00|Denmark|Århus|5.7879999999999985 |3.6239999999999997           |57.05N  |10.33E   |
|1744-05-01 00:00:00|Denmark|Århus|10.644             |1.2830000000000001           |57.05N  |10.33E   |
|1744-06-01 00:00:00|Denmark|Århus|14.050999999999998 |1.347                        |57.05N  |10.33E   |
|1744-07-01 00:00:00|Denmark|Århus|16.082             |1.396                        |57.05N  |10.33E   |
|1744-09-01 00:00:00|Denmark|Århus|12.780999999999999 |1.454                        |57.05N  |10.33E   |
|1744-10-01 00:00:00|Denmark|Århus|7.95               |

## Create dimension dim_demographics

In [65]:
df_demo.createOrReplaceTempView("dim_demographics")
dim_demographics = spark.sql("""
                                SELECT "State Code" as StateCode
                                      ,State
                                      ,City
                                      ,"Total Population" as TotalPopulation
                                      ,"Male Population" as MalePopulation
                                      ,"Female Population" as FemalePopulation
                                      ,"Median Age"as MedianAge
                                      ,"Average Household Size" as AvgHouseholdSize
                                      ,"Number of Veterans" as NumVeterans
                                      ,"Foreign-born" as ForeignBorn
                                      ,Race
                                FROM DIM_DEMOGRAPHICS                  
                            """)

In [66]:
dim_demographics.write.mode('overwrite').parquet('dim_demographics.parquet')

In [67]:
dim_demographics.show(truncate = False)

+----------+--------------+----------------+----------------+---------------+-----------------+----------+----------------------+------------------+------------+---------------------------------+
|StateCode |State         |City            |TotalPopulation |MalePopulation |FemalePopulation |MedianAge |AvgHouseholdSize      |NumVeterans       |ForeignBorn |Race                             |
+----------+--------------+----------------+----------------+---------------+-----------------+----------+----------------------+------------------+------------+---------------------------------+
|State Code|Maryland      |Silver Spring   |Total Population|Male Population|Female Population|Median Age|Average Household Size|Number of Veterans|Foreign-born|Hispanic or Latino               |
|State Code|Massachusetts |Quincy          |Total Population|Male Population|Female Population|Median Age|Average Household Size|Number of Veterans|Foreign-born|White                            |
|State Code|Alabama 

In [68]:
dim_demographics.printSchema()

root
 |-- StateCode: string (nullable = false)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- TotalPopulation: string (nullable = false)
 |-- MalePopulation: string (nullable = false)
 |-- FemalePopulation: string (nullable = false)
 |-- MedianAge: string (nullable = false)
 |-- AvgHouseholdSize: string (nullable = false)
 |-- NumVeterans: string (nullable = false)
 |-- ForeignBorn: string (nullable = false)
 |-- Race: string (nullable = true)
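
The output above repeats the literal text `State Code`, `Total Population`, and so on in every row, and the schema reports those columns as non-nullable strings. That is consistent with Spark SQL reading the double-quoted names as string literals; by default Spark SQL quotes identifiers with backticks. A minimal sketch that builds the select list with backtick-quoted column names; the helper name is hypothetical and the alias mapping mirrors the query above.

```python
def build_select(table, aliases):
    """Build a SELECT that backtick-quotes each source column and aliases it."""
    cols = ",\n       ".join(f"`{src}` AS {alias}" for src, alias in aliases.items())
    return f"SELECT {cols}\nFROM {table}"


# Source column name -> alias, following the dim_demographics query above
aliases = {
    "State Code": "StateCode",
    "State": "State",
    "City": "City",
    "Total Population": "TotalPopulation",
    "Male Population": "MalePopulation",
    "Female Population": "FemalePopulation",
    "Median Age": "MedianAge",
    "Average Household Size": "AvgHouseholdSize",
    "Number of Veterans": "NumVeterans",
    "Foreign-born": "ForeignBorn",
    "Race": "Race",
}

query = build_select("dim_demographics", aliases)
print(query)
```

The resulting string could be passed to `spark.sql(query)`. With real column references, the numeric columns would be expected to keep the integer and double types shown in `df_demo.printSchema()`.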



## Create dimension dim_airports

In [69]:
df_airport.createOrReplaceTempView("dim_airports")
dim_airports  = spark.sql("""
                                SELECT   ident
                                        ,type
                                        ,name
                                        ,elevation_ft
                                        ,continent
                                        ,iso_country
                                        ,iso_region
                                        ,municipality
                                        ,gps_code
                                        ,iata_code
                                        ,local_code
                                        ,coordinates
                                FROM DIM_AIRPORTS                  
                            """)

In [70]:
dim_airports.write.mode('overwrite').parquet('dim_airports.parquet')

In [71]:
dim_airports.show(truncate = False)

+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+---------------------------------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                            |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+---------------------------------------+
|00A  |heliport     |Total Rf Heliport                 |11          |NA       |US         |US-PA     |Bensalem    |00A     |null     |00A       |-74.93360137939453, 40.07080078125     |
|00AA |small_airport|Aero B Ranch Airport              |3435        |NA       |US         |US-KS     |Leoti       |00AA    |null     |00AA      |-101.473911, 38.704022                 |
|00AK |small_airport|Lowell Field                      |450         |N

In [72]:
dim_airports.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



## Create fact table fact_immigration

### Remove records from I94 data having port code values for which data is available in dim_ports

In [73]:
df_spark_new.createOrReplaceTempView("fact_immigration")
fact_immigration  = spark.sql("""
                                SELECT   *
                                FROM FACT_IMMIGRATION
                                WHERE i94port in  (select port_code from dim_ports)
                            """)

In [74]:
df_spark_new = df_spark_new.withColumnRenamed('i94port','port_code')

In [75]:
# A "leftanti" join keeps only the rows whose port_code has no match in dim_ports
df_spark_new = df_spark_new.join(dim_ports, ["port_code"], "leftanti")
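
For reference, a `leftanti` join keeps only the left-hand rows that have no match on the right, while a `leftsemi` join keeps only those that do. The sketch below mimics both behaviours in plain Python rather than Spark. The sample rows, the port codes, and the helper names `left_semi` and `left_anti` are illustrative assumptions and not part of the pipeline. It may help when checking that the join type matches the intended filter.

```python
# Plain-Python illustration of left-semi vs left-anti join semantics.
# The rows and port codes below are made up for the example.

def left_semi(rows, key, right_keys):
    """Keep rows whose key value appears in right_keys."""
    return [r for r in rows if r[key] in right_keys]

def left_anti(rows, key, right_keys):
    """Keep rows whose key value does NOT appear in right_keys."""
    return [r for r in rows if r[key] not in right_keys]

immigration_rows = [
    {"port_code": "LOS", "airline": "QF"},
    {"port_code": "XXX", "airline": "VA"},
]
known_ports = {"LOS", "NYC"}

semi_result = left_semi(immigration_rows, "port_code", known_ports)
anti_result = left_anti(immigration_rows, "port_code", known_ports)

print([r["port_code"] for r in semi_result])  # rows with a matching port
print([r["port_code"] for r in anti_result])  # rows without a matching port
```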

In [76]:
# Surrogate key: the generated ids are unique and increasing, but not necessarily consecutive
df_spark_new = df_spark_new.withColumn("skey", F.monotonically_increasing_id())

### Create fact table 

In [77]:
df_spark_new.createOrReplaceTempView("fact_immigration")
fact_immigration = spark.sql("""
                                SELECT   skey 
                                        ,i94yr 
                                        ,i94mon  
                                        ,i94cit
                                        ,i94res
                                        ,port_code  
                                        ,arrdate  
                                        ,i94mode  
                                        ,i94addr  
                                        ,depdate  
                                        ,i94bir  
                                        ,i94visa  
                                        ,dtadfile  
                                        ,visapost  
                                        ,occup  
                                        ,entdepa  
                                        ,entdepd  
                                        ,matflag  
                                        ,biryear  
                                        ,dtaddto  
                                        ,gender  
                                        ,airline  
                                        ,visatype  
                                FROM fact_immigration                  
                            """)

In [78]:
fact_immigration.write.mode('overwrite').parquet('fact_immigration.parquet')

In [79]:
fact_immigration.show(truncate = False)

+----+-----+------+------+------+---------+----------+-------+-------+----------+------+-------+----------+--------+-----+-------+-------+-------+-------+-------+------+-------+--------+
|skey|i94yr|i94mon|i94cit|i94res|port_code|arrdate   |i94mode|i94addr|depdate   |i94bir|i94visa|dtadfile  |visapost|occup|entdepa|entdepd|matflag|biryear|dtaddto|gender|airline|visatype|
+----+-----+------+------+------+---------+----------+-------+-------+----------+------+-------+----------+--------+-----+-------+-------+-------+-------+-------+------+-------+--------+
|0   |2016 |4     |245.0 |438.0 |LOS      |1970-01-01|1.0    |CA     |1970-01-01|40.0  |1.0    |2016-04-30|SYD     |null |G      |O      |M      |1976   |null   |F     |QF     |B1      |
|1   |2016 |4     |245.0 |438.0 |LOS      |1970-01-01|1.0    |NV     |1970-01-01|32.0  |1.0    |2016-04-30|SYD     |null |G      |O      |M      |1984   |null   |F     |VA     |B1      |
|2   |2016 |4     |245.0 |438.0 |LOS      |1970-01-01|1.0    |WA 

In [80]:
fact_immigration.printSchema()

root
 |-- skey: long (nullable = false)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- port_code: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visatype: string (nullable = true)




#### 3.1 Conceptual Data Model
The ETL pipeline consists of the dimension and fact tables below. A star schema is used to model the data.
 * Dimension Tables  
   * dim_countries  
   * dim_ports
   * dim_temparature  
   * dim_demographics  
   * dim_airports
 * Fact Table  
   * fact_immigration  
 
   
 * Conceptual Model (since this is a conceptual model, it does not show the referential constraints)  
   
 ![ConceptuaModel](https://r766466c839826xjupyterlnnfq3jud.udacity-student-workspaces.com/files/Conceptual%20Model.PNG?_xsrf=2%7Cd0db3c9c%7C35f88a0bf6691a0cba5aa87c43349937%7C1585202570)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The ETL pipeline can be modeled using Airflow hosted on an AWS EC2 cluster. The data pipeline is outlined below.

S3 will be used as the data source and to store the dimension tables in Parquet file format.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

create_dimension_tables >> load_dim_data_from_s3  
create_fact_table  
load_dim_data_from_s3 >> load_data_into_fact_table  
load_data_into_fact_table >> data_validation
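
The dependencies above can be sanity-checked without Airflow. The sketch below is only an illustration: it transcribes the listed `>>` relationships into a plain dictionary and uses the standard-library `graphlib` module (Python 3.9+) to derive one valid execution order. The dictionary name `dependencies` is an assumption made for the example. Note that the list above gives `create_fact_table` no upstream or downstream task, so it is left unconnected here as well.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts,
# transcribed from the ">>" dependencies listed above.
dependencies = {
    "create_dimension_tables": set(),
    "create_fact_table": set(),  # no upstream task is listed for it
    "load_dim_data_from_s3": {"create_dimension_tables"},
    "load_data_into_fact_table": {"load_dim_data_from_s3"},
    "data_validation": {"load_data_into_fact_table"},
}

# static_order() yields the tasks in an order that respects every dependency.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

If loading the fact table requires the table to exist first, adding `create_fact_table` to the predecessors of `load_data_into_fact_table` would express that constraint.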


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [81]:
# Fact table should not be empty
fact_record_count = fact_immigration.count()
assert fact_record_count > 0, "fact_immigration is empty"
print("Total number of records in fact table is : {}".format(fact_record_count))

Total number of records in fact table is : 3096313
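
The count check above could be extended to cover key uniqueness as well. The following is a minimal sketch, assuming the counts are computed first and passed in as plain integers. The function name `run_quality_checks` and the sample counts are hypothetical and used only for illustration.

```python
def run_quality_checks(table_name, row_count, distinct_key_count):
    """Return a list of failure messages; an empty list means all checks passed."""
    failures = []
    # Completeness check: the table must contain at least one row.
    if row_count == 0:
        failures.append(f"{table_name} is empty")
    # Uniqueness check: every row should carry a distinct key value.
    if distinct_key_count != row_count:
        surplus = row_count - distinct_key_count
        failures.append(f"{table_name} has {surplus} more row(s) than distinct keys")
    return failures

# Hypothetical counts, for illustration only.
ok = run_quality_checks("fact_immigration", row_count=5, distinct_key_count=5)
bad = run_quality_checks("fact_immigration", row_count=5, distinct_key_count=3)
print(ok)
print(bad)
```

In the notebook, the two counts could be obtained with `fact_immigration.count()` and `fact_immigration.select("skey").distinct().count()`.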


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

* I94 Immigration Data  
  * I94YR - 4 digit year  
  * I94MON - Numeric month  
  * I94CIT - Country of citizenship (numeric code)  
  * I94RES - Country of residence (numeric code)  
  * port_code - valid ports for processing  
  * ARRDATE - the Arrival Date in the USA.  
  * I94MODE - Mode of travel to USA  
  * I94ADDR - US state code (e.g. CA). The source data contains many invalid codes;  
    only codes found to be valid are kept, and everything else goes into 'other'  
  * DEPDATE - the Departure Date from the USA. It is a SAS numeric date field with no permanent format applied,  
    so it needs to be converted to a suitable date format before use. 
  * I94BIR - Age of Respondent in Years 
  * I94VISA - Visa codes collapsed into three categories:  
    * 1 = Business
    * 2 = Pleasure
    * 3 = Student
  * DTADFILE - Character Date Field - Date added to I-94 Files - CIC does not use 
  * VISAPOST - Department of State where Visa was issued - CIC does not use 
  * OCCUP - Occupation that will be performed in U.S. - CIC does not use 
  * ENTDEPA - Arrival Flag - admitted or paroled into the U.S. - CIC does not use 
  * ENTDEPD - Departure Flag - Departed, lost I-94 or is deceased - CIC does not use 
  * ENTDEPU - Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use 
  * MATFLAG - Match flag - Match of arrival and departure records 
  * BIRYEAR - 4 digit year of birth 
  * DTADDTO - Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use 
  * GENDER - Non-immigrant sex 
  * AIRLINE - Airline used to arrive in U.S. 
  * VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
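
ARRDATE and DEPDATE are described above as SAS numeric dates. SAS counts dates as the number of days since 1960-01-01, so a conversion along the following lines may be useful. This is a sketch in plain Python; the helper name `sas_to_date` and the constant `SAS_EPOCH` are assumptions introduced for the example. The `1970-01-01` values for `arrdate` and `depdate` in the sample fact table output may indicate that such a conversion has not taken effect.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this date

def sas_to_date(sas_days):
    """Convert a SAS numeric date to a datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(0))        # 1960-01-01
print(sas_to_date(None))     # None
```

In Spark, the same logic could be wrapped in a UDF created with `F.udf` and a `T.DateType()` return type.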


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
 * PySpark is used to build the ETL code because it processes large volumes of data efficiently.  
   PySpark can also process multiple data formats efficiently.
* Propose how often the data should be updated and why.  
   Data should be updated in line with the available subscription to the US I94 data. 
   Using the most recent data ensures that reports stay current and that consumers of the data see up-to-date metrics.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.  
   The ETL pipeline can be moved to a larger EMR cluster in AWS with more powerful instance types.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
   Airflow DAGs can be scheduled to populate the dashboard by 7am every day.
 * The database needed to be accessed by 100+ people.  
   The dimensional model can be hosted in Redshift to handle the increased workload.