# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project data gathered from four datasets with different sources for analysing US immigration data in a simple star schema. The main aim is to provide analytics to answer business questions which can be analyze and provide insight into the pattern of immigration. The analysis questions can be answered based on the data model using simple joins.
Spark was used for the ETL pipeline and The final data is stored in parquet files for analysis.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import findspark

In [2]:
# Specify the path to your Spark installation directory
spark_home = "C:\spark-3.2.4-bin-hadoop3.2"

In [3]:
# Initialize Spark using the custom Spark home directory
findspark.init(spark_home=spark_home)

# start

In [4]:
# all imports and installs 
from datetime import datetime, timedelta, date
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, count

# Create a Spark session
spark = SparkSession.builder \
    .appName("WordvvvvvvvvvvvCount") \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
plan to do in the project is create a simple star schema data warehouse with 1 fact table and 5 dimensional tables saved in parquet format, for analytical purposes that allows better undersanding of the immigration trends to the US using 4 datasets I94 Immigration Data, World Temperature Data, U.S. City Demographic Data, and Airport Code Table, and manipulate them using Pyspark.

#### Describe and Gather Data 
The data sets:
[I94 Immigration Data] This data comes from the US National Tourism and Trade Office. the data in csv format.
includes data about the immigrants also the year, month, arrival and depture dates of immigrations and more.
[World Temperature Data]: dataset came from Kaggle. includes the date and average temperature for cities
[U.S. City Demographic Data]: This data comes from OpenSoft. includes demographics data for each city in U.S.
[Airport Code Table]: This data comes from datahub.io. includes a simple table of airport codes and corresponding cities.

In [6]:
# Read in the us-cities-demographics data
df_immigration=spark.read.csv("immigration_data_sample.csv",header=True,sep=',')
df_immigration.limit(5).toPandas()


Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT


In [7]:
# Read in the us-cities-demographics data
df_cities=spark.read.csv("us-cities-demographics.csv",header=True,sep=';')
df_cities.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [8]:
# Read in the airport-codes_csv data
df_airport=spark.read.csv("airport-codes_csv.csv",header=True)
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [9]:
df_airport

DataFrame[ident: string, type: string, name: string, elevation_ft: string, continent: string, iso_country: string, iso_region: string, municipality: string, gps_code: string, iata_code: string, local_code: string, coordinates: string]

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Immigration Data Cleanin

In [10]:
df_immigration.limit(10).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582674633.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361995930.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780468433.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789696030.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322572633.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,...,,M,1965.0,10072016,M,,DL,736852585.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,...,,M,1968.0,10112016,F,,CX,786312185.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,...,,M,1983.0,6302016,F,,BA,55474485033.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,...,,M,1977.0,7262016,,,LX,59413424733.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,...,,M,1981.0,6292016,,,AA,55449792933.0,00109,WT


In [11]:
# Get the names of the columns
column_names = df_immigration.columns

# Print the column names
for column_name in column_names:
    print(column_name)

_c0
cicid
i94yr
i94mon
i94cit
i94res
i94port
arrdate
i94mode
i94addr
depdate
i94bir
i94visa
count
dtadfile
visapost
occup
entdepa
entdepd
entdepu
matflag
biryear
dtaddto
gender
insnum
airline
admnum
fltno
visatype


_c0: This column appears to be an autogenerated or default index column. It may not contain any meaningful information related to immigration.

cicid: This is likely an identifier for the individual immigrant or traveler.

i94yr: The 4-digit year of the arrival date (e.g., 2016).

i94mon: The numeric month of the arrival date (e.g., 4 for April).

i94cit: The code for the city of the traveler's birth.

i94res: The code for the country of residence of the traveler.

i94port: The port of entry into the United States.

arrdate: The arrival date in the United States in SAS date format (a numeric representation of dates).

i94mode: The mode of transportation used by the traveler (e.g., air, sea, land).

i94addr: The U.S. state where the traveler intended to reside.

depdate: The departure date from the United States in SAS date format.

i94bir: The age of the traveler in years.

i94visa: The visa category code (e.g., 1 for Business, 2 for Tourism, 3 for Student).

count: The count of the traveler's entries (typically 1 for a single entry).

dtadfile: Character date field indicating when the data was loaded into the system.

visapost: The visa issuing post.

occup: The occupation of the traveler (if provided).

entdepa: Arrival flag code.

entdepd: Departure flag code.

entdepu: Update flag code.

matflag: Match flag code.

biryear: The traveler's birth year.

dtaddto: Date field indicating the date the traveler is allowed to stay until.

gender: The gender of the traveler.

insnum: Immigration and Naturalization Services number.

airline: The airline used by the traveler.

admnum: The admission number, which appears to be a unique identifier.

fltno: Flight number of the traveler's flight.

visatype: The type of visa the traveler holds (e.g., B1, B2, F1, H1B).

In [12]:
# Performing cleaning tasks

# Drop unnecessary columns in immigration data sample (df_immigration)

df_immigration = df_immigration.drop('count','occup','entdepa','entdepd','entdepu','matflag','insnum','admnum','dtadfile')
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,,1955.0,7202016,F,JL,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,MTR,1990.0,10222016,M,*GA,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,,1940.0,7052016,M,LH,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,DOH,1991.0,10272016,M,QR,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,,1997.0,7042016,F,,LAND,WT


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Assuming df_immigration is your DataFrame
# Loop through all columns and check for null values
for column_name in df_immigration.columns:
    null_count = df_immigration.filter(col(column_name).isNull()).count()
    print(f"Column '{column_name}' has {null_count} null values.")


Column '_c0' has 0 null values.
Column 'cicid' has 0 null values.
Column 'i94yr' has 0 null values.
Column 'i94mon' has 0 null values.
Column 'i94cit' has 0 null values.
Column 'i94res' has 0 null values.
Column 'i94port' has 0 null values.
Column 'arrdate' has 0 null values.
Column 'i94mode' has 0 null values.
Column 'i94addr' has 59 null values.
Column 'depdate' has 49 null values.
Column 'i94bir' has 0 null values.
Column 'i94visa' has 0 null values.
Column 'visapost' has 618 null values.
Column 'biryear' has 0 null values.
Column 'dtaddto' has 0 null values.
Column 'gender' has 141 null values.
Column 'airline' has 33 null values.
Column 'fltno' has 8 null values.
Column 'visatype' has 0 null values.


In [14]:
#Remove rows with missing values in i94port, i94addr
df_immigration = df_immigration.dropna(how="any", subset=["i94port", "i94addr"])

In [15]:
# Get the states_codes.
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
state_codes = code_mapper(f_content, "i94addrl")
list_map = list(map(list, state_codes.items()))
state_codes_df = spark.createDataFrame(list_map, ['state_code', 'state'])
state_codes_df.limit(10).toPandas()

Unnamed: 0,state_code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA
5,CO,COLORADO
6,CT,CONNECTICUT
7,DE,DELAWARE
8,DC,DIST. OF COLUMBIA
9,FL,FLORIDA


In [16]:
# Create user defined function to validate 'state' data
valid_states = df_cities.toPandas()["State Code"].unique()
print(valid_states)

@udf(StringType())
def validate_state(s): 
    """ check for US states """
    if s in valid_states:
        return s
    return 'other'

['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA'
 'NV' 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY'
 'SC' 'LA' 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID'
 'IN' 'AK' 'MS' 'HI' 'SD' 'ME' 'MT']


In [17]:
# Extract data with valid states
df_immigration = df_immigration.withColumn("i94addr" , validate_state(df_immigration.i94addr))
df_immigration.select("i94addr").distinct().toPandas()

Unnamed: 0,i94addr
0,AZ
1,SC
2,LA
3,MN
4,NJ
5,DC
6,OR
7,VA
8,RI
9,KY


In [18]:
df_immigration=df_immigration.filter(col("i94addr")!='other')

In [19]:
col = df_immigration.select(col("i94addr")).distinct()

In [20]:
col

DataFrame[i94addr: string]

In [21]:
col.show(50)

+-------+
|i94addr|
+-------+
|     AZ|
|     SC|
|     LA|
|     MN|
|     NJ|
|     DC|
|     OR|
|     VA|
|     RI|
|     KY|
|     NH|
|     MI|
|     NV|
|     WI|
|     ID|
|     CA|
|     NE|
|     CT|
|     NC|
|     MD|
|     MO|
|     IL|
|     ME|
|     WA|
|     MS|
|     AL|
|     IN|
|     OH|
|     TN|
|     IA|
|     NM|
|     PA|
|     NY|
|     TX|
|     GA|
|     MA|
|     KS|
|     FL|
|     CO|
|     AR|
|     OK|
|     PR|
|     UT|
|     HI|
+-------+



In [22]:
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,,1955.0,7202016,F,JL,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,MTR,1990.0,10222016,M,*GA,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,,1940.0,7052016,M,LH,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,DOH,1991.0,10272016,M,QR,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,,1997.0,7042016,F,,LAND,WT


In [23]:
df_immigration = df_immigration.drop('depdate','arrdate')
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,cicid,i94yr,i94mon,i94cit,i94res,i94port,i94mode,i94addr,i94bir,i94visa,visapost,biryear,dtaddto,gender,airline,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,1.0,HI,61.0,2.0,,1955.0,7202016,F,JL,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,1.0,TX,26.0,2.0,MTR,1990.0,10222016,M,*GA,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,1.0,FL,76.0,2.0,,1940.0,7052016,M,LH,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,1.0,CA,25.0,2.0,DOH,1991.0,10272016,M,QR,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,3.0,NY,19.0,2.0,,1997.0,7042016,F,,LAND,WT


In [24]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, DateType
#fix datatypes and rename columns
df_immigration = df_immigration.withColumn("cic_id",col("cicid").cast(IntegerType())).drop("cicid") \
            .withColumn("arrive_year",col('i94yr').cast(IntegerType())).drop("i94yr") \
            .withColumn("arrive_month",col('i94mon').cast(IntegerType())).drop("i94mon") \
            .withColumn("citizen_country",col('i94cit').cast(IntegerType())).drop("i94cit") \
            .withColumn("resident_country",col('i94res').cast(IntegerType())).drop("i94res") \
            .withColumn("age",col('i94bir').cast(IntegerType())).drop("i94bir") \
            .withColumn("birth_year",col('biryear').cast(IntegerType())).drop("biryear") \
            .withColumn("visa_class",col('i94visa').cast(IntegerType())).drop("i94visa") \
            .withColumn("mode",col('i94mode').cast(IntegerType())).drop("i94mode") \
            .withColumn("allowed_date", to_date("dtaddto", "MMddyyyy")).drop("dtaddto") \
            .withColumnRenamed("i94port", "port") \
            .withColumnRenamed("i94addr","arrive_state") \
            .withColumnRenamed("fltno","flight_num") \
            .withColumnRenamed("visatype","visa_type") \
            .withColumnRenamed("visapost","visa_issue_state").drop("_c0	")



In [25]:
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrive_month,citizen_country,resident_country,age,birth_year,visa_class,mode,allowed_date
0,2027561,HHW,HI,,F,JL,00782,WT,4084316,2016,4,209,209,61,1955,2,1,2016-07-20
1,2171295,MCA,TX,MTR,M,*GA,XBLNG,B2,4422636,2016,4,582,582,26,1990,2,1,2016-10-22
2,589494,OGG,FL,,M,LH,00464,WT,1195600,2016,4,148,112,76,1940,2,1,2016-07-05
3,2631158,LOS,CA,DOH,M,QR,00739,B2,5291768,2016,4,297,297,25,1991,2,1,2016-10-27
4,3032257,CHM,NY,,F,,LAND,WT,985523,2016,4,111,111,19,1997,2,3,2016-07-04


In [26]:
df_immigration.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- port: string (nullable = true)
 |-- arrive_state: string (nullable = true)
 |-- visa_issue_state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_num: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- cic_id: integer (nullable = true)
 |-- arrive_year: integer (nullable = true)
 |-- arrive_month: integer (nullable = true)
 |-- citizen_country: integer (nullable = true)
 |-- resident_country: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- visa_class: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- allowed_date: date (nullable = true)



In [27]:
df_immigration = df_immigration.withColumn('mode', when(df_immigration.mode == 1, 'Air' )\
                                                    .when(df_immigration.mode == 2, 'Sea')\
                                                    .when(df_immigration.mode == 3, 'Land' )\
                                                    .when(df_immigration.mode == 9, 'Not reported' ))

In [28]:
df_immigration = df_immigration.withColumn('visa_class', when(df_immigration.visa_class == 1, 'Business' )\
                                                         .when(df_immigration.visa_class == 2, 'Pleasure')\
                                                         .when(df_immigration.visa_class == 3, 'Student' ))

In [29]:
# Get the country_codes.
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
country_codes = code_mapper(f_content, "i94cntyl")
list_map = list(map(list, country_codes.items()))
country_codes_df = spark.createDataFrame(list_map, ['country_code', 'country'])
country_codes_df.limit(5).toPandas()

Unnamed: 0,country_code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [30]:
country_codes_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)



In [31]:
country_codes_df.withColumn("country_code", col("country_code").cast(IntegerType()))

DataFrame[country_code: int, country: string]

In [32]:
# join the immigration df with the country_codes_df

df_immigration = df_immigration.join(country_codes_df, df_immigration['citizen_country'] == country_codes_df.country_code, \
                                     how = 'left')\
                             .withColumnRenamed('country', 'citizen_country_name')\
                             .drop('country_code')\
                             .drop('citizen_country')\
                             .join(country_codes_df, df_immigration['resident_country'] == country_codes_df.country_code, \
                                   how = 'left')\
                             .withColumnRenamed('country', 'resident_country_name')\
                             .drop('country_code')\
                             .drop('resident_country')



In [33]:
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrive_month,age,birth_year,visa_class,mode,allowed_date,citizen_country_name,resident_country_name
0,589494,OGG,FL,,M,LH,00464,WT,1195600,2016,4,76,1940,Pleasure,Air,2016-07-05,,GERMANY
1,2027561,HHW,HI,,F,JL,00782,WT,4084316,2016,4,61,1955,Pleasure,Air,2016-07-20,JAPAN,JAPAN
2,3032257,CHM,NY,,F,,LAND,WT,985523,2016,4,19,1997,Pleasure,Land,2016-07-04,FRANCE,FRANCE
3,2631158,LOS,CA,DOH,M,QR,00739,B2,5291768,2016,4,25,1991,Pleasure,Air,2016-10-27,QATAR,QATAR
4,2171295,MCA,TX,MTR,M,*GA,XBLNG,B2,4422636,2016,4,26,1990,Pleasure,Air,2016-10-22,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan..."


### Airport Data Cleaning

In [34]:
df_airport.limit(10).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


In [35]:
#Drop rows with 100% missing values.

df_airport = df_airport.dropna(how='all')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [36]:
#Dropping any rows with duplicate ident.
df_airport = df_airport.dropDuplicates(["ident"])
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
1,00FA,small_airport,Grass Patch Airport,53,,US,US-FL,Bushnell,00FA,,00FA,"-82.21900177001953, 28.64550018310547"
2,00FL,small_airport,River Oak Airport,35,,US,US-FL,Okeechobee,00FL,,00FL,"-80.96920013427734, 27.230899810791016"
3,00GA,small_airport,Lt World Airport,700,,US,US-GA,Lithonia,00GA,,00GA,"-84.06829833984375, 33.76750183105469"
4,00II,heliport,Bailey Generation Station Heliport,600,,US,US-IN,Chesterton,00II,,00II,"-87.122802734375, 41.644500732421875"


In [37]:
# split coordinates column into latitude and longitude

df_airport=df_airport.withColumn('latitude',split(df_airport['coordinates'],',').getItem(0))\
                    .withColumn('longitude',split(df_airport['coordinates'],',').getItem(1))\
                    .drop('coordinates')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,latitude,longitude
0,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,-121.763427,39.427188
1,00FA,small_airport,Grass Patch Airport,53,,US,US-FL,Bushnell,00FA,,00FA,-82.21900177001953,28.64550018310547
2,00FL,small_airport,River Oak Airport,35,,US,US-FL,Okeechobee,00FL,,00FL,-80.96920013427734,27.230899810791016
3,00GA,small_airport,Lt World Airport,700,,US,US-GA,Lithonia,00GA,,00GA,-84.06829833984375,33.76750183105469
4,00II,heliport,Bailey Generation Station Heliport,600,,US,US-IN,Chesterton,00II,,00II,-87.122802734375,41.64450073242188


In [38]:
# Clean airports dataset by filter only type = (small / medium / large) airports
df_airport = df_airport.filter( (df_airport["type"] == "small_airport") | (df_airport["type"]=="medium_airport") | (df_airport["type"] == "large_airport") ) 
df_airport.select('type').distinct().show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



In [39]:
# get state code from iso_region
df_airport = df_airport.withColumn('state', split(df_airport['iso_region'], '-').getItem(1))
df_airport.limit(5).toPandas()
df_airport = df_airport.drop('iso_region')
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,latitude,longitude,state
0,00CL,small_airport,Williams Ag Airport,87,,US,Biggs,00CL,,00CL,-121.763427,39.427188,CA
1,00FA,small_airport,Grass Patch Airport,53,,US,Bushnell,00FA,,00FA,-82.21900177001953,28.64550018310547,FL
2,00FL,small_airport,River Oak Airport,35,,US,Okeechobee,00FL,,00FL,-80.96920013427734,27.230899810791016,FL
3,00GA,small_airport,Lt World Airport,700,,US,Lithonia,00GA,,00GA,-84.06829833984375,33.76750183105469,GA
4,00IL,small_airport,Hammer Airport,840,,US,Polo,00IL,,00IL,-89.5604019165039,41.97840118408203,IL


In [40]:
# since df_immigration.port is df_airport.iata_code, so we don't need null in it.
# Drop rows with missing values in iata_code.

df_airport = df_airport.dropna(how='all',subset=['iata_code'])
df_airport.select('iata_code').distinct().show()

+---------+
|iata_code|
+---------+
|      BZT|
|      YUL|
|      DWR|
|      NWI|
|      KLR|
|      KMU|
|      KGL|
|      BGM|
|      CNU|
|      CRS|
|      KEB|
|      FMY|
|      LEB|
|      OXC|
|      RKP|
|      SGT|
|      TNP|
|      LEN|
|      PMI|
|      UAB|
+---------+
only showing top 20 rows



In [41]:
# Clean airports dataset by filter only iso_country = US
df_airport = df_airport.filter(df_airport["iso_country"] == "US")
df_airport.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,latitude,longitude,state
0,0AK,small_airport,Pilot Station Airport,305,,US,Pilot Station,,PQS,0AK,-162.899994,61.934601,AK
1,16A,small_airport,Nunapitchuk Airport,12,,US,Nunapitchuk,PPIT,NUP,16A,-162.440454,60.905591,AK
2,1KC,small_airport,Kalakaket Creek AS Airport,1598,,US,Kalakaket Creek,1KC,KKK,1KC,-156.820392609,64.4166256967,AK
3,4K5,small_airport,Ouzinkie Airport,55,,US,Ouzinkie,,KOZ,4K5,-152.496715,57.925362,AK
4,5A8,medium_airport,Aleknagik / New Airport,66,,US,Aleknagik,5A8,WKK,5A8,-158.617996216,59.2826004028,AK


# us-cities-demographics Cleaning

In [42]:
df_cities.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [43]:
#Drop rows with 100% missing values.
df_cities = df_cities.dropna(how='all')

In [44]:
#Dropping duplicate rows.
df_cities = df_cities.dropDuplicates(['City', 'State', 'Race'])
df_cities.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,American Indian and Alaska Native,1813
1,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Asian,2929
2,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Black or African-American,14449
3,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,Hispanic or Latino,33222
4,Abilene,Texas,31.3,65212,60664,125876,9367,8129,2.64,TX,White,95487


In [45]:
# change datatypes, format column names 
df_cities = df_cities.withColumn("median_age",col("Median Age").cast(FloatType())).drop("Median Age") \
                    .withColumn("male_population",col("Male Population").cast(IntegerType())).drop("Male Population") \
                    .withColumn("female_population",col("Female Population").cast(IntegerType())).drop("Female Population") \
                    .withColumn("total_population",col("Total Population").cast(IntegerType())).drop("Total Population") \
                    .withColumn("veterans_num",col("Number of Veterans").cast(IntegerType())).drop("Number of Veterans") \
                    .withColumn("foreign_born_population",col("Foreign-born").cast(IntegerType())).drop("Foreign-born") \
                    .withColumn("avg_household_size",col("Average Household Size").cast(FloatType())).drop("Average Household Size") \
                    .withColumn("count",col("Count").cast(IntegerType())) \
                    .withColumnRenamed("City", "city") \
                    .withColumnRenamed("State", "state") \
                    .withColumnRenamed("State Code", "state_code") \
                    .withColumnRenamed("Race", "race")

In [46]:
# pivot table to make each race population into seperate columns, change column names
df_cities = df_cities.groupBy(col("city"),col("state"),col("median_age"),col("male_population"),col("female_population")\
                            ,col("total_population"),col("veterans_num"),col("foreign_born_population"),col("avg_household_size") \
                            ,col("state_code")) \
                    .pivot("race").agg(sum("count")) \
                    .fillna({"American Indian and Alaska Native": 0,
                     "Asian": 0,
                     "Black or African-American": 0,
                     "Hispanic or Latino": 0,
                     "White": 0}) \
                    .withColumnRenamed("American Indian and Alaska Native", "american_indian_alaska_native") \
                    .withColumnRenamed("Asian","asian") \
                    .withColumnRenamed("Black or African-American","african_american") \
                    .withColumnRenamed("Hispanic or Latino","hispanic_latino") \
                    .withColumnRenamed("White","white")

In [47]:
df_cities.sort("state").limit(7).toPandas()

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,state_code,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,Dothan,Alabama,38.900002,32172,35364,67536,6334,1699,2.59,AL,656,1175,23243,1704,43516
1,Huntsville,Alabama,38.099998,91764,97350,189114,16637,12691,2.18,AL,1755,6566,61561,10887,121904
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,0,4759,18191,3430,61869
3,Birmingham,Alabama,35.599998,102122,112789,214911,13212,8258,2.21,AL,1319,1500,157985,8940,51728
4,Tuscaloosa,Alabama,29.1,47293,51045,98338,3647,4706,2.67,AL,261,2733,42331,2475,52603
5,Mobile,Alabama,38.0,91275,103030,194305,11939,7234,2.4,AL,2816,5518,96397,5229,93755
6,Montgomery,Alabama,35.400002,94582,106004,200586,14955,9337,2.41,AL,1277,6518,121360,6648,73545


In [48]:
# group table by state
df_cities= df_cities.groupBy(col("state_code"),col("state"))\
            .agg(avg("median_age").cast(IntegerType()).alias("median_age"),\
                 sum("male_population").cast(IntegerType()).alias("male_population"),\
                 sum("female_population").cast(IntegerType()).alias("female_population"),\
                 sum("total_population").cast(IntegerType()).alias("total_population"),\
                 sum("veterans_num").cast(IntegerType()).alias("veterans_num"),\
                 sum("foreign_born_population").cast(IntegerType()).alias("foreign_born_population"),\
                 avg("avg_household_size").cast(IntegerType()).alias("avg_household_size"),\
                 sum("american_indian_alaska_native").cast(IntegerType()).alias("american_indian_alaska_native"),
                 sum("asian").cast(IntegerType()).alias("asian"),\
                 sum("african_american").cast(IntegerType()).alias("african_american"),\
                 sum("hispanic_latino").cast(IntegerType()).alias("hispanic_latino"),\
                 sum("white").cast(IntegerType()).alias("white"))
df_cities.limit(5).toPandas()

Unnamed: 0,state_code,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,MT,Montana,35,87707,93587,181294,13854,5977,2,9684,4165,3349,10000,169026
1,NC,North Carolina,33,1466105,1594094,3060199,166146,379327,2,35209,178740,1029446,354409,1790136
2,MD,Maryland,36,627951,684178,1312129,64143,229794,2,16155,128839,573768,138644,594522
3,CO,Colorado,35,1454619,1481050,2935669,187896,337631,2,62613,148790,208043,703722,2463916
4,CT,Connecticut,34,432157,453424,885581,24953,225866,2,10729,48311,231822,309992,505674


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
As previously mentioned, the chosen data model was the star schema, That model was the chosen one because it allows great performance, and it also allows users to write simple queries joining the fact and dimension tables in order to achieve the analytical dataset they need and perform BI solutions.

#### Fact table: 
- 'immigrations' table: fk(cic_id, port, arrive_state, arrive_month, arrival_date, departure_date, allowed_date)

#### Dimensions tables: 
- #1: 'immigrants' table: pk: cic_id, birth_year, age, gender, airline, flight_num, visa_type, visa_class, visa_issue_state, mode, citizen_country, resident_country

- #2: 'demographics' table: pk: state_code, state, median_age, male_population, female_population, total_population, veterans_num, foreign_born_population, avg_household_size, american_indian_alaska_native, asian, african_american, hispanic_latino, white

- #3: 'airports' table: pk: iata_code, name, type, state, elevation_ft, latitude, longitude

- #4: 'temperatures' table: pk: month, average_temperature, average_temperature_uncertainty

- #5: 'date' table: pk: date_key, date, day, month, year, weekday

#### 3.2 Mapping Out Data Pipelines
the steps necessary to pipeline the data into the chosen data model:

- Extract Data for immigrants Table from immigration dataframe and Write data into parquet files
- Extract Data for demographics Table from demographics dataframe and Write data into parquet files
- Extract Data for airports Table from airports dataframe and Write data into parquet files
- Extract Data for temperatures Table from temperatures dataframe and Write data into parquet files
- Extract Data for Date Table from immigration dataframe and Write data into parquet files
- Extract Data and immigrations Table from immigration dataframe and Write data into parquet files

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### #1: 'immigrants' Table
#### Extract Data for immigrants Table
- Select columns for cic_id, birth_year, age, gender, airline, flight_num, visa_type, visa_class, visa_issue_state, mode, citizen_country, resident_country

#### #1: `immigrations` Table
#### Extract Data and immigrations Table

- Select the port, arrive_state, arrival_date_key, departure_date_key, allowed_date_key, cic_id, arrive_month and set to `immigrations_table`

In [49]:
df_immigration.limit(5).toPandas()

Unnamed: 0,_c0,port,arrive_state,visa_issue_state,gender,airline,flight_num,visa_type,cic_id,arrive_year,arrive_month,age,birth_year,visa_class,mode,allowed_date,citizen_country_name,resident_country_name
0,589494,OGG,FL,,M,LH,00464,WT,1195600,2016,4,76,1940,Pleasure,Air,2016-07-05,,GERMANY
1,2027561,HHW,HI,,F,JL,00782,WT,4084316,2016,4,61,1955,Pleasure,Air,2016-07-20,JAPAN,JAPAN
2,3032257,CHM,NY,,F,,LAND,WT,985523,2016,4,19,1997,Pleasure,Land,2016-07-04,FRANCE,FRANCE
3,2631158,LOS,CA,DOH,M,QR,00739,B2,5291768,2016,4,25,1991,Pleasure,Air,2016-10-27,QATAR,QATAR
4,2171295,MCA,TX,MTR,M,*GA,XBLNG,B2,4422636,2016,4,26,1990,Pleasure,Air,2016-10-22,"MEXICO Air Sea, and Not Reported (I-94, no lan...","MEXICO Air Sea, and Not Reported (I-94, no lan..."


In [50]:
immigrations_table = df_immigration.select('cic_id', 'port', 'arrive_state', 'arrive_month')
immigrations_table.limit(5).toPandas()

Unnamed: 0,cic_id,port,arrive_state,arrive_month
0,1195600,OGG,FL,4
1,4084316,HHW,HI,4
2,985523,CHM,NY,4
3,5291768,LOS,CA,4
4,4422636,MCA,TX,4


In [67]:
# Provide the path where you want to save the CSV file
output_path = "C:\\Users\\AL-FAJR\\Desktop\\spark_project\\fils\\immigrations_table.csv"
# Save the DataFrame as a CSV file
immigrations_table.write.csv(output_path, header=True)

#### #2: 'immigrants' Table
#### Extract Data for immigrants Table
- Select columns for cic_id, birth_year, age, gender, airline, flight_num, visa_type, visa_class, visa_issue_state, mode, citizen_country, resident_country

In [51]:
immigrants_table = df_immigration.select('cic_id', 'birth_year', 'age', 'gender', 'airline', 'flight_num', 'visa_type', 'visa_class',\
                                       'visa_issue_state', 'mode', 'citizen_country_name', 'resident_country_name')

In [60]:
# Provide the path where you want to save the CSV file
output_path = "C:\\Users\\AL-FAJR\\Desktop\\spark_project\\fils\\immigrants_table.csv"
# Save the DataFrame as a CSV file
immigrants_table.write.csv(output_path, header=True)


#### #2: 'demographics' Table
#### Extract Data for demographics Table
- Select columns for state_code, state, median_age, male_population, female_population, total_population, veterans_num, foreign_born_population, avg_household_size, american_indian_alaska_native, asian, african_american, hispanic_latino, white

In [61]:
demographics_table = df_cities.select('state_code', 'state', 'median_age', 'male_population', 'female_population', \
                                      'total_population', 'veterans_num', 'foreign_born_population', 'avg_household_size',\
                                      'american_indian_alaska_native', 'asian', 'african_american', 'hispanic_latino', 'white')

demographics_table.limit(1).toPandas()

Unnamed: 0,state_code,state,median_age,male_population,female_population,total_population,veterans_num,foreign_born_population,avg_household_size,american_indian_alaska_native,asian,african_american,hispanic_latino,white
0,MT,Montana,35,87707,93587,181294,13854,5977,2,9684,4165,3349,10000,169026


In [64]:
# Provide the path where you want to save the CSV file
output_path = "C:\\Users\\AL-FAJR\\Desktop\\spark_project\\fils\\demographics_table.csv"
# Save the DataFrame as a CSV file
demographics_table.write.csv(output_path, header=True)

#### #3: 'airports' Table
#### Extract Data for airports Table
- Select columns for iata_code, name, type, state, elevation_ft, latitude, longitude

In [65]:
airports_table = df_airport.select('iata_code', 'name', 'type', 'state', 'elevation_ft', 'latitude', 'longitude')

airports_table.limit(1).toPandas()

Unnamed: 0,iata_code,name,type,state,elevation_ft,latitude,longitude
0,PQS,Pilot Station Airport,small_airport,AK,305,-162.899994,61.934601


In [66]:
# Provide the path where you want to save the CSV file
output_path = "C:\\Users\\AL-FAJR\\Desktop\\spark_project\\fils\\airports_table.csv"
# Save the DataFrame as a CSV file
airports_table.write.csv(output_path, header=True)