# Project Title
### Data Engineering Capstone Project

#### Project Summary
I94 Immigration open ended Data is provided in the workspace to answer various key insights on US Immigration.
Additional data files such as US Cities Demographics and Global temperature city wise also provided in the workspace.

##### Objective

Is to be provide anlaytics platform to enable data analyst for finding key insights. For this, need to create Analytics Database. The data to be populated by joining various input data pipeline through ETL tool.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data

    * Project is to create etl data pipeline to ingest data from datafiles and transform to fit into data model for further storing the it in data warehouse.

* Step 2: Explore and Assess the Data
    * Pyspark is used create dataframe where data from files are extracted.

    * Data Cleaning:

    * EDA has been carried about in every dataframes to clean the datasets for missing values to drop the fields. 
    * Rows are deleted for missing values in essential fields and duplicate values.
    * columns are also dropped for which missing values are more than 90%

* Transformation:

  * Date format is transformed from string values of I94 SAS data.
  * From data dictionary file country code is mapped for country name.
  * Aggregate city wise average temperature is calcualted.

* Step 3: Define the Data Model

* Data Model:
  * Star Schema Data model created with referring the ER Diagram
  * Primary keys identified in dimension table
  * Fact table created and relation with dimention table is defined by join table.

* Data Quality checks are done to confirm:

  * No null value present in unique id of dataframe
  * All values are unique in the key columns.
  * Data is populated in dataframes.

* Step 4: Run ETL to Model the Data


* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, DateType
from datetime import datetime, timedelta

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Read in the data here
pd_imm = pd.read_csv('immigration_data_sample.csv')


In [3]:
pd_imm.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [3]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().\
getOrCreate()

In [5]:
df_imm = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### 2.1 Load Immigration data

In [156]:
#write to parquet
df_imm.write.mode('overwrite').format('parquet').save("sas_data")

In [6]:
df_imm=spark.read.parquet("sas_data")
df_imm.show(1)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--

#### 2.1.1.a Explore Immigration Data for missing values

In [7]:
# Performing cleaning tasks here
print(f'Total rows in "df_imm" dataframe is: {df_imm.count()}')
df_imm.printSchema() 
df_imm.head()
df_imm.show(1)
df_imm.describe()


Total rows in "df_imm" dataframe is: 3096313
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (null

DataFrame[summary: string, cicid: string, i94yr: string, i94mon: string, i94cit: string, i94res: string, i94port: string, arrdate: string, i94mode: string, i94addr: string, depdate: string, i94bir: string, i94visa: string, count: string, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: string, dtaddto: string, gender: string, insnum: string, airline: string, admnum: string, fltno: string, visatype: string]

In [8]:
cols_counts_imm = df_imm.select([count(when(isnull(c), c)).alias(c) for c in df_imm.columns])

In [9]:
cols_counts_imm.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

In [10]:
total_row_imm = df_imm.count()

In [11]:
na_cols_imm = [c1 for c1 in cols_counts_imm.columns if cols_counts_imm.select(c1).first()[0] / total_row_imm >= 0.90]

In [12]:
na_cols_imm

['occup', 'entdepu', 'insnum']

In [13]:
#### 2.1.2 Cleaning Immigration Data for duplicates and missing values

In [14]:
df_imm_null_cols_dropped = df_imm.drop(*na_cols_imm)
df_imm_null_cols_dropped.count()
df_imm_null_cols_dropped.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [15]:
df_imm_null_rows_dropped = df_imm_null_cols_dropped.dropna(subset=['cicid','arrdate','visatype'])
df_imm_null_rows_dropped.count()

3096313

In [16]:
df_imm_cleaned = df_imm_null_rows_dropped.drop_duplicates(['cicid'])

In [17]:
df_imm_cleaned.count()

3096313

### 2.2 Load US Demographics data

In [4]:
# Read CSV
df_demog = spark.read.csv('us-cities-demographics.csv', sep=';', header=True)

In [5]:
df_demog.show(1)
df_demog.printSchema()

+-------+------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   City| State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------+------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|Wichita|Kansas|      34.6|         192354|           197601|          389955|             23978|       40270|                  2.56|        KS|American Indian a...| 8791|
+-------+------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
only showing top 1 row

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = tr

In [20]:
# Write in parquet
#df_demog.write.mode('overwrite').format('parquet').save("us-cities-demographics.parquet")
#df_demog = spark.read.format('parquet').load("us-cities-demographics.parquet")
cols_counts_demog = df_demog.select([count(when(isnull(c), c)).alias(c) for c in df_demog.columns])
cols_counts_demog.show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [21]:
total_row_demog = df_demog.count()
total_row_demog

2891

In [22]:
na_cols_demog = [c1 for c1 in cols_counts_demog if cols_counts_demog.select(c1).first()[0] / total_row_demog >= 0.90]

In [23]:
na_cols_demog

[]

In [24]:
df_demog = df_demog.dropna(subset=['Male Population',
        'Female Population',
        'Number of Veterans',
        'Foreign-born',
        'Average Household Size'])

df_demog= df_demog.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])

df_demog.count()

2875

### 2.3 Load Airport data

In [25]:
df_airport = spark.read.csv('airport-codes_csv.csv', sep=',', header=True)

In [26]:
df_airport.show(1)
df_airport.printSchema()

+-----+--------+-----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|    type|             name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+--------+-----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|heliport|Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
+-----+--------+-----------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 1 row

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_

In [27]:
total_row_airport = df_airport.count()
total_row_airport

55075

In [28]:
cols_counts_airport = df_airport.select([count(when(isnull(c), c)).alias(c) for c in df_airport.columns])

In [29]:
na_cols_airport = [c1 for c1 in cols_counts_airport.columns if cols_counts_airport.select(c1).first()[0] / total_row_airport >= 0.90]

In [30]:
print([(c1, cols_counts_airport.select(c1).first()[0] / total_row_airport) for c1 in cols_counts_airport.columns])

[('ident', 0.0), ('type', 0.0), ('name', 0.0), ('elevation_ft', 0.127208352246936), ('continent', 0.0), ('iso_country', 0.0), ('iso_region', 0.0), ('municipality', 0.10305946436677259), ('gps_code', 0.25501588742623693), ('iata_code', 0.8331547889241943), ('local_code', 0.4791466182478438), ('coordinates', 0.0)]


In [31]:
na_cols_airport

[]

In [32]:
df_airport_null_cols_dropped = df_airport.drop(*na_cols_airport)

In [33]:
df_airport_missing_rows_dropped = df_airport_null_cols_dropped.dropna(how='all')

In [34]:
df_airport_cleaned = df_airport_missing_rows_dropped.drop_duplicates()

In [35]:
df_airport_cleaned.count()

55075

In [36]:
df_airport_cleaned.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



### 2.4 Load US Global Temperature by city data

In [6]:
df_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)


In [8]:
df_temp.write.partitionBy('Country').mode('overwrite').parquet('GlobalLandTemperaturesByCity.parquet')

In [9]:
df_temp = spark.read.format('parquet').load('GlobalLandTemperaturesByCity.parquet')

In [10]:
df_temp.show(5)
df_temp.printSchema()

+----------+--------------------+-----------------------------+------+--------+---------+-------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|  City|Latitude|Longitude|Country|
+----------+--------------------+-----------------------------+------+--------+---------+-------+
|1840-01-01|              -2.025|                        2.537| Taian|  36.17N|  117.35E|  China|
|1820-08-01|              20.146|                        2.286|Ürümqi|  44.20N|   87.20E|  China|
|1840-02-01|-0.20300000000000007|                        2.465| Taian|  36.17N|  117.35E|  China|
|1820-09-01|              15.331|                        1.775|Ürümqi|  44.20N|   87.20E|  China|
|1840-03-01|                4.82|           2.0180000000000002| Taian|  36.17N|  117.35E|  China|
+----------+--------------------+-----------------------------+------+--------+---------+-------+
only showing top 5 rows

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |--

In [11]:
df_temp = df_temp.drop('AverageTemperatureUncertainty', 'Latitude', 'Longitude')
df_temp.show(5)

+----------+--------------------+------+-------+
|        dt|  AverageTemperature|  City|Country|
+----------+--------------------+------+-------+
|1840-01-01|              -2.025| Taian|  China|
|1820-08-01|              20.146|Ürümqi|  China|
|1840-02-01|-0.20300000000000007| Taian|  China|
|1820-09-01|              15.331|Ürümqi|  China|
|1840-03-01|                4.82| Taian|  China|
+----------+--------------------+------+-------+
only showing top 5 rows



In [12]:
total_row_temp = df_temp.count()
total_row_temp 

8599212

In [13]:
cols_counts_temp = df_temp.select([count(when(isnull(c), c)).alias(c) for c in df_temp.columns])
cols_counts_temp.show()

+---+------------------+----+-------+
| dt|AverageTemperature|City|Country|
+---+------------------+----+-------+
|  0|            364130|   0|      0|
+---+------------------+----+-------+



In [42]:
na_cols_temp = [c1 for c1 in cols_counts_temp.columns if cols_counts_temp.select(c1).first()[0] / total_row_temp >= 0.90]

In [43]:
na_cols_temp # no columns have null values more than 90%

[]

In [44]:
df_temp = df_temp.dropna(subset=['AverageTemperature'])
df_temp.count()

8235082

In [45]:
df_temp_cleaned = df_temp.drop_duplicates(subset=['dt', 'City', 'Country'])

In [46]:
df_temp_cleaned.count()

8190783

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![Data Model](ImmigrationERD.png)
**Map of the conceptual data model** 

##### For the analytics Star-schema has been chosen which is composed of 4 Dimension tables and corresponding Fact table created as follow:

1. Immigration Calendar Dimension Table:
Primary key is chosen as arrdate which is extracted from immigration dataset.
Further from arrdata year, month, day, week and weekday have been extracted.

2. Country_temperature Dimension Table:
From Global Temperature City wise dataset, this dimension table is created to map country and average temperature.
Also from US demographic dataset the table is getting country code.


3. US Demographic Dimension Table:
The table is created from US Demographic dataset to find out various key sight on immigration.
The table enables the analyst for taking data driven decision from various joins statements.

4. Visa Type Dimension Table:
As required by analyst for decision to be taken based on visa type, this table is created from immigration dataset.

Immigration Fact Table: 
The centre table which joins other dimension table as M:1 mapping relation with common keys.

#### 3.2 Mapping Out Data Pipelines
1. The datasets are loaded into spark cluster into dataframe.
2. SAS fromat of Immigration dataset is converted to parquet. Also, the GlobalLandTemperatureDataSet downloaded from Kaggle is also converted to parquet format with partition by country.
3. All the dataframes are cleaned as per following strategy:

    * The columns are dropped where more than 90% values are missing.
    
    * The columns are dropped which are not required for this analysis.
    
    * The rows are dropped where particular columns are having no values.
    
    * The rows which are having duplicate values in set of columns are dropped
    
    
 4. Table creations as per the data model and loading of data from dataframes:
 
    * Created `US immigration calendar dimension table` and data is extracted and loaded from `cleaned immigration dataframe`
    
    * Created  `Country_temperature Dimension Table` and data is loaded by joining Global temperature dataframe and US Demographic dataframe
    
    * Created `US Demographic Dimension table` and data is extracted and loaded from respective dataframe
    
    * Created `visa type dimension table` and data is extracted and loaded from cleaned `immigration dataframe`
    
 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create dataframe for immigration calendar table consisting all distinct arrival dates from immigration dataframe

In [47]:
#df_imm_cleaned.select("arrdate").show(5)
conv_SAS_date = udf(lambda SAS_date: (datetime(1960, 1, 1).date() + timedelta(SAS_date)).isoformat())
df_imm_cal = df_imm_cleaned. \
    select(df_imm_cleaned['arrdate']). \
    withColumn('arrdate', conv_SAS_date(df_imm_cleaned.arrdate)). \
    distinct().\
    withColumn('arrival_day', dayofmonth('arrdate')).\
    withColumn('arrival_week', weekofyear('arrdate')).\
    withColumn('arrival_month', month('arrdate')).\
    withColumn('arrival_year', year('arrdate')).\
    withColumn('arrival_weekday', dayofweek('arrdate'))

df_imm_cal.show(5)

+----------+-----------+------------+-------------+------------+---------------+
|   arrdate|arrival_day|arrival_week|arrival_month|arrival_year|arrival_weekday|
+----------+-----------+------------+-------------+------------+---------------+
|2016-04-22|         22|          16|            4|        2016|              6|
|2016-04-15|         15|          15|            4|        2016|              6|
|2016-04-18|         18|          16|            4|        2016|              2|
|2016-04-09|          9|          14|            4|        2016|              7|
|2016-04-11|         11|          15|            4|        2016|              2|
+----------+-----------+------------+-------------+------------+---------------+
only showing top 5 rows



In [48]:
df_imm_cal.count()

30

##### Create dataframe for visa type table

In [49]:
df_visa = df_imm_cleaned.\
    select('visatype').\
    distinct().\
    withColumn('visa_type_key', monotonically_increasing_id()).\
    withColumnRenamed('visatype', 'visa_type')
df_visa.printSchema()
df_visa.show()
#print(df_visa.distinct().count())


root
 |-- visa_type: string (nullable = true)
 |-- visa_type_key: long (nullable = false)

+---------+-------------+
|visa_type|visa_type_key|
+---------+-------------+
|       F2| 103079215104|
|      GMB| 352187318272|
|       B2| 369367187456|
|       F1| 498216206336|
|      CPL| 601295421440|
|       I1| 704374636544|
|       WB| 738734374912|
|       M1| 747324309504|
|       B1| 807453851648|
|       WT| 884763262976|
|       M2|1151051235328|
|       CP|1314259992576|
|      GMT|1331439861760|
|       E1|1348619730944|
|        I|1391569403904|
|       E2|1554778161152|
|      SBP|1709396983808|
+---------+-------------+



##### Create Country_temperature dimention table

In [50]:
country_list = []
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    lines = f.readlines()
    #print(len(lines))
    for line in lines:
        #print(type(line))
        country_code,country_name = line.split('=')
        #print(country_code.strip(), country_name.strip()[1:-1])
        country_name_final = country_name.strip()[1:-1].capitalize()
        #print(dict_country[country_name_final])
        dict_country = dict(country_code=float(country_code.strip()), country_name=country_name_final)
        country_list.append(dict_country)

#print(country_list)       
# creating a dataframe
import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    # code that produces a warning
    country_code_df = spark.createDataFrame(country_list)
    
country_code_df.show()
country_code_df.printSchema()
#country_code_df.select('country_code').show()


+------------+--------------------+
|country_code|        country_name|
+------------+--------------------+
|       582.0|Mexico air sea, a...|
|       236.0|         Afghanistan|
|       101.0|             Albania|
|       316.0|             Algeria|
|       102.0|             Andorra|
|       324.0|              Angola|
|       529.0|            Anguilla|
|       518.0|     Antigua-barbuda|
|       687.0|          Argentina |
|       151.0|             Armenia|
|       532.0|               Aruba|
|       438.0|           Australia|
|       103.0|             Austria|
|       152.0|          Azerbaijan|
|       512.0|             Bahamas|
|       298.0|             Bahrain|
|       274.0|          Bangladesh|
|       513.0|            Barbados|
|       104.0|             Belgium|
|       581.0|              Belize|
+------------+--------------------+
only showing top 20 rows

root
 |-- country_code: double (nullable = true)
 |-- country_name: string (nullable = true)



In [51]:
  #dt|AverageTemperature| City|Country
conv_avg_temp_float = udf(lambda x : float(x))
#get_country_code = udf(lambda x : )    
df_country_temp = df_temp.\
    groupBy(col('country').alias('country_name')).\
    agg(avg(conv_avg_temp_float(col('AverageTemperature'))).alias('average_temperature')) #.\
    #withColumn('country_code', get_country_code())
#df_country_temp = df_country_temp.withColumn('country_code', dict_country.get(df_country_temp.country_name))  
    
df_country_temp.show()
#df_country_temp.count()

+------------+-------------------+
|country_name|average_temperature|
+------------+-------------------+
|        Chad| 27.189829394812683|
|      Russia| 3.3472679828734857|
|    Paraguay| 22.784014312977153|
|       Yemen| 25.768407664453854|
|     Senegal|  25.98417669449079|
|      Sweden|  5.665518003790279|
|      Guyana|  26.54984937439856|
| Philippines|   26.5164624674648|
|       Burma| 26.016839989290045|
|     Eritrea| 24.001515877771144|
|    Djibouti| 29.152790108564506|
|    Malaysia|  26.43475662438397|
|   Singapore| 26.523102826510677|
|      Turkey| 12.951888167466578|
|      Malawi| 21.347872026498056|
|        Iraq| 19.884738137449155|
|     Germany|  8.482790790264009|
| Afghanistan| 13.816496896263578|
|    Cambodia| 26.918136297728335|
|      Jordan| 18.360980886539238|
+------------+-------------------+
only showing top 20 rows



In [52]:
df_country_temp = df_country_temp.join(country_code_df, df_country_temp.country_name == country_code_df.country_name).\
                    select(country_code_df.country_code, df_country_temp.country_name, df_country_temp.average_temperature)

In [53]:
df_country_temp.show()

+------------+------------+-------------------+
|country_code|country_name|average_temperature|
+------------+------------+-------------------+
|       384.0|        Chad| 27.189829394812683|
|       693.0|    Paraguay| 22.784014312977153|
|       158.0|      Russia| 3.3472679828734857|
|       216.0|       Yemen| 25.768407664453854|
|       391.0|     Senegal|  25.98417669449079|
|       130.0|      Sweden|  5.665518003790279|
|       603.0|      Guyana|  26.54984937439856|
|       243.0|       Burma| 26.016839989290045|
|       372.0|     Eritrea| 24.001515877771144|
|       260.0| Philippines|   26.5164624674648|
|       322.0|    Djibouti| 29.152790108564506|
|       273.0|    Malaysia|  26.43475662438397|
|       207.0|   Singapore| 26.523102826510677|
|       264.0|      Turkey| 12.951888167466578|
|       345.0|      Malawi| 21.347872026498056|
|       250.0|        Iraq| 19.884738137449155|
|       112.0|     Germany|  8.482790790264009|
|       236.0| Afghanistan| 13.816496896

In [54]:
df_country_temp = df_country_temp.withColumn('country_code', df_country_temp.country_code.cast(IntegerType()))

In [55]:
df_country_temp.printSchema()

root
 |-- country_code: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- average_temperature: double (nullable = true)



##### Create US Demographic table dataframe

In [56]:
df_demog = df_demog.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')
df_demog = df_demog.withColumn('id', monotonically_increasing_id())
df_demog.show(2)

+------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+---+
|        City|     State|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|state_code|                Race|Count| id|
+------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+---+
|Arden-Arcade|California|      41.5|          47596|            48680|           96276|              6511|       13458|                  2.18|        CA|Black or African-...|13647|  0|
| Bloomington| Minnesota|      40.9|          43318|            43118|           86436|              6176|       10728|                   2.3|        MN|Black or African-...| 5828|  1|
+------------+----------+----------+---------------+-----------------+-----

##### Create fact table f_immigration

In [57]:
df_fact =   df_imm.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94res', 'country_id') \
            .withColumnRenamed('i94addr', 'state_code') 
df_fact = df_fact.join(df_visa, df_visa.visa_type == df_imm.visatype)

get_datetime = udf(lambda x: (datetime(1960, 1, 1).date() + timedelta(x)).isoformat())
df_fact = df_fact.withColumn("arrdate", get_datetime(df_fact.arrdate))
df_fact = df_fact.join(df_country_temp, df_country_temp.country_code == df_fact.country_id)


#df_fact.drop('visatype', 'visa_type', 'country_name', 'average_temperature')
df_fact.show(2)

+---------+------+------+------+----------+-------+----------+-------+----------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+---------+-------------+------------+------------+-------------------+
|record_id| i94yr|i94mon|i94cit|country_id|i94port|   arrdate|i94mode|state_code|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|visa_type|visa_type_key|country_code|country_name|average_temperature|
+---------+------+------+------+----------+-------+----------+-------+----------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+---------+-------------+------------+------------+-------------------+
|5761447.0|2016.0|   4.0| 299.0|     299.0|    LOS|2016-04-30|    1.0|        CA|20603.0

In [58]:
df_fact = df_fact.withColumn('country_id', df_fact.country_id.cast(IntegerType()))
df_fact = df_fact.withColumn('visa_type_key', df_fact.visa_type_key.cast(IntegerType()))
df_fact = df_fact.withColumn('record_id', df_fact.record_id.cast(IntegerType()))
df_fact = df_fact.withColumn('arrdate', df_fact.arrdate.cast(DateType()))
df_fact.printSchema()
df_fact.show()

root
 |-- record_id: integer (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- country_id: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [59]:
# Perform quality checks here
dict_tables = {
    'f_immigration': df_fact,
    'd_visa_type': df_visa,
    'd_immigration_calendar': df_imm_cal,
    'd_usa_demographics': df_demog,
    'd_country_temp': df_country_temp
}

In [63]:
# Check if data is available in dataframes
for table, df in dict_tables.items():
    # quality check for table
    total_count = df.count()
    if total_count == 0:
        print(f"Failed: {table} has zero records!")
    else:
        print(f"Passed: {table} has {total_count} records.")

Passed: f_immigration has 1976407 records.
Passed: d_visa_type has 17 records.
Passed: d_immigration_calendar has 30 records.
Passed: d_usa_demographics has 2875 records.
Passed: d_country_temp has 131 records.


In [62]:
# Check if 'id' columns conatains unique values
for table, df in dict_tables.items():
    #print(df.columns)
    # quality check for table
    unique_row_count = int(df.select(countDistinct(df.columns[0])).first()[0])
    total_count = df.count()
    #print(f'{unique_row_count}:: {total_count}')
    if table == 'd_usa_demographics': continue
    if unique_row_count == total_count:
        print(f"Passed: {table}.{df.columns[0]} contains unique values")
    else: 
        print(f"Failed: {table}.{df.columns[0]} contains duplicate values in unique id column")

Passed: f_immigration.record_id contains unique values
Passed: d_visa_type.visa_type contains unique values
Passed: d_immigration_calendar.arrdate contains unique values
Passed: d_country_temp.country_code contains unique values


In [61]:
# Check if 'id' columns contains any null value
def check_null_id(df):
    null_count = df.filter(col(df.columns[0]) == '').count()
    #print(null_count)
    if null_count == 0:
        print(f"Passed: {df.columns[0]} unique id doesn't contain null values")
    else: (f"Failed: {df.columns[0]} unique id contains null value")

for table, df in dict_tables.items():
    check_null_id(df)

0
Passed: record_id unique id doesn't contain null values
0
Passed: visa_type unique id doesn't contain null values
0
Passed: arrdate unique id doesn't contain null values
0
Passed: City unique id doesn't contain null values
0
Passed: country_code unique id doesn't contain null values


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### Fact Table
|Feature|Description|
|-------|-----------|
record_id|Unique record ID
country_residence_code|3 digit code for immigrant country of residence
visa_type_key|A numerical key that links to the visa_type dimension table
state_code|US state of arrival
i94yr|4 digit year
i94mon|Numeric month
i94port|Port of admission
arrdate|Arrival Date in the USA
i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
i94addr|USA State of arrival
depdate|Departure Date from the USA
i94bir|Age of Respondent in Years
i94visa|Visa codes collapsed into three categories
count|Field used for summary statistics
dtadfile|Character Date Field - Date added to I-94 Files
visapost|Department of State where where Visa was issued
occup|Occupation that will be performed in U.S
entdepa|Arrival Flag - admitted or paroled into the U.S.
entdepd|Departure Flag - Departed, lost I-94 or is deceased
entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence
matflag|Match flag - Match of arrival and departure records
biryear|4 digit year of birth
dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)
gender|Non-immigrant sex

##### Country Dimension Table
* The `country_code` and `country_name` fields come from the labels description SAS file  
* the `average_temperature` data comes from the `GlobalLandTemperatureByCities` file.

Feature|Description
---------|----------
country_code|Unique country code
country_name|Name of country
average_temperature|Average temperature of country

##### Visa Type Dimension Table
* `visa_type` is taken from SAS files
* `visa_type_key` is generated column

Feature|Description
---------|----------
visa_type_key|Unique id for each visa issued
visa_type|Name of visa

##### Immigration Calendar Dimension Table
* The `arrdate` from the immigration dataset is extracted and trnasformed to date type.
* Also other columns are extracted from `arrdate` field

Feature|Description
---------|----------
arrdate|Arrival date into US
arrival_year|Arrival year into US
arrival_month|Arrival Month
arrival_day|Arrival Day
arrival_week|Arrival Week
arrival_weekday|Arrival WeekDay

##### US Demographics Dimension Table - data dictionary
This dataset is extracted from the `us-cities-demographics` file.

Feature|Description
---------|----------
id|Record id
state_code|US state code
City|City Name
State|US State where city is located
Median Age|Median age of the population
Male Population|Count of male population
Female Population|Count of female population
Total Population|Count of total population
Number of Veterans|Count of total Veterans
Foreign born|Count of residents of the city that were not born in the city
Average Household Size|Average city household size
Race|Respondent race
Count|Count of city's individual per race

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. Rationale for the choice of tools and technologies for the project

* Apache spark is selected for this project to develop an ETL tool which has following features:

* Spark enables to extract from various file format including SAS format using config jar files.
* Apache Spark processes dataframes in-memory enables to develop lightning-fast ETL tools for analytics purpose.
* Spark has vast APIs to work on big data for transformation of datatype to creating SQL Queries

2. US Immigration data is updated monthly hence the data should be updated monthly

3. Using spark we can develop scalable solution, hence Spark can handle data even if data volume increased by 100x

4. For data population of a dashboard that must be updated on a daily basis by 7am every day, the data pipeline (ETL) can be scheduled using Apache Airflow

5. For multiuser access of data, data should be loaded in a Datawarehouse eg. Amazon Redshift from ETL