# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project data gathered from four datasets with different sources for analysing US immigration data in a simple star schema. The main aim is to provide analytics to answer business questions which can be analyze and provide insight into the pattern of immigration. The analysis questions can be answered based on the data model using simple joins.
Spark was used for the ETL pipeline and The final data is stored in parquet files for analysis.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
#pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=8f966b7f6f932f9fe523d94cfb924bb5b5a78e554aaa85fdd6c31b1cd2374373
  Stored in directory: c:\users\ascom\appdata\local\pip\cache\wheels\6c\e3\9b\0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.2
Note: you may need to restart the kernel to use updated packages.


In [3]:
# all imports and installs 
from datetime import datetime, timedelta, date
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

RuntimeError: Java gateway process exited before sending its port number

### Step 1: Scope the Project and Gather Data

#### Scope 
plan to do in the project is create a simple star schema data warehouse with 1 fact table and 5 dimensional tables saved in parquet format, for analytical purposes that allows better undersanding of the immigration trends to the US using 4 datasets I94 Immigration Data, World Temperature Data, U.S. City Demographic Data, and Airport Code Table, and manipulate them using Pyspark.

#### Describe and Gather Data 
The data sets:
[I94 Immigration Data] This data comes from the US National Tourism and Trade Office. the data in csv format.
includes data about the immigrants also the year, month, arrival and depture dates of immigrations and more.
[World Temperature Data]: dataset came from Kaggle. includes the date and average temperature for cities
[U.S. City Demographic Data]: This data comes from OpenSoft. includes demographics data for each city in U.S.
[Airport Code Table]: This data comes from datahub.io. includes a simple table of airport codes and corresponding cities. 

In [3]:
#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#write to parquet
#df_spark.write.parquet("sas_data")

In [15]:
# Read in the immigration data sample
df_immigration=spark.read.parquet("sas_data")

#df_immigration=pd.read_csv("immigration_data_sample.csv")
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [22]:
# Read in the us-cities-demographics data
df_cities=spark.read.csv("us-cities-demographics.csv",header=True,sep=';')
df_cities.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [19]:
# Read in the airport-codes_csv data
df_airport=spark.read.csv("airport-codes_csv.csv",header=True)
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [3]:
# Read in the Tempratures data
df_temperature = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv', header=True)
df_temperature.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Immigration Data Cleaning

In [16]:
# Performing cleaning tasks

# Drop unnecessary columns in immigration data sample (df_immigration)

df_immigration = df_immigration.drop('count','occup','entdepa','entdepd','entdepu','matflag','insnum','admnum','dtadfile')
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,SYD,1976.0,10292016,F,QF,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,SYD,1984.0,10292016,F,VA,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,SYD,1987.0,10292016,M,DL,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,SYD,1987.0,10292016,F,DL,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,SYD,1988.0,10292016,M,DL,40,B1


In [17]:
#Dropping any rows of immigration DataFrame with duplicate CICID.
df_immigration = df_immigration.dropDuplicates(["cicid"])
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,299.0,2016.0,4.0,103.0,103.0,NYC,20545.0,1.0,NY,20550.0,54.0,2.0,,1962.0,6292016,,OS,87,WT
1,305.0,2016.0,4.0,103.0,103.0,NYC,20545.0,1.0,NY,20555.0,63.0,2.0,,1953.0,6292016,,OS,87,WT
2,496.0,2016.0,4.0,103.0,103.0,CHI,20545.0,1.0,IL,20548.0,64.0,1.0,,1952.0,6292016,,OS,65,WB
3,558.0,2016.0,4.0,103.0,103.0,SFR,20545.0,1.0,CA,20547.0,42.0,1.0,,1974.0,6292016,M,LH,454,WB
4,596.0,2016.0,4.0,103.0,103.0,NAS,20545.0,1.0,FL,20547.0,24.0,2.0,,1992.0,6292016,M,UP,221,WT


In [18]:
# Drop rows with 100% missing values.

df_immigration = df_immigration.dropna(how='all')
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,299.0,2016.0,4.0,103.0,103.0,NYC,20545.0,1.0,NY,20550.0,54.0,2.0,,1962.0,6292016,,OS,87,WT
1,305.0,2016.0,4.0,103.0,103.0,NYC,20545.0,1.0,NY,20555.0,63.0,2.0,,1953.0,6292016,,OS,87,WT
2,496.0,2016.0,4.0,103.0,103.0,CHI,20545.0,1.0,IL,20548.0,64.0,1.0,,1952.0,6292016,,OS,65,WB
3,558.0,2016.0,4.0,103.0,103.0,SFR,20545.0,1.0,CA,20547.0,42.0,1.0,,1974.0,6292016,M,LH,454,WB
4,596.0,2016.0,4.0,103.0,103.0,NAS,20545.0,1.0,FL,20547.0,24.0,2.0,,1992.0,6292016,M,UP,221,WT


In [19]:
#Remove rows with missing values in i94port, i94addr
df_immigration = df_immigration.dropna(how="any", subset=["i94port", "i94addr"])

In [20]:
# Get the states_codes.
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
state_codes = code_mapper(f_content, "i94addrl")
list_map = list(map(list, state_codes.items()))
state_codes_df = spark.createDataFrame(list_map, ['state_code', 'state'])
state_codes_df.limit(5).toPandas()

Unnamed: 0,state_code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA
5,CO,COLORADO
6,CT,CONNECTICUT
7,DE,DELAWARE
8,DC,DIST. OF COLUMBIA
9,FL,FLORIDA


In [23]:
# Create user defined function to validate 'state' data
valid_states = df_cities.toPandas()["State Code"].unique()
print(valid_states)

@udf(StringType())
def validate_state(s): 
    """ check for US states """
    if s in valid_states:
        return s
    return 'other'

['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [24]:
# Extract data with valid states
df_immigration = df_immigration.withColumn("i94addr" , validate_state(df_immigration.i94addr))
df_immigration.select("i94addr").distinct().toPandas()

Unnamed: 0,i94addr
0,AZ
1,SC
2,LA
3,MN
4,NJ
5,DC
6,OR
7,VA
8,RI
9,KY


In [25]:
# Keep US state data ( state != 'other')
df_immigration = df_immigration.filter(df_immigration.i94addr != 'other') 
df_immigration.select("i94addr").distinct().toPandas()

Unnamed: 0,i94addr
0,AZ
1,SC
2,LA
3,MN
4,NJ
5,DC
6,OR
7,VA
8,RI
9,KY


In [26]:
# convert SAS date into Pyspark date.
convert_date = F.udf(lambda x: (datetime(1960, 1, 1).date() + timedelta(x)).isoformat() if x else None)

df_immigration = df_immigration.withColumn('arrivalDate', convert_date('arrdate'))\
                            .withColumn('departureDate', convert_date('depdate'))\
                            .drop('arrdate','depdate')
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94mode,i94addr,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype,arrivalDate,departureDate
0,299.0,2016.0,4.0,103.0,103.0,NYC,1.0,NY,54.0,2.0,,1962.0,6292016,,OS,87,WT,2016-04-01,2016-04-06
1,305.0,2016.0,4.0,103.0,103.0,NYC,1.0,NY,63.0,2.0,,1953.0,6292016,,OS,87,WT,2016-04-01,2016-04-11
2,496.0,2016.0,4.0,103.0,103.0,CHI,1.0,IL,64.0,1.0,,1952.0,6292016,,OS,65,WB,2016-04-01,2016-04-04
3,558.0,2016.0,4.0,103.0,103.0,SFR,1.0,CA,42.0,1.0,,1974.0,6292016,M,LH,454,WB,2016-04-01,2016-04-03
4,596.0,2016.0,4.0,103.0,103.0,NAS,1.0,FL,24.0,2.0,,1992.0,6292016,M,UP,221,WT,2016-04-01,2016-04-03


In [27]:
# check out there are rows with departure date being earlier than arrival date, which should be impossible. Let's clean this.
df_immigration = df_immigration.where('departureDate > arrivalDate')
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94mode,i94addr,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype,arrivalDate,departureDate
0,299.0,2016.0,4.0,103.0,103.0,NYC,1.0,NY,54.0,2.0,,1962.0,6292016,,OS,87,WT,2016-04-01,2016-04-06
1,305.0,2016.0,4.0,103.0,103.0,NYC,1.0,NY,63.0,2.0,,1953.0,6292016,,OS,87,WT,2016-04-01,2016-04-11
2,496.0,2016.0,4.0,103.0,103.0,CHI,1.0,IL,64.0,1.0,,1952.0,6292016,,OS,65,WB,2016-04-01,2016-04-04
3,558.0,2016.0,4.0,103.0,103.0,SFR,1.0,CA,42.0,1.0,,1974.0,6292016,M,LH,454,WB,2016-04-01,2016-04-03
4,596.0,2016.0,4.0,103.0,103.0,NAS,1.0,FL,24.0,2.0,,1992.0,6292016,M,UP,221,WT,2016-04-01,2016-04-03


In [28]:
#fix datatypes and rename columns
df_immigration = df_immigration.withColumn("cic_id",col("cicid").cast(IntegerType())).drop("cicid") \
            .withColumn("arrive_year",col('i94yr').cast(IntegerType())).drop("i94yr") \
            .withColumn("arrival_date", col("arrivalDate").cast(DateType())).drop("arrivalDate") \
            .withColumn("departure_date", col("departureDate").cast(DateType())).drop("departureDate") \
            .withColumn("arrive_month",col('i94mon').cast(IntegerType())).drop("i94mon") \
            .withColumn("citizen_country",col('i94cit').cast(IntegerType())).drop("i94cit") \
            .withColumn("resident_country",col('i94res').cast(IntegerType())).drop("i94res") \
            .withColumn("age",col('i94bir').cast(IntegerType())).drop("i94bir") \
            .withColumn("birth_year",col('biryear').cast(IntegerType())).drop("biryear") \
            .withColumn("visa_class",col('i94visa').cast(IntegerType())).drop("i94visa") \
            .withColumn("mode",col('i94mode').cast(IntegerType())).drop("i94mode") \
            .withColumn("allowed_date", to_date("dtaddto", "MMddyyyy")).drop("dtaddto") \
            .withColumnRenamed("i94port", "port") \
            .withColumnRenamed("i94addr","arrive_state") \
            .withColumnRenamed("fltno","flight_num") \
            .withColumnRenamed("visatype","visa_type") \
            .withColumnRenamed("visapost","visa_issue_state")

df_immigration.limit(5).toPandas()

Unnamed: 0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrival_date,departure_date,arrive_month,citizen_country,resident_country,age,birth_year,visa_class,mode,allowed_date
0,NYC,NY,,,OS,87,WT,299,2016,2016-04-01,2016-04-06,4,103,103,54,1962,2,1,2016-06-29
1,NYC,NY,,,OS,87,WT,305,2016,2016-04-01,2016-04-11,4,103,103,63,1953,2,1,2016-06-29
2,CHI,IL,,,OS,65,WB,496,2016,2016-04-01,2016-04-04,4,103,103,64,1952,1,1,2016-06-29
3,SFR,CA,,M,LH,454,WB,558,2016,2016-04-01,2016-04-03,4,103,103,42,1974,1,1,2016-06-29
4,NAS,FL,,M,UP,221,WT,596,2016,2016-04-01,2016-04-03,4,103,103,24,1992,2,1,2016-06-29


In [None]:
df_immigration.printSchema()

In [29]:
# filling missing/NaN values with (unknown)
df_immigration = df_immigration.na.fill(value='unknown',subset=["port","arrive_state","visa_issue_state","gender","airline"])
df_immigration.limit(5).toPandas()

Unnamed: 0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrival_date,departure_date,arrive_month,citizen_country,resident_country,age,birth_year,visa_class,mode,allowed_date
0,NYC,NY,unknown,unknown,OS,87,WT,299,2016,2016-04-01,2016-04-06,4,103,103,54,1962,2,1,2016-06-29
1,NYC,NY,unknown,unknown,OS,87,WT,305,2016,2016-04-01,2016-04-11,4,103,103,63,1953,2,1,2016-06-29
2,CHI,IL,unknown,unknown,OS,65,WB,496,2016,2016-04-01,2016-04-04,4,103,103,64,1952,1,1,2016-06-29
3,SFR,CA,unknown,M,LH,454,WB,558,2016,2016-04-01,2016-04-03,4,103,103,42,1974,1,1,2016-06-29
4,NAS,FL,unknown,M,UP,221,WT,596,2016,2016-04-01,2016-04-03,4,103,103,24,1992,2,1,2016-06-29


In [None]:
df_immigration.select('visa_class').distinct().show()

In [30]:
df_immigration = df_immigration.withColumn('visa_class', when(df_immigration.visa_class == 1, 'Business' )\
                                                         .when(df_immigration.visa_class == 2, 'Pleasure')\
                                                         .when(df_immigration.visa_class == 3, 'Student' ))

In [31]:
df_immigration = df_immigration.withColumn('mode', when(df_immigration.mode == 1, 'Air' )\
                                                    .when(df_immigration.mode == 2, 'Sea')\
                                                    .when(df_immigration.mode == 3, 'Land' )\
                                                    .when(df_immigration.mode == 9, 'Not reported' ))

In [32]:
df_immigration.limit(5).toPandas()

Unnamed: 0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrival_date,departure_date,arrive_month,citizen_country,resident_country,age,birth_year,visa_class,mode,allowed_date
0,NYC,NY,unknown,unknown,OS,87,WT,299,2016,2016-04-01,2016-04-06,4,103,103,54,1962,Pleasure,Air,2016-06-29
1,NYC,NY,unknown,unknown,OS,87,WT,305,2016,2016-04-01,2016-04-11,4,103,103,63,1953,Pleasure,Air,2016-06-29
2,CHI,IL,unknown,unknown,OS,65,WB,496,2016,2016-04-01,2016-04-04,4,103,103,64,1952,Business,Air,2016-06-29
3,SFR,CA,unknown,M,LH,454,WB,558,2016,2016-04-01,2016-04-03,4,103,103,42,1974,Business,Air,2016-06-29
4,NAS,FL,unknown,M,UP,221,WT,596,2016,2016-04-01,2016-04-03,4,103,103,24,1992,Pleasure,Air,2016-06-29


In [None]:
df_immigration.select("visa_issue_state").distinct().toPandas()

In [33]:
# Get the country_codes.
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
country_codes = code_mapper(f_content, "i94cntyl")
list_map = list(map(list, country_codes.items()))
country_codes_df = spark.createDataFrame(list_map, ['country_code', 'country'])
country_codes_df.limit(5).toPandas()

Unnamed: 0,country_code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [34]:
country_codes_df.withColumn("country_code", col("country_code").cast(IntegerType()))

DataFrame[country_code: int, country: string]

In [35]:
# join the immigration df with the country_codes_df

df_immigration = df_immigration.join(country_codes_df, df_immigration['citizen_country'] == country_codes_df.country_code, \
                                     how = 'left')\
                             .withColumnRenamed('country', 'citizen_country_name')\
                             .drop('country_code')\
                             .drop('citizen_country')\
                             .join(country_codes_df, df_immigration['resident_country'] == country_codes_df.country_code, \
                                   how = 'left')\
                             .withColumnRenamed('country', 'resident_country_name')\
                             .drop('country_code')\
                             .drop('resident_country')

df_immigration.limit(10).toPandas()

Unnamed: 0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrival_date,departure_date,arrive_month,age,birth_year,visa_class,mode,allowed_date,citizen_country_name,resident_country_name
0,NYC,CT,unknown,M,CX,840,WT,2018851,2016,2016-04-11,2016-04-23,4,30,1986,Pleasure,Air,2016-07-09,,BURMA
1,HHW,HI,unknown,unknown,NQ,182,WT,1835040,2016,2016-04-10,2016-04-19,4,48,1968,Pleasure,Air,2016-07-08,JAPAN,BURMA
2,HHW,HI,unknown,unknown,NQ,182,WT,1835041,2016,2016-04-10,2016-04-19,4,9,2007,Pleasure,Air,2016-07-08,JAPAN,BURMA
3,FTL,IL,unknown,unknown,NH,12,WT,2352309,2016,2016-04-13,2016-04-17,4,27,1989,Pleasure,Air,2016-07-11,JAPAN,BURMA
4,NYC,FL,unknown,M,DL,172,WT,680670,2016,2016-04-04,2016-04-11,4,61,1955,Pleasure,Air,2016-07-02,SINGAPORE,BURMA
5,NYC,FL,unknown,F,DL,172,WT,680669,2016,2016-04-04,2016-04-11,4,63,1953,Pleasure,Air,2016-07-02,SINGAPORE,BURMA
6,LVG,CA,KLL,M,KE,5,B2,1422788,2016,2016-04-08,2016-04-17,4,61,1955,Pleasure,Air,2016-10-07,MALAYSIA,BURMA
7,NYC,PA,unknown,M,CX,830,WT,1650443,2016,2016-04-09,2016-04-19,4,33,1983,Pleasure,Air,2016-07-07,AUSTRALIA,BURMA
8,NYC,NY,unknown,M,CX,830,B2,2951936,2016,2016-04-15,2016-04-29,4,29,1987,Pleasure,Air,2016-10-14,MAURITANIA,BURMA
9,NYC,FL,RNG,M,SQ,26,B1,431506,2016,2016-04-02,2016-06-14,4,20,1996,Business,Air,2016-10-01,,BURMA


#### Airport Data Cleaning

In [20]:
#Dropping any rows with duplicate ident.
df_airport = df_airport.dropDuplicates(["ident"])
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,06IN,closed,Ellis Fly-In Airport,575,,US,US-IN,Blackhawk,,,,"-87.303596, 39.282799"
1,06VA,small_airport,Mount Horeb Field,1160,,US,US-VA,Grottoes,06VA,,06VA,"-78.85530090332031, 38.249000549316406"
2,0LA0,heliport,West Hackberry Heliport,10,,US,US-LA,Hackberry,0LA0,,0LA0,"-93.40019989013672, 30.008499145507812"
3,0MD6,small_airport,Walters Airport,750,,US,US-MD,Mount Airy,0MD6,,0MD6,"-77.10579681396484, 39.38119888305664"
4,0OH7,small_airport,Apple Airport,1000,,US,US-OH,Piqua,0OH7,,0OH7,"-84.1718978881836, 40.1432991027832"


In [21]:
#Drop rows with 100% missing values.

df_airport = df_airport.dropna(how='all')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,06IN,closed,Ellis Fly-In Airport,575,,US,US-IN,Blackhawk,,,,"-87.303596, 39.282799"
1,06VA,small_airport,Mount Horeb Field,1160,,US,US-VA,Grottoes,06VA,,06VA,"-78.85530090332031, 38.249000549316406"
2,0LA0,heliport,West Hackberry Heliport,10,,US,US-LA,Hackberry,0LA0,,0LA0,"-93.40019989013672, 30.008499145507812"
3,0MD6,small_airport,Walters Airport,750,,US,US-MD,Mount Airy,0MD6,,0MD6,"-77.10579681396484, 39.38119888305664"
4,0OH7,small_airport,Apple Airport,1000,,US,US-OH,Piqua,0OH7,,0OH7,"-84.1718978881836, 40.1432991027832"


In [22]:
# split coordinates column into latitude and longitude

df_airport=df_airport.withColumn('latitude',split(df_airport['coordinates'],',').getItem(0))\
                    .withColumn('longitude',split(df_airport['coordinates'],',').getItem(1))\
                    .drop('coordinates')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,latitude,longitude
0,06IN,closed,Ellis Fly-In Airport,575,,US,US-IN,Blackhawk,,,,-87.303596,39.282799
1,06VA,small_airport,Mount Horeb Field,1160,,US,US-VA,Grottoes,06VA,,06VA,-78.85530090332031,38.249000549316406
2,0LA0,heliport,West Hackberry Heliport,10,,US,US-LA,Hackberry,0LA0,,0LA0,-93.40019989013672,30.00849914550781
3,0MD6,small_airport,Walters Airport,750,,US,US-MD,Mount Airy,0MD6,,0MD6,-77.10579681396484,39.38119888305664
4,0OH7,small_airport,Apple Airport,1000,,US,US-OH,Piqua,0OH7,,0OH7,-84.1718978881836,40.1432991027832


In [23]:
df_airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|   balloonport|
| seaplane_base|
|      heliport|
|        closed|
|medium_airport|
| small_airport|
+--------------+



In [24]:
# Clean airports dataset by filter only type = (small / medium / large) airports
df_airport = df_airport.filter( (df_airport["type"] == "small_airport") | (df_airport["type"]=="medium_airport") | (df_airport["type"] == "large_airport") ) 
df_airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



In [25]:
# get state code from iso_region
df_airport = df_airport.withColumn('state', split(df_airport['iso_region'], '-').getItem(1))
df_airport.limit(5).toPandas()
df_airport = df_airport.drop('iso_region')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,latitude,longitude,state
0,06VA,small_airport,Mount Horeb Field,1160,,US,Grottoes,06VA,,06VA,-78.85530090332031,38.249000549316406,VA
1,0MD6,small_airport,Walters Airport,750,,US,Mount Airy,0MD6,,0MD6,-77.10579681396484,39.38119888305664,MD
2,0OH7,small_airport,Apple Airport,1000,,US,Piqua,0OH7,,0OH7,-84.1718978881836,40.1432991027832,OH
3,0OK9,small_airport,Crystal Airport,1016,,US,Jennings,0OK9,,0OK9,-96.63700103759766,36.2134017944336,OK
4,16KY,small_airport,Praise God Airport,1070,,US,Carter,16KY,,16KY,-83.1227035522461,38.44400024414063,KY


In [26]:
# since df_immigration.port is df_airport.iata_code, so we don't need null in it.
# Drop rows with missing values in iata_code.

df_airport = df_airport.dropna(how='all',subset=['iata_code'])
df_airport.select('iata_code').distinct().show()

+---------+
|iata_code|
+---------+
|      DWR|
|      KEB|
|      MIZ|
|      VCH|
|      BGM|
|      UAB|
|      SCW|
|      CCK|
|      GIS|
|      TNP|
|      KGL|
|      KMU|
|      FMY|
|      LEN|
|      LEB|
|      PMK|
|      PKE|
|      GZW|
|      SGT|
|      CNU|
+---------+
only showing top 20 rows



In [27]:
df_airport.select('iata_code').where('iata_code IS NULL').count()

0

In [28]:
df_airport.count()

8699

In [29]:
# Clean airports dataset by filter only iso_country = US
df_airport = df_airport.filter(df_airport["iso_country"] == "US")
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,latitude,longitude,state
0,KAVP,medium_airport,Wilkes Barre Scranton International Airport,962,,US,Wilkes-Barre/Scranton,KAVP,AVP,AVP,-75.72339630130001,41.3385009766,PA
1,KAXS,small_airport,Altus Quartz Mountain Regional Airport,1433,,US,Altus,KAXS,AXS,AXS,-99.3385,34.697952,OK
2,KCKA,small_airport,Kegelman AF Aux Field,1202,,US,Cherokee,KCKA,CKA,CKA,-98.1231002808,36.7439002991,OK
3,KEB,small_airport,Nanwalek Airport,27,,US,Nanwalek,KEB,KEB,KEB,-151.925003052,59.3521003723,AK
4,KFSI,medium_airport,Henry Post Army Air Field (Fort Sill),1189,,US,Fort Sill,KFSI,FSI,FSI,-98.40219879,34.64979935,OK


In [34]:
df_airport.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- state: string (nullable = true)



#### us-cities-demographics Cleaning

In [4]:
#Drop rows with 100% missing values.
df_cities = df_cities.dropna(how='all')

In [5]:
#Dropping duplicate rows.
df_cities = df_cities.dropDuplicates(['City', 'State', 'Race'])
df_cities.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Alafaya,Florida,33.5,39504,45760,85264,4176,15842,2.94,FL,White,63666
1,Baldwin Park,California,35.8,38747,38309,77056,780,34322,4.13,CA,Black or African-American,1560
2,Houston,Texas,32.6,1149686,1148942,2298628,71898,696210,2.66,TX,Asian,173854
3,Las Cruces,New Mexico,32.7,47835,53809,101644,9421,11888,2.58,NM,White,91201
4,Missouri City,Texas,37.2,34932,36846,71778,4274,18556,3.03,TX,Asian,17854


In [33]:
df_cities.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [6]:
# change datatypes, format column names 
df_cities = df_cities.withColumn("median_age",col("Median Age").cast(FloatType())).drop("Median Age") \
                    .withColumn("male_population",col("Male Population").cast(IntegerType())).drop("Male Population") \
                    .withColumn("female_population",col("Female Population").cast(IntegerType())).drop("Female Population") \
                    .withColumn("total_population",col("Total Population").cast(IntegerType())).drop("Total Population") \
                    .withColumn("veterans_num",col("Number of Veterans").cast(IntegerType())).drop("Number of Veterans") \
                    .withColumn("foreign_born_population",col("Foreign-born").cast(IntegerType())).drop("Foreign-born") \
                    .withColumn("avg_household_size",col("Average Household Size").cast(FloatType())).drop("Average Household Size") \
                    .withColumn("count",col("Count").cast(IntegerType())) \
                    .withColumnRenamed("City", "city") \
                    .withColumnRenamed("State", "state") \
                    .withColumnRenamed("State Code", "state_code") \
                    .withColumnRenamed("Race", "race")

In [7]:
# pivot table to make each race population into seperate columns, change column names
df_cities = df_cities.groupBy(col("city"),col("state"),col("median_age"),col("male_population"),col("female_population")\
                            ,col("total_population"),col("veterans_num"),col("foreign_born_population"),col("avg_household_size") \
                            ,col("state_code")) \
                    .pivot("race").agg(sum("count")) \
                    .fillna({"American Indian and Alaska Native": 0,
                     "Asian": 0,
                     "Black or African-American": 0,
                     "Hispanic or Latino": 0,
                     "White": 0}) \
                    .withColumnRenamed("American Indian and Alaska Native", "american_indian_alaska_native") \
                    .withColumnRenamed("Asian","asian") \
                    .withColumnRenamed("Black or African-American","african_american") \
                    .withColumnRenamed("Hispanic or Latino","hispanic_latino") \
                    .withColumnRenamed("White","white")

In [45]:
df_cities.sort("state").limit(7).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,state_code,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,Mobile,Alabama,38.0,91275,103030,194305,11939,7234,2.4,AL,2816,5518,96397,5229,93755
1,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,0,4759,18191,3430,61869
2,Tuscaloosa,Alabama,29.1,47293,51045,98338,3647,4706,2.67,AL,261,2733,42331,2475,52603
3,Dothan,Alabama,38.900002,32172,35364,67536,6334,1699,2.59,AL,656,1175,23243,1704,43516
4,Huntsville,Alabama,38.099998,91764,97350,189114,16637,12691,2.18,AL,1755,6566,61561,10887,121904
5,Birmingham,Alabama,35.599998,102122,112789,214911,13212,8258,2.21,AL,1319,1500,157985,8940,51728
6,Montgomery,Alabama,35.400002,94582,106004,200586,14955,9337,2.41,AL,1277,6518,121360,6648,73545


In [8]:
# group table by state
df_cities= df_cities.groupBy(col("state_code"),col("state"))\
            .agg(avg("median_age").cast(IntegerType()).alias("median_age"),\
                 sum("male_population").cast(IntegerType()).alias("male_population"),\
                 sum("female_population").cast(IntegerType()).alias("female_population"),\
                 sum("total_population").cast(IntegerType()).alias("total_population"),\
                 sum("veterans_num").cast(IntegerType()).alias("veterans_num"),\
                 sum("foreign_born_population").cast(IntegerType()).alias("foreign_born_population"),\
                 avg("avg_household_size").cast(IntegerType()).alias("avg_household_size"),\
                 sum("american_indian_alaska_native").cast(IntegerType()).alias("american_indian_alaska_native"),
                 sum("asian").cast(IntegerType()).alias("asian"),\
                 sum("african_american").cast(IntegerType()).alias("african_american"),\
                 sum("hispanic_latino").cast(IntegerType()).alias("hispanic_latino"),\
                 sum("white").cast(IntegerType()).alias("white"))
df_cities.limit(5).toPandas()

Unnamed: 0,state_code,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,MT,Montana,35,87707,93587,181294,13854,5977,2,9684,4165,3349,10000,169026
1,NC,North Carolina,33,1466105,1594094,3060199,166146,379327,2,35209,178740,1029446,354409,1790136
2,MD,Maryland,36,627951,684178,1312129,64143,229794,2,16155,128839,573768,138644,594522
3,CO,Colorado,35,1454619,1481050,2935669,187896,337631,2,62613,148790,208043,703722,2463916
4,CT,Connecticut,34,432157,453424,885581,24953,225866,2,10729,48311,231822,309992,505674


In [54]:
df_cities.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- veterans_num: integer (nullable = true)
 |-- foreign_born_population: integer (nullable = true)
 |-- avg_household_size: integer (nullable = true)
 |-- american_indian_alaska_native: integer (nullable = true)
 |-- asian: integer (nullable = true)
 |-- african_american: integer (nullable = true)
 |-- hispanic_latino: integer (nullable = true)
 |-- white: integer (nullable = true)



#### Tempratures data Cleaning

In [62]:
df_temperature.sort(df_temperature.City).limit(9).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,10.779000000000002,1.942,A Coruña,Spain,42.59N,8.73W
1,1744-07-01,17.992,1.849,A Coruña,Spain,42.59N,8.73W
2,1743-12-01,,,A Coruña,Spain,42.59N,8.73W
3,1744-01-01,,,A Coruña,Spain,42.59N,8.73W
4,1744-02-01,,,A Coruña,Spain,42.59N,8.73W
5,1744-03-01,,,A Coruña,Spain,42.59N,8.73W
6,1744-04-01,13.325,2.137,A Coruña,Spain,42.59N,8.73W
7,1744-05-01,12.9,1.896,A Coruña,Spain,42.59N,8.73W
8,1744-06-01,16.41,1.893,A Coruña,Spain,42.59N,8.73W


In [63]:
df_temperature.select('Country').where(col("Country").like("U%")).distinct().toPandas()

Unnamed: 0,Country
0,United States
1,Ukraine
2,Uruguay
3,Uganda
4,United Arab Emirates
5,Uzbekistan
6,United Kingdom


In [4]:
# filter data with United States country only
df_temperature = df_temperature.filter(df_temperature.Country == 'United States')   

In [5]:
# remove duplicates 
df_temperature = df_temperature.dropDuplicates(['dt', 'City', 'Country'])

In [6]:
# Drop rows with 100% missing values and rows which have missing values in important data field "AverageTemperature"

df_temperature = df_temperature.dropna(how='all', subset=['AverageTemperature'])

In [7]:
# explore the data between which dates
df_temperature.select(max(df_temperature.dt).alias("max date"), min(df_temperature.dt).alias("min date")).toPandas()

Unnamed: 0,max date,min date
0,2013-09-01,1743-11-01


In [8]:
df_temperature.select(df_temperature.dt).where(df_temperature.dt >= '2013-01-01').distinct().toPandas()

Unnamed: 0,dt
0,2013-09-01
1,2013-05-01
2,2013-02-01
3,2013-07-01
4,2013-03-01
5,2013-06-01
6,2013-04-01
7,2013-08-01
8,2013-01-01


In [9]:
# Drop anything before 2013 to use for averages
df_temperature = df_temperature.filter(df_temperature.dt >= '2013-01-01')

In [74]:
df_temperature.printSchema()

root
 |-- month: integer (nullable = true)
 |-- average_temperature: double (nullable = true)
 |-- average_temperature_uncertainty: double (nullable = true)



In [10]:
# change datatypes, format column names 
df_temperature = df_temperature.withColumn("month", month(df_temperature.dt))\
    .withColumn("date",col("dt").cast(DateType())).drop("dt") \
    .withColumn("average_temperature",col("AverageTemperature").cast(FloatType())).drop("AverageTemperature") \
    .withColumn("average_temperature_uncertainty",col("AverageTemperatureUncertainty").cast(FloatType())).drop("AverageTemperatureUncertainty")\
    .withColumnRenamed("City", "city") \
    .withColumnRenamed("Country", "country") \
    .withColumnRenamed("Latitude", "latitude") \
    .withColumnRenamed("Longitude", "longitude") 

In [11]:
df_temperature.limit(10).toPandas()

Unnamed: 0,city,country,latitude,longitude,month,date,average_temperature,average_temperature_uncertainty
0,Houston,United States,29.74N,96.00W,1,2013-01-01,12.55,0.447
1,Mesa,United States,32.95N,112.02W,2,2013-02-01,11.201,0.292
2,Anaheim,United States,32.95N,117.77W,3,2013-03-01,15.344,0.486
3,Newark,United States,40.99N,74.56W,4,2013-04-01,9.723,0.355
4,Cleveland,United States,40.99N,80.95W,5,2013-05-01,16.789,0.263
5,Norfolk,United States,36.17N,75.58W,5,2013-05-01,19.249001,0.369
6,Rancho Cucamonga,United States,34.56N,116.76W,6,2013-06-01,26.841999,0.488
7,Tacoma,United States,47.42N,121.97W,7,2013-07-01,17.033001,0.248
8,Thornton,United States,39.38N,104.05W,7,2013-07-01,22.065001,0.235
9,Memphis,United States,34.56N,89.51W,9,2013-09-01,25.007,1.118


In [12]:
df_temperature = df_temperature.groupBy(col("month"))\
                .agg(avg("average_temperature").alias("average_temperature"),\
                    avg("average_temperature_uncertainty").alias("average_temperature_uncertainty"))
df_temperature.toPandas()

Unnamed: 0,month,average_temperature,average_temperature_uncertainty
0,1,5.569964,0.379331
1,6,23.26252,0.305762
2,3,9.214806,0.330601
3,5,18.67054,0.316173
4,9,22.404085,1.064927
5,4,13.813677,0.343258
6,8,24.15556,0.384395
7,7,24.947448,0.306347
8,2,6.118089,0.3245


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
As previously mentioned, the chosen data model was the star schema, That model was the chosen one because it allows great performance, and it also allows users to write simple queries joining the fact and dimension tables in order to achieve the analytical dataset they need and perform BI solutions.


#### Fact table: 
- 'immigrations' table: fk(cic_id, port, arrive_state, arrive_month, arrival_date, departure_date, allowed_date)

#### Dimensions tables: 
- #1: 'immigrants' table: pk: cic_id, birth_year, age, gender, airline, flight_num, visa_type, visa_class, visa_issue_state, mode, citizen_country, resident_country

- #2: 'demographics' table: pk: state_code, state, median_age, male_population, female_population, total_population, veterans_num, foreign_born_population, avg_household_size, american_indian_alaska_native, asian, african_american, hispanic_latino, white

- #3: 'airports' table: pk: iata_code, name, type, state, elevation_ft, latitude, longitude

- #4: 'temperatures' table: pk: month, average_temperature, average_temperature_uncertainty

- #5: 'date' table: pk: date_key, date, day, month, year, weekday

#### 3.2 Mapping Out Data Pipelines
the steps necessary to pipeline the data into the chosen data model:

- Extract Data for immigrants Table from immigration dataframe and Write data into parquet files
- Extract Data for demographics Table from demographics dataframe and Write data into parquet files
- Extract Data for airports Table from airports dataframe and Write data into parquet files
- Extract Data for temperatures Table from temperatures dataframe and Write data into parquet files
- Extract Data for Date Table from immigration dataframe and Write data into parquet files
- Extract Data and immigrations Table from immigration dataframe and Write data into parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### #1: 'immigrants' Table
#### Extract Data for immigrants Table
- Select columns for cic_id, birth_year, age, gender, airline, flight_num, visa_type, visa_class, visa_issue_state, mode, citizen_country, resident_country

In [67]:
immigrants_table = df_immigration.select('cic_id', 'birth_year', 'age', 'gender', 'airline', 'flight_num', 'visa_type', 'visa_class',\
                                       'visa_issue_state', 'mode', 'citizen_country_name', 'resident_country_name')

In [69]:
immigrants_table.limit(1).toPandas()

Unnamed: 0,cic_id,birth_year,age,gender,airline,flight_num,visa_type,visa_class,visa_issue_state,mode,citizen_country_name,resident_country_name
0,2018851,1986,30,M,CX,840,WT,Pleasure,unknown,Air,,BURMA


In [70]:
# write immigrates table to parquet files partitioned by visa_class
immigrants_table.write.mode('overwrite').partitionBy('visa_class').parquet('immigrants')

#### #2: 'demographics' Table
#### Extract Data for demographics Table
- Select columns for state_code, state, median_age, male_population, female_population, total_population, veterans_num, foreign_born_population, avg_household_size, american_indian_alaska_native, asian, african_american, hispanic_latino, white

In [9]:
demographics_table = df_cities.select('state_code', 'state', 'median_age', 'male_population', 'female_population', \
                                      'total_population', 'veterans_num', 'foreign_born_population', 'avg_household_size',\
                                      'american_indian_alaska_native', 'asian', 'african_american', 'hispanic_latino', 'white')

demographics_table.limit(1).toPandas()

Unnamed: 0,state_code,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,MT,Montana,35,87707,93587,181294,13854,5977,2,9684,4165,3349,10000,169026


In [10]:
# write demographics table to parquet files partitioned by state_code
demographics_table.write.mode('overwrite').partitionBy('state_code').parquet('demographics')

#### #3: 'airports' Table
#### Extract Data for airports Table
- Select columns for iata_code, name, type, state, elevation_ft, latitude, longitude

In [31]:
airports_table = df_airport.select('iata_code', 'name', 'type', 'state', 'elevation_ft', 'latitude', 'longitude')

airports_table.limit(1).toPandas()

Unnamed: 0,iata_code,name,type,state,elevation_ft,latitude,longitude
0,AVP,Wilkes Barre Scranton International Airport,medium_airport,PA,962,-75.72339630130001,41.3385009766


In [32]:
# write airports table to parquet files partitioned by type
airports_table.write.mode('overwrite').partitionBy('type').parquet('airports')

#### #4: 'temperatures' Table
#### Extract Data for temperatures Table
- Select columns for month, average_temperature, average_temperature_uncertainty

In [13]:
temperatures_table = df_temperature.select('month', 'average_temperature', 'average_temperature_uncertainty')

temperatures_table.limit(1).toPandas()

Unnamed: 0,month,average_temperature,average_temperature_uncertainty
0,1,5.569964,0.379331


In [14]:
# write temperatures table to parquet files 
temperatures_table.write.mode('overwrite').parquet('temperatures')

#### #5: `Date` Table
#### Extract Data for Date Table
- Extract the date, day, month, year, and weekday from the dates column 
- Specify labels for these columns 


In [42]:
d = df_immigration.select("arrival_date").union(df_immigration.select("departure_date"))\
                                    .union(df_immigration.select("allowed_date")).distinct()
d.limit(5).toPandas()                                   

Unnamed: 0,arrival_date
0,2017-08-11
1,2017-09-11
2,2016-04-25
3,2018-03-17
4,2017-01-06


In [59]:
d.printSchema()

root
 |-- date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- date_key: string (nullable = true)



In [57]:
d = d.withColumn("date_key", date_format(d.arrival_date,"yyyyMMdd"))\
    .withColumn("day", dayofmonth(d.arrival_date))\
    .withColumn("month", month(d.arrival_date))\
    .withColumn("year", year(d.arrival_date))\
    .withColumn("weekday", dayofweek(d.arrival_date))\
    .withColumnRenamed("arrival_date", "date")

d.limit(5).toPandas()

Unnamed: 0,date,day,month,year,weekday
0,2017-08-11,11,8,2017,6
1,2017-09-11,11,9,2017,2
2,2016-04-25,25,4,2016,2
3,2018-03-17,17,3,2018,7
4,2017-01-06,6,1,2017,6


In [58]:
d.limit(5).toPandas()

Unnamed: 0,date,day,month,year,weekday,date_key
0,2017-08-11,11,8,2017,6,20170811
1,2017-09-11,11,9,2017,2,20170911
2,2016-04-25,25,4,2016,2,20160425
3,2018-03-17,17,3,2018,7,20180317
4,2017-01-06,6,1,2017,6,20170106


In [60]:
# write dates table to parquet files partitioned by year and month
d.write.mode('overwrite').partitionBy('year', 'month').parquet('date')

#### #6: `immigrations` Table
#### Extract Data and immigrations Table

- Select the port, arrive_state, arrival_date_key, departure_date_key, allowed_date_key, cic_id, arrive_month and set to `immigrations_table`

In [62]:
immigrations_table = df_immigration.select('cic_id', 'port', 'arrive_state', 'arrive_month', 'arrival_date', 'departure_date',\
                                           'allowed_date')
immigrations_table.limit(5).toPandas()

Unnamed: 0,cic_id,port,arrive_state,arrive_month,arrival_date,departure_date,allowed_date
0,2018851,NYC,CT,4,2016-04-11,2016-04-23,2016-07-09
1,1835040,HHW,HI,4,2016-04-10,2016-04-19,2016-07-08
2,1835041,HHW,HI,4,2016-04-10,2016-04-19,2016-07-08
3,2352309,FTL,IL,4,2016-04-13,2016-04-17,2016-07-11
4,680670,NYC,FL,4,2016-04-04,2016-04-11,2016-07-02


In [63]:
immigrations_table = immigrations_table.withColumn("arrival_date", date_format(immigrations_table.arrival_date,"yyyyMMdd"))\
                                        .withColumn("departure_date", date_format(immigrations_table.departure_date,"yyyyMMdd"))\
                                        .withColumn("allowed_date", date_format(immigrations_table.allowed_date,"yyyyMMdd"))
immigrations_table.limit(5).toPandas()

Unnamed: 0,cic_id,port,arrive_state,arrive_month,arrival_date,departure_date,allowed_date
0,2018851,NYC,CT,4,20160411,20160423,20160709
1,1835040,HHW,HI,4,20160410,20160419,20160708
2,1835041,HHW,HI,4,20160410,20160419,20160708
3,2352309,FTL,IL,4,20160413,20160417,20160711
4,680670,NYC,FL,4,20160404,20160411,20160702


In [64]:
# write immigrations table to parquet files partitioned by arrive_state
immigrations_table.write.mode('overwrite').partitionBy('arrive_state').parquet('immigrations')

#### 4.2 Data Quality Checks
the data quality checks perform to ensure the pipeline ran as expected:
- Integrity constraints on the relational database (e.g., unique key, data type, etc.)
- Source/Count checks to ensure completeness
 
Run Quality Checks

In [3]:
# load final tables 
fact_immigrations = spark.read.parquet("immigrations")
dim_immigrants = spark.read.parquet("immigrants")
dim_demographics = spark.read.parquet("demographics")
dim_airports = spark.read.parquet("airports")
dim_temperatures = spark.read.parquet("temperatures")
dim_date = spark.read.parquet("date")
tablelist = [fact_immigrations,dim_immigrants,dim_demographics,dim_airports,dim_temperatures,dim_date]

In [4]:
# check rows in tables and column types
def check_rows(tablelist):
    for table in tablelist:
        if table.count() > 0:
            print("Quality check passed for {} table is not empty".format(table))
        else:
            print("Quality check failed. No records in {} table".format(table))

In [5]:
check_rows(tablelist)

Quality check passed for DataFrame[cic_id: int, port: string, arrive_month: int, arrival_date: string, departure_date: string, allowed_date: string, arrive_state: string] table is not empty
Quality check passed for DataFrame[cic_id: int, birth_year: int, age: int, gender: string, airline: string, flight_num: string, visa_type: string, visa_issue_state: string, mode: string, citizen_country_name: string, resident_country_name: string, visa_class: string] table is not empty
Quality check passed for DataFrame[state: string, median_age: int, male_population: int, female_population: int, total_population: int, veterans_num: int, foreign_born_population: int, avg_household_size: int, american_indian_alaska_native: int, asian: int, african_american: int, hispanic_latino: int, white: int, state_code: string] table is not empty
Quality check passed for DataFrame[iata_code: string, name: string, state: string, elevation_ft: string, latitude: string, longitude: string, type: string] table is not 

In [6]:
def check_unique_keys(fact,dim1,dim2,dim3,dim4,dim5):
    if fact.groupBy('cic_id', 'port', 'arrive_state', 'arrive_month', 'arrival_date', 'departure_date','allowed_date').count().filter("count > 1").count() == 0:
        print("Immigrations Fact table's keys are unique")
    else:
        print("Immigrations Fact table's keys are not unique")
    if dim1.groupBy("cic_id").count().filter("count > 1").count() == 0:
        print("Immigrants Dimention table's keys are unique")
    else:
        print("Immigrants Dimention table's keys are not unique")
    if dim2.groupBy("state_code").count().filter("count > 1").count() == 0:
        print("Demographics Dimention table's keys are unique")
    else:
        print("Demographics Dimention table's keys are not unique")
    if dim3.groupBy("iata_code").count().filter("count > 1").count() == 0:
        print("Airports Dimention table's keys are unique")
    else:
        print("Airports Dimention table's keys are not unique")
    if dim4.groupBy("month").count().filter("count > 1").count() == 0:
        print("Temperatures Dimention table's keys are unique")
    else:
        print("Temperatures Dimention table's keys are not unique")
    if dim5.groupBy("date_key").count().filter("count > 1").count() == 0:
        print("Date Dimention table's keys are unique")
    else:
        print("Date Dimention table's keys are not unique")

In [7]:
check_unique_keys(*tablelist)

Immigrations Fact table's keys are unique
Immigrants Dimention table's keys are unique
Demographics Dimention table's keys are unique
Airports Dimention table's keys are unique
Temperatures Dimention table's keys are unique
Date Dimention table's keys are unique


#### 4.3 Data dictionary 
Data dictionary for data model. For each field, provide a brief description of what the data is and where it came from.

#### Dimensions tables: 
- #1: 'immigrants' table: 
        - cic_id: primary key, INT, (immigrant id)
        - birth_year: INT, (4 digit year of immigrant birth)
        - age: INT, (Age of immigrant in Years)
        - gender: String, (immigrant sex)
        - airline: String,(Airline used to arrive in U.S.) 
        - flight_num: String, (Flight number of Airline used to arrive in U.S.)
        - visa_type: String, (Class of admission legally admitting the immigrant to temporarily stay in U.S.)
        - visa_class: String, (Visa class collapsed into three categories Business, Pleasure, Student)
        - visa_issue_state: String, (Department of State where Visa was issued)
        - mode: String, (Mode of transportation collapsed into three categories 'Air','Sea','Land')
        - citizen_country: String, (Country of citizenship)
        - resident_country: String, (Country of residence)

- #2: 'demographics' table: 
        - state_code: primary key, String, (code of state)
        - state: String, (name of state)
        - median_age:  INT, (Average age of residents)
        - male_population: INT, (Number of male residents)
        - female_population: INT, (Number of female residents)
        - total_population: INT, (Number of total residents)
        - veterans_num: INT, (Number of residents that are veterans)
        - foreign_born_population: INT, (Number of residents not born in country)
        - avg_household_size: INT, (Average size of residents in single house)
        - american_indian_alaska_native: INT, (Number of residents thier race is american indian alaska native)
        - asian: INT, (Number of residents thier race is asian)
        - african_american: INT, (Number of residents thier race is african american)
        - hispanic_latino: INT, (Number of residents thier race is hispanic latino) 
        - white: INT, (Number of residents thier race is white) 

- #3: 'airports' table: 
        - iata_code: Primary Key, String, (International Air Transport Association airport code)
        - name: String, (Airport name)
        - type: String, (Type of airport by size) 
        - state: String, (state of airport location)
        - elevation_ft: String, (Elevation of airport in feet)
        - latitude: String, (Latitude of airport)
        - longitude: String, (Longitude of airport)

- #4: 'temperatures' table: 
        - month: Primary Key, INT, (the month of average temperature)
        - average_temperature: DOUBLE, (average temperature in U.S. by month) 
        - average_temperature_uncertainty: DOUBLE, (Uncertainty of Avg Temp in U.S. by month)

- #5: 'date' table: 
        - date_key: Primary Key, String, (key of the date)
        - date: DATE, (date)
        - day: INT, (day of the date)
        - month: INT, (month of the date)
        - year: INT, (year of the date)
        - weekday: INT, (the day of the week)
        
#### Fact table: 
- 'immigrations' table: 
        - cic_id: Foreign Key, INT, (cic_id 'immigrants' table referenced )
        - port: Foreign Key, String, (iata_code 'airports' table referenced)
        - arrive_state: Foreign Key, String, (state_code 'demographics' table referenced)
        - arrive_month: Foreign Key, INT, (month 'temperature' table referenced)
        - arrival_date: Foreign Key, String, (date_key 'date' table referenced)
        - departure_date: Foreign Key, String, (date_key 'date' table referenced)
        - allowed_date: Foreign Key, String, (date_key 'date' table referenced)



#### Step 5: Complete Project Write Up

* The rationale for the choice of tools and technologies for the project.

    The project uses Apache Spark engine. Spark is an simple and fast and also scalable analytics engine for large scale data processing. It has an ability to process and analyse massive ammounts of data using PySpark interface. Can handle different data formats (e.g. SAS, Parquet, CSV), and can be integrated with cloud storage solutions like S3 or Redshift.

    Python is used since it is powerful for dev works in Jupyter notebook and ETL building.

* Propose how often the data should be updated and why.

    - Immigration data: data are provided on a monthly basis, and hence should be updated monthly
    - Temperature data: data runs until 2013, needs to be updated as the fact immigration is for 2016, hence data should be updated yearly
    - Demographics data: updates are provided on a yearly basis to the public and hence, a yearly update is sufficient
    - Airports data: can be assumed to be constant
    - Date data: updated periodically only once per month
    

* A description of how would approach the problem differently under the following scenarios:

 * The data was increased by 100x.
 
         Currently the Spark instance installed locally is used. However, local/stand-alone Spark may not offer the optimum performance for larger datasets. Pyspark locally begins to lack computational power. 
         In this case, it would be interesting to work with cloud technology such as AWS. Creating a cluster in AWS should be considered, it would Upload the data to S3 and then use Redshift to perform the ETL and generate the data model.
         
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
         
        In this case, I would move on a AWS Cloud platform and connect with proper BI tools (Tableau for example) and automate the entire data flow using Airflow. If a time is scheduled so that the dashboard to be updated, then this would be a perfect use case to utilize Apache Airflow to carry out this assignemnt. Within Apache Airflow, can set schedules so that the data can be refreshed on a daily basis by 7am every day. Accordingly, the dashboard that's built based on the data will also be updated.
        Airflow is a great solution to schedule and automate a data pipeline it can be easily supported by changing the "schedule_interval" value in the dags/airflow_pipeline.py code for Apache Airflow to schedule it for daily run.
        
 * The database needed to be accessed by 100+ people.
     
         Currently no database is created behind this solution as this is designed for analytical purposes for a small scale of users. 
         In this case, Data can be migrated to Amazon Redshift cloud solution, Where it can help manage the user's workload, computational resources, ensure that every user has defined access depending on their role, allow auto-scaling capabilities to handle the load of increased access by users.