# Project Title
### Data Engineering Capstone Project

#### Project Deep dive
This capstone project uses immigration data from the International Trade Administration (ITA) and world temperature data from a Kaggle dataset. \
The main purpose of the project is to explore the data, build an efficient database with an optimised data model, and let users test initial hypotheses about the seasonality of immigration. The data is not complete enough to support conclusive insights into the correlation between temperature and immigration: the immigration data covers only 2016, and the temperature data ends in 2013. We can, however, extrapolate from the available average temperatures and apply them to our analysis. 
 
The sort of questions that users may want to answer are as follows:
- Does temperature have an effect on immigration fluxes?
- Are immigration fluxes prone to seasonality?
- Do higher temperatures deter immigration? 
- Do higher temperatures create dangerous situations around immigration movements?
- Which months do immigrants enter the USA most frequently?
- Does immigration vary significantly across years and months?
- Which cities do immigrants enter the most?

As stated, more data is needed to answer all of these questions, but the model and functions are set up in such a way that this data can easily be added to enrich the database. 
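
As a rough illustration of how the seasonality questions above could be approached once more months of data are available, the sketch below counts arrivals per year and month. The records are hypothetical values shaped like the `i94yr` and `i94mon` columns; they are not taken from the dataset.

```python
from collections import Counter

# Hypothetical records shaped like the immigration data (i94yr, i94mon).
# The values are made up for illustration only.
records = [
    {"i94yr": 2016, "i94mon": 4},
    {"i94yr": 2016, "i94mon": 4},
    {"i94yr": 2016, "i94mon": 7},
    {"i94yr": 2016, "i94mon": 12},
    {"i94yr": 2016, "i94mon": 7},
    {"i94yr": 2016, "i94mon": 7},
]


def arrivals_per_month(rows):
    """Count arrivals per (year, month) pair."""
    return Counter((row["i94yr"], row["i94mon"]) for row in rows)


monthly = arrivals_per_month(records)
# The (year, month) pair with the most arrivals and its count
busiest_month, busiest_count = monthly.most_common(1)[0]
print(busiest_month, busiest_count)
```

On the full dataset the same aggregation would more likely be expressed as a Spark `groupBy` on the two columns.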

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [15]:
import pandas as pd
import re
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import count, isnan
pd.set_option('display.max_rows', 50, 'display.max_columns', 50)

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [3]:
imm_sub = pd.read_csv('immigration_data_sample.csv')
# reading the whole immigration dataset using pandas was not feasible so initially the sample file is used
us_cities = pd.read_csv("us-cities-demographics.csv", sep = ';')


In [10]:
tempa_file_name = "GlobalLandTemperaturesByCity.csv"
tempa = pd.read_csv(tempa_file_name, sep=',', index_col=0)

In [5]:
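# Each relevant line holds a quoted port code followed by a quoted city description.
# Note: the second group captures only the first word of the city name.
regex_pattern = re.compile(r"\'(.*)\'.*\'([A-Z\-a-z]+)(.*)\'")
i94_ports = {}
cities = {}
with open("I94_SAS_Labels_Descriptions.sas") as f:
    lines = f.readlines()
# This slice of the labels file contains the i94 port definitions
for line in lines[303:962]:
    match = regex_pattern.search(line)
    if match is None:
        continue  # skip lines that do not follow the expected pattern
    i94_ports[match[1]] = match[2]
    cities[match[2].title()] = match[1]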
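
Because the pattern above keeps only the first word of each city description, multi-word cities collapse onto keys such as 'San' or 'Los', and several port codes end up mapped to the same name. The sketch below shows one possible alternative that captures the full city name up to the first comma. The sample lines are an assumption about the layout of the labels file, which is not shown in this notebook, and `full_name_pattern` and `parse_ports` are hypothetical names.

```python
import re

# Assumed line layout of I94_SAS_Labels_Descriptions.sas (not shown in this notebook):
#   'CODE'<tab>=<tab>'CITY NAME, ST   '
sample_lines = [
    "   'ANC'\t=\t'ANCHORAGE, AK         '",
    "   'LOS'\t=\t'LOS ANGELES, CA       '",
    "   'SFR'\t=\t'SAN FRANCISCO, CA     '",
]

# Capture the port code and everything up to the first comma (or closing quote).
full_name_pattern = re.compile(r"'([^']+)'\s*=\s*'([^,']+)")


def parse_ports(lines):
    """Return {port_code: city_name}, skipping lines that do not match."""
    ports = {}
    for line in lines:
        match = full_name_pattern.search(line)
        if match is None:
            continue
        ports[match[1].strip()] = match[2].strip().title()
    return ports


ports_by_code = parse_ports(sample_lines)
# Reverse lookup from city name to port code
codes_by_city = {city: code for code, city in ports_by_code.items()}
print(ports_by_code)
```

With full names as keys, the later match against the `City` column of the temperature data would be less prone to false matches, at the cost of needing exact agreement on spelling.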

In [6]:
cities

{'Anchorage': 'ANC',
 'Baker': 'BAR',
 'Daltons': 'DAC',
 'Dew': 'PIZ',
 'Dutch': 'DTH',
 'Eagle': 'EGP',
 'Fairbanks': 'FRB',
 'Homer': 'HOM',
 'Hyder': 'HYD',
 'Juneau': 'JUN',
 'Ketchikan': 'KET',
 'Moses': 'MWH',
 'Nikiski': 'NIK',
 'Nom': 'NOM',
 'Poker': 'PKC',
 'Port': 'PTO',
 'Skagway': 'SKA',
 'St': 'STA',
 'Tokeen': 'TKI',
 'Wrangell': 'WRA',
 'Madison': 'HSV',
 'Mobile': 'MOB',
 'Little': 'LIA',
 'Rogers': 'ROG',
 'Douglas': 'DOU',
 'Lukeville': 'LUK',
 'Mariposa': 'MAP',
 'Naco': 'NAC',
 'Nogales': 'NOG',
 'Phoenix': 'PHO',
 'Portal': 'POR',
 'San': 'SAN',
 'Sasabe': 'SAS',
 'Tucson': 'TUC',
 'Yuma': 'YUM',
 'Andrade': 'AND',
 'Burbank': 'BUR',
 'Calexico': 'CAL',
 'Campo': 'CAO',
 'Fresno': 'FRE',
 'Imperial': 'ICP',
 'Long': 'LNB',
 'Los': 'LOI',
 'Meadows': 'BFL',
 'Oakland': 'PTK',
 'Ontario': 'ONT',
 'Otay': 'OTM',
 'Pacific': 'BLT',
 'Palm': 'PSP',
 'Sacramento': 'SAC',
 'Salinas': 'SLS',
 'Santa': 'STR',
 'Stockton': 'STO',
 'Tecate': 'TEC',
 'Travis-Afb': 'TRV',
 'A

In [7]:
i94_ports

{'ANC': 'ANCHORAGE',
 'BAR': 'BAKER',
 'DAC': 'DALTONS',
 'PIZ': 'DEW',
 'DTH': 'DUTCH',
 'EGL': 'EAGLE',
 'FRB': 'FAIRBANKS',
 'HOM': 'HOMER',
 'HYD': 'HYDER',
 'JUN': 'JUNEAU',
 '5KE': 'KETCHIKAN',
 'KET': 'KETCHIKAN',
 'MOS': 'MOSES',
 'NIK': 'NIKISKI',
 'NOM': 'NOM',
 'PKC': 'POKER',
 'ORI': 'PORT',
 'SKA': 'SKAGWAY',
 'SNP': 'ST',
 'TKI': 'TOKEEN',
 'WRA': 'WRANGELL',
 'HSV': 'MADISON',
 'MOB': 'MOBILE',
 'LIA': 'LITTLE',
 'ROG': 'ROGERS',
 'DOU': 'DOUGLAS',
 'LUK': 'LUKEVILLE',
 'MAP': 'MARIPOSA',
 'NAC': 'NACO',
 'NOG': 'NOGALES',
 'PHO': 'PHOENIX',
 'POR': 'PORTAL',
 'SLU': 'SAN',
 'SAS': 'SASABE',
 'TUC': 'TUCSON',
 'YUI': 'YUMA',
 'AND': 'ANDRADE',
 'BUR': 'BURBANK',
 'CAL': 'CALEXICO',
 'CAO': 'CAMPO',
 'FRE': 'FRESNO',
 'ICP': 'IMPERIAL',
 'LNB': 'LONG',
 'LOS': 'LOS',
 'BFL': 'MEADOWS',
 'OAK': 'OAKLAND',
 'ONT': 'ONTARIO',
 'OTM': 'OTAY',
 'BLT': 'PACIFIC',
 'PSP': 'PALM',
 'SAC': 'SACRAMENTO',
 'SLS': 'SALINAS',
 'SDP': 'SAN',
 'SFR': 'SAN',
 'SNJ': 'SAN',
 'SLO': 'SAN',

In [8]:
imm_sub.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [11]:
print(len(tempa))
tempa.head(10)

8599212


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E
5,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
6,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
7,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
8,1744-07-01,16.082,1.396,Århus,Denmark,57.05N,10.33E
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E


In [12]:
us_cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Starting spark session
Using Spark to read in the SAS immigration data.

In [17]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

23/02/27 18:29:31 WARN Utils: Your hostname, Terminator.local resolves to a loopback address: 127.0.0.1; using 192.168.178.97 instead (on interface en0)
23/02/27 18:29:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
Ivy Default Cache set to: /Users/terminator/.ivy2/cache
The jars for the packages stored in: /Users/terminator/.ivy2/jars
saurfang#spark-sas7bdat added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f32a229a-0851-4025-9a14-6ad31d845d79;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;2.0.0-s_2.11 in spark-packages
	found com.epam#parso;2.0.8 in central
	found org.slf4j#slf4j-api;1.7.5 in central


:: loading settings :: url = jar:file:/Users/terminator/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.logging.log4j#log4j-api-scala_2.11;2.7 in central
	found org.scala-lang#scala-reflect;2.11.8 in central
:: resolution report :: resolve 171ms :: artifacts dl 7ms
	:: modules in use:
	com.epam#parso;2.0.8 from central in [default]
	org.apache.logging.log4j#log4j-api-scala_2.11;2.7 from central in [default]
	org.scala-lang#scala-reflect;2.11.8 from central in [default]
	org.slf4j#slf4j-api;1.7.5 from central in [default]
	saurfang#spark-sas7bdat;2.0.0-s_2.11 from spark-packages in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   5   |   0   |   0   |   0   ||   5   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-f32a229a

23/02/27 18:29:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [18]:
df_imm = spark.read.parquet("sas_data")

                                                                                

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


In [19]:
df_imm.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [20]:
df_imm.show(truncate=False)

23/02/27 18:29:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum        |fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|4.0   |245.0 |438.0 |LOS    |20574.0|1.0    |CA     |20582.0|40.0  |1.0    |1.0  |20160430|SYD     |null |G      |O      |nu

In [21]:
df_imm.select(F.max(df_imm.i94mon)).show()

+-----------+
|max(i94mon)|
+-----------+
|        4.0|
+-----------+



In [22]:
df_imm.count()

3096313

In [23]:
# Display the date column to see which period the temperature data covers
tempa["dt"]

0          1743-11-01
1          1743-12-01
2          1744-01-01
3          1744-02-01
4          1744-03-01
              ...    
8599207    2013-05-01
8599208    2013-06-01
8599209    2013-07-01
8599210    2013-08-01
8599211    2013-09-01
Name: dt, Length: 8599212, dtype: object

In [24]:
tempa_duplicates = tempa[tempa.duplicated()]
print(tempa_duplicates)
# no duplicates in temperature data

Empty DataFrame
Columns: [dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude]
Index: []


In [25]:
mask = tempa.isna().any(axis=1)
tempa_na = tempa[mask]
print(tempa_na)
# a fair number of records have NaN average temperatures; these will be removed.

                 dt  AverageTemperature  AverageTemperatureUncertainty  \
1        1743-12-01                 NaN                            NaN   
2        1744-01-01                 NaN                            NaN   
3        1744-02-01                 NaN                            NaN   
4        1744-03-01                 NaN                            NaN   
9        1744-08-01                 NaN                            NaN   
...             ...                 ...                            ...   
8596076  1752-06-01                 NaN                            NaN   
8596077  1752-07-01                 NaN                            NaN   
8596078  1752-08-01                 NaN                            NaN   
8596079  1752-09-01                 NaN                            NaN   
8599211  2013-09-01                 NaN                            NaN   

           City      Country Latitude Longitude  
1         Århus      Denmark   57.05N    10.33E  
2         Å

#### Cleaning Steps
##### Cleaning the temperature data
The world temperature data runs from 1743 until 2013. Most of these records are not needed for this dataset and will be removed. We are dealing with 2016 immigration data but only have temperature data until 2013, so a proxy year is needed. We keep the average temperatures of 2012, because its average annual temperature (56.6 °F) was close to that of 2016 (56.2 °F), which makes 2012 a better stand-in than 2013 (53.3 °F; source: https://www.weather.gov/media/slc/ClimateBook/Annual%20Average%20Temperature%20By%20Year.pdf). More concretely, we only have immigration data for April 2016, so only the April 2012 average temperature is used in the cleaning function. \
Moreover, records that do not have an average temperature will be removed. \
Lastly, only cities in the US are selected.
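
The annual averages quoted above are in °F, whereas the `AverageTemperature` column appears to be in °C (for example, values around 0 for Akron in January). A minimal sketch of selecting one month and converting it for comparison is shown below. The rows are copied from the Akron records displayed later in this notebook, and `select_month` and `celsius_to_fahrenheit` are hypothetical helper names.

```python
# Rows copied from the Akron temperature records shown later in this notebook.
rows = [
    {"dt": "2012-03-01", "AverageTemperature": 10.109, "City": "Akron"},
    {"dt": "2012-04-01", "AverageTemperature": 9.195, "City": "Akron"},
    {"dt": "2012-05-01", "AverageTemperature": 18.921, "City": "Akron"},
]


def celsius_to_fahrenheit(celsius):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return celsius * 9 / 5 + 32


def select_month(rows, year, month):
    """Keep rows whose yyyy-mm-dd date string falls in the given year and month."""
    prefix = f"{year:04d}-{month:02d}"
    return [row for row in rows if row["dt"].startswith(prefix)]


april_2012 = select_month(rows, 2012, 4)
# Assumption: AverageTemperature is stored in degrees Celsius
april_2012_f = [
    round(celsius_to_fahrenheit(row["AverageTemperature"]), 2) for row in april_2012
]
print(april_2012_f)
```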

In [26]:
tempa = tempa[tempa["dt"] >= '2012-01-01']
tempa.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 73710 entries, 3218 to 8599211
Data columns (total 7 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   dt                             73710 non-null  object 
 1   AverageTemperature             70640 non-null  float64
 2   AverageTemperatureUncertainty  70640 non-null  float64
 3   City                           73710 non-null  object 
 4   Country                        73710 non-null  object 
 5   Latitude                       73710 non-null  object 
 6   Longitude                      73710 non-null  object 
dtypes: float64(2), object(5)
memory usage: 4.5+ MB


In [27]:
tempa = tempa[tempa['AverageTemperature'].notna()]
tempa.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 70640 entries, 3218 to 8599210
Data columns (total 7 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   dt                             70640 non-null  object 
 1   AverageTemperature             70640 non-null  float64
 2   AverageTemperatureUncertainty  70640 non-null  float64
 3   City                           70640 non-null  object 
 4   Country                        70640 non-null  object 
 5   Latitude                       70640 non-null  object 
 6   Longitude                      70640 non-null  object 
dtypes: float64(2), object(5)
memory usage: 4.3+ MB


In [28]:
tempa = tempa[tempa['Country'] == 'United States']
tempa.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5396 entries, 49859 to 8439246
Data columns (total 7 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   dt                             5396 non-null   object 
 1   AverageTemperature             5396 non-null   float64
 2   AverageTemperatureUncertainty  5396 non-null   float64
 3   City                           5396 non-null   object 
 4   Country                        5396 non-null   object 
 5   Latitude                       5396 non-null   object 
 6   Longitude                      5396 non-null   object 
dtypes: float64(2), object(5)
memory usage: 337.2+ KB


##### Cleaning the i94 ports
Both the immigration and the temperature data need to be cleaned to only display the relevant i94 ports. The labels description SAS file is used for this purpose. 

In [29]:
df_imm_cleaned = df_imm[(df_imm.i94port.isin(list(i94_ports.keys())))]
print(df_imm_cleaned.count())
df_imm_cleaned.show(truncate=False, n=5)

3096242
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum        |fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|4.0   |245.0 |438.0 |LOS    |20574.0|1.0    |CA     |20582.0|40.0  |1.0    |1.0  |20160430|SYD     |null |G      |O      |null   |M      |1976.0 |10292016|F     |null  |QF     |9.495387003E10|00011|B1      |
|5748518.0|2016.0|4.0   |245.0 |438.0 |LOS    |20574.0|1.0    |NV     |20591.0|32.0  |1.

In [30]:
#tempa = tempa[(tempa["City"].isin(list(cities.keys())))]
#print(len(tempa))
tempa[(tempa["City"].isin(list(cities.keys())))]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
140284,2012-01-01,-0.344,0.410,Akron,United States,40.99N,80.95W
140285,2012-02-01,1.527,0.319,Akron,United States,40.99N,80.95W
140286,2012-03-01,10.109,0.442,Akron,United States,40.99N,80.95W
140287,2012-04-01,9.195,0.412,Akron,United States,40.99N,80.95W
140288,2012-05-01,18.921,0.322,Akron,United States,40.99N,80.95W
...,...,...,...,...,...,...,...
8165133,2013-05-01,17.134,0.188,Washington,United States,39.38N,76.99W
8165134,2013-06-01,22.919,0.245,Washington,United States,39.38N,76.99W
8165135,2013-07-01,25.658,0.304,Washington,United States,39.38N,76.99W
8165136,2013-08-01,22.722,0.241,Washington,United States,39.38N,76.99W


In [31]:
# And finally adding the i94_port to the temperature dataset
tempa['i94port'] = tempa["City"].map(cities)

In [32]:
tempa

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,i94port
49859,2012-01-01,7.996,0.204,Abilene,United States,32.95N,100.53W,
49860,2012-02-01,8.434,0.252,Abilene,United States,32.95N,100.53W,
49861,2012-03-01,15.628,0.173,Abilene,United States,32.95N,100.53W,
49862,2012-04-01,21.069,0.388,Abilene,United States,32.95N,100.53W,
49863,2012-05-01,24.698,0.323,Abilene,United States,32.95N,100.53W,
...,...,...,...,...,...,...,...,...
8439242,2013-05-01,15.544,0.281,Yonkers,United States,40.99N,74.56W,
8439243,2013-06-01,20.892,0.273,Yonkers,United States,40.99N,74.56W,
8439244,2013-07-01,24.722,0.279,Yonkers,United States,40.99N,74.56W,
8439245,2013-08-01,21.001,0.323,Yonkers,United States,40.99N,74.56W,


From the exploration above, we want to filter the i94 immigration data on valid i94 ports. \
The temperature data needs to be in a Spark dataframe as well. \
The temperature data can be enriched with the i94 port code by matching on city name; records without a valid i94 port and records without an average temperature are then filtered out. \
For the immigration table we could use the columns ["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"] and for the temperature table we could use ["AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"]. \
Joining these two tables results in the fact table. 
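
The join logic described above can be sketched in plain Python as follows. All values, port codes and city names here are hypothetical placeholders, and `build_fact_rows` is an illustrative helper; in the pipeline itself this step would presumably be a Spark join on `i94port`.

```python
# Hypothetical rows shaped like the cleaned tables; all values are placeholders.
immigration = [
    {"cicid": 1.0, "i94port": "AAA", "i94mon": 4},
    {"cicid": 2.0, "i94port": "BBB", "i94mon": 4},
    {"cicid": 3.0, "i94port": "CCC", "i94mon": 4},
]
temperature = [
    {"i94port": "AAA", "City": "City A", "AverageTemperature": 12.5},
    {"i94port": "BBB", "City": "City B", "AverageTemperature": 18.0},
]


def build_fact_rows(imm_rows, temp_rows):
    """Inner-join immigration rows with temperature rows on i94port."""
    temp_by_port = {row["i94port"]: row for row in temp_rows}
    fact = []
    for imm in imm_rows:
        temp = temp_by_port.get(imm["i94port"])
        if temp is None:
            continue  # no temperature record for this port, so the row is dropped
        fact.append({**imm, **temp})
    return fact


fact_rows = build_fact_rows(immigration, temperature)
print(len(fact_rows))
```

Whether arrivals at ports without a temperature record are dropped (as in this sketch) or kept with empty temperature fields is a design choice that affects the row counts of the fact table.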

In [33]:
# Single function to clean the temperature data
def clean_temperature(tempa, cities):
    """
    Cleans the pandas temperature dataframe.
    Inputs: the temperature pandas dataframe and the cities dictionary created from the SAS labels file.
    Selects the April 2012 average temperatures, removes records with a NaN average temperature,
    keeps only cities in the US that have an i94 port, and adds that i94 port code.
    Selects the subset of columns deemed useful for the fact table.
    Finally, converts the pandas dataframe to a Spark dataframe to match the immigration dataframe.
    Relies on the global `spark` session created earlier in the notebook.
    """
    tempa = tempa[(tempa["dt"] >= '2012-04-01') & (tempa["dt"] < '2012-05-01')]
    tempa = tempa[tempa['AverageTemperature'].notna()]
    tempa = tempa[tempa['Country'] == 'United States']
    tempa = tempa[tempa["City"].isin(list(cities.keys()))]
    # assign returns a new dataframe, which avoids pandas' SettingWithCopyWarning
    tempa = tempa.assign(i94port=tempa["City"].map(cities))
    tempa = tempa.loc[:, ["dt", "AverageTemperature", "City", "Country", "Latitude", "Longitude", "i94port"]]
    tempa_table = spark.createDataFrame(tempa)
    return tempa_table

In [34]:
tempa_table = clean_temperature(tempa, cities)



Check whether any column of the cleaned temperature table contains null or NaN values. This continues the data exploration and is the precursor to the data quality check functions. 

In [35]:
tempa_table.select((isnan("City") | tempa_table["City"].isNull())).show()
#tempa_table.select(count(isnan("City") | tempa_table["City"].isNull())).collect()[0]
#tempa_table.filter((isnan(c) | tempa_table[c].isNull()).alias(c) for c in tempa_table.columns).collect()[0]

+-------------------------------+
|(isnan(City) OR (City IS NULL))|
+-------------------------------+
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
|                          false|
+-------------------------------+
only showing top 20 rows



In [36]:
# Single function to clean the immigration data
def clean_immigration(df_imm, i94_ports):
    """
    Function to clean immigration spark dataframe.
    Inputs: Spark immigration dataframe and i94_ports dictionary
    Filters out records where the i94 port does not match the i94_ports dictionary.
    The selected subset of columns are to be used in the fact table.
    Returns a cleaned immigration spark dataframe. 
    """
    df_imm = df_imm.filter(df_imm.i94port.isin(list(i94_ports.keys())))
    imm_table = df_imm.select(["cicid", "i94yr", "i94mon", "i94cit", "i94port", "arrdate", "depdate", "i94visa"])
    return imm_table

In [37]:
imm_table = clean_immigration(df_imm, i94_ports)

In [38]:
imm_table.filter(isnan("depdate")).show()

+-----+-----+------+------+-------+-------+-------+-------+
|cicid|i94yr|i94mon|i94cit|i94port|arrdate|depdate|i94visa|
+-----+-----+------+------+-------+-------+-------+-------+
+-----+-----+------+------+-------+-------+-------+-------+



In [39]:
imm_table.filter(imm_table["depdate"].isNull()).show()

+---------+------+------+------+-------+-------+-------+-------+
|    cicid| i94yr|i94mon|i94cit|i94port|arrdate|depdate|i94visa|
+---------+------+------+------+-------+-------+-------+-------+
|5748534.0|2016.0|   4.0| 245.0|    SFR|20574.0|   null|    2.0|
|5748908.0|2016.0|   4.0| 249.0|    LOS|20574.0|   null|    2.0|
|5748930.0|2016.0|   4.0| 249.0|    SEA|20574.0|   null|    2.0|
|5748945.0|2016.0|   4.0| 249.0|    SFR|20574.0|   null|    2.0|
|5748946.0|2016.0|   4.0| 249.0|    SFR|20574.0|   null|    2.0|
|5748948.0|2016.0|   4.0| 249.0|    BOS|20574.0|   null|    3.0|
|5748968.0|2016.0|   4.0| 250.0|    WAS|20574.0|   null|    1.0|
|5749006.0|2016.0|   4.0| 250.0|    CHI|20574.0|   null|    2.0|
|5749007.0|2016.0|   4.0| 250.0|    CHI|20574.0|   null|    2.0|
|5749009.0|2016.0|   4.0| 250.0|    CHI|20574.0|   null|    2.0|
|5749064.0|2016.0|   4.0| 250.0|    LOS|20574.0|   null|    2.0|
|5749094.0|2016.0|   4.0| 251.0|    NYC|20573.0|   null|    2.0|
|5749225.0|2016.0|   4.0|

We can see here that some records have empty departure dates. While this is not strange at first sight, it is somewhat unusual for the US, because at the port of arrival all visitors are required to submit an address for their stay as well as a departure date, including flight tickets. There are of course occasions where the departure date is determined later, but this would raise a flag for immigration staff and lead to further questioning. In this dataset the missing departure date could be a data quality issue that is corrected in later updates. However, the immigration table at the source is a transaction table to which new records are appended, so updates to existing records seem unlikely. \
In further iterations of this immigration insights table it would be wise to investigate the nature and impact of the missing departure dates, since a question such as "how many visitors are currently in the US" is a viable and interesting one for users of the table to ask. 
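
To reason about missing departure dates it helps to convert the SAS date values into calendar dates. SAS dates count days since 1960-01-01, so a minimal sketch of the conversion could look like this. The helper names are hypothetical, and missing values are assumed to arrive as `None`, as they do when Spark nulls are collected to Python.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_date(sas_days):
    """Convert a SAS date value (days since 1960-01-01) to a date, or None if missing."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def length_of_stay(arrdate, depdate):
    """Return the stay in days, or None when either date is missing."""
    arrival, departure = sas_to_date(arrdate), sas_to_date(depdate)
    if arrival is None or departure is None:
        return None
    return (departure - arrival).days


# arrdate and depdate of the first immigration record shown earlier in this notebook
print(sas_to_date(20574.0), length_of_stay(20574.0, 20582.0))
# A record without a departure date
print(length_of_stay(20574.0, None))
```

The converted arrival date of that first record (2016-04-30) agrees with its `dtadfile` value of 20160430, which suggests the conversion is appropriate for this dataset.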

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model and Data Dictionary
"Map out the conceptual data model and explain why you chose that model."

We are using the immigration and global temperatures datasets. They are cleaned with the help of the I94 SAS Labels Descriptions file. After the cleaning step we have two standardized tables with the following structure:
##### Immigration standardized table
| Column name | data type | description |
| ----------- | --------- | ------------------------------- |
|**cicid**      | **FLOAT**       | **immigration record id**|
| i94yr       | INT       | year of immigration record      |
| i94mon      | INT       | month of immigration record      |
| i94cit      | VARCHAR(3)| origin country code               |
| i94port     | CHAR(3)   | US city code of arrival port      |
| arrdate     | FLOAT     | arrival date at US port in SAS date format      |
| depdate     | FLOAT     | departure date from US in SAS date format      |
| i94visa     | FLOAT     | reason for visit      |

Each row in the immigration table contains information on an alien entering the US through a port of arrival. Each visit record is uniquely identified by its **cicid**. 
##### Temperature standardized table
| Column name | data type | description |
| ----------- | --------- | ------------------------------- |
| dt       | VARCHAR(256)       | month of temperature record in yyyy-mm-dd     |
| AverageTemperature      | FLOAT       | average temperature for recorded month      |
| City      | VARCHAR(256)| city of temperature record               |
| Country     | VARCHAR(256)   | country of temperature record      |
| Latitude     | VARCHAR(256)     | latitude      |
| Longitude     | VARCHAR(256)     | longitude       |
| i94port     | CHAR(3)     | US city code of arrival port      |

Each row denotes the average temperature for a given month in a given city. The combination of the **dt** and City columns uniquely identifies a record.
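
The uniqueness of the (**dt**, City) combination is an assumption that can be verified rather than taken for granted. A minimal sketch of such a check is given below; `duplicate_keys` is a hypothetical helper, and the third row is deliberately duplicated for the demonstration.

```python
from collections import Counter


def duplicate_keys(rows, key_columns):
    """Return the key tuples that occur more than once in rows."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


rows = [
    {"dt": "2012-01-01", "City": "Akron", "AverageTemperature": -0.344},
    {"dt": "2012-02-01", "City": "Akron", "AverageTemperature": 1.527},
    # Deliberately duplicated row, added only to demonstrate the check
    {"dt": "2012-01-01", "City": "Akron", "AverageTemperature": -0.344},
]

print(duplicate_keys(rows, ["dt", "City"]))
```

An empty result means the chosen columns form a valid key for the table.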
##### Immigration optimized fact table
| Column name | data type | description |
| ----------- | --------- | ------------------------------- |
| **cicid**       | **FLOAT**       | **immigration record id**      |
| i94yr       | INT       | year of immigration record      |
| i94mon      | INT       | month of immigration record      |
| i94cit      | VARCHAR(3)| origin country code               |
| i94port     | CHAR(3)   | US city code of arrival port      |
| arrdate     | FLOAT     | arrival date at US port in SAS date format      |
| depdate     | FLOAT     | departure date from US in SAS date format      |
| i94visa     | FLOAT     | reason for visit      |
| dt       | VARCHAR(256)       | month of temperature record in yyyy-mm-dd     |
| AverageTemperature      | FLOAT       | average temperature for recorded month      |
| City      | VARCHAR(256)| city of temperature record               |
| Country     | VARCHAR(256)   | country of temperature record      |
| Latitude     | VARCHAR(256)     | latitude      |
| Longitude     | VARCHAR(256)     | longitude       |

These columns were chosen because a user of the optimized fact table wants to easily see the average temperature at the port of arrival for the month of arrival. The more detailed columns from the immigration table do not add value to the use case for which the fact table is being built. \
The immigration records are enriched with temperature data for the port of arrival. Each record remains unique through the **cicid** column. The fact table can be seen as a transaction table: new visits to the US are recorded and appended to the table, and existing records do not change. 
#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Start SparkSession
2. Ingest the data into Spark (immigration) and pandas (temperature) dataframes
3. Extract the cities and i94_ports dictionaries from the I94 SAS labels file
4. Clean the temperature data with dedicated cleaning function (see clean_temperature function for specifics)
5. Clean immigration data with dedicated function (see clean_immigration for specifics)
6. Run data quality checks on completeness of ingested data
7. Write cleaned and checked tables (immigration and temperature) to standardized storage as parquet files
8. Combine standardized spark tables to a single optimized fact table to be updated regularly.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 

In [40]:
# Data quality check function to determine if standardized datasets contain data and if there are null values
def check_data_quality(df, string_of_df_to_check, exclude_cols=None):
    """
    Performs data quality checks on a Spark DataFrame. Checks if there are columns that contain null/NaN values.
    Takes as inputs a Spark DataFrame, a string with the name of the DataFrame to check,
    and an optional list of columns to exclude from the null/NaN check in case null/NaN values are expected.
    Raises a ValueError if the DataFrame is empty.
    Returns True if the DataFrame contains data and has no null/NaN values; otherwise prints an error and returns False.
    """
    # Avoid a mutable default argument
    exclude_cols = exclude_cols or []
    if df.count() == 0:
        raise ValueError(f"Error: DataFrame {string_of_df_to_check} is empty")

    null_columns = []
    for c in df.columns:
        if c in exclude_cols:
            continue
        null_count = df.filter(isnan(c) | df[c].isNull()).count()
        if null_count > 0:
            null_columns.append(c)
    if null_columns:
        print(f"Error: DataFrame {string_of_df_to_check} contains null/NaN values in columns: {null_columns}")
        return False
    print(f"Data quality checks passed: DataFrame {string_of_df_to_check} contains data and has no null/NaN values")
    return True

In [41]:
def check_duplicates(df):
    """
    Checks a Spark DataFrame for duplicate records.
    Returns True if the DataFrame contains no duplicate records, and False otherwise.
    """
    count_before = df.count()
    df = df.dropDuplicates()
    count_after = df.count()
    if count_before == count_after:
        print("No duplicate records found")
        return True
    else:
        print(f"Error: {count_before - count_after} duplicate records found")
        return False

#### 4.2 Create the data model
Build the data pipelines to create the data model.

In [42]:
# 1. Start SparkSession
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [43]:
# 2. Ingest data
## immigration dataset
df_imm = spark.read.parquet("sas_data")
## temperature dataset
tempa_file_name = "GlobalLandTemperaturesByCity.csv"
tempa = pd.read_csv(tempa_file_name, sep=',', index_col=0)

In [44]:
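# 3. Extract cities and i94_ports dictionaries from I94_SAS_Labels_Descriptions.sas
regex_pattern = re.compile(r"\'(.*)\'.*\'([A-Z\-a-z]+)(.*)\'")
i94_ports = {}
cities = {}
with open("I94_SAS_Labels_Descriptions.sas") as f:
    lines = f.readlines()
# The slice is assumed to cover the port label block of the file
for line in lines[303:962]:
    match = regex_pattern.search(line)
    if match is None:
        continue  # skip lines that do not contain a port label
    i94_ports[match[1]] = match[2]          # port code -> city
    cities[match[2].title()] = match[1]     # city (title case) -> port code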
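
As a quick sanity check on the parsing step, the pattern can be exercised on a couple of label lines. The sample lines below are assumptions written in the `'CODE' = 'CITY, STATE'` shape that the pattern expects; they are not copied from the labels file, and `parse_port_label` is a hypothetical helper. The sketch suggests that the second capture group stops at the first character that is not a letter or hyphen, so a multi-word city name would be captured only up to its first space, which may be worth verifying against the real file.

```python
import re

# Same pattern as in the extraction step above
regex_pattern = re.compile(r"\'(.*)\'.*\'([A-Z\-a-z]+)(.*)\'")


def parse_port_label(line):
    """Return (port_code, city) for a label line, or None if it does not match."""
    match = regex_pattern.search(line)
    if match is None:
        return None
    return match[1], match[2].title()


# Assumed sample lines, for illustration only
single_word = "   'ANC'\t=\t'ANCHORAGE, AK         '"
multi_word = "   'LOS'\t=\t'LOS ANGELES, CA       '"

print(parse_port_label(single_word))
print(parse_port_label(multi_word))
print(parse_port_label("no label on this line"))
```

If full city names are needed for matching against the temperature data, the character class in the second group would have to allow spaces as well; that change is not made here because it would alter the tables shown in the following steps.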

In [45]:
# 4. Clean temperature data
tempa_table = clean_temperature(tempa, cities)
tempa_table.show(truncate=False, n=10)
tempa_table.count()


+----------+------------------+-----------+-------------+--------+---------+-------+
|dt        |AverageTemperature|City       |Country      |Latitude|Longitude|i94port|
+----------+------------------+-----------+-------------+--------+---------+-------+
|2012-04-01|9.195             |Akron      |United States|40.99N  |80.95W   |CAK    |
|2012-04-01|13.133            |Albuquerque|United States|34.56N  |107.03W  |ABQ    |
|2012-04-01|12.158            |Alexandria |United States|39.38N  |76.99W   |AXB    |
|2012-04-01|0.4289999999999998|Anchorage  |United States|61.88N  |151.13W  |ANC    |
|2012-04-01|16.287            |Atlanta    |United States|34.56N  |83.68W   |ATL    |
|2012-04-01|22.44100000000001 |Austin     |United States|29.74N  |97.85W   |AUS    |
|2012-04-01|12.158            |Baltimore  |United States|39.38N  |76.99W   |BAL    |
|2012-04-01|22.031            |Beaumont   |United States|29.74N  |94.15W   |BEA    |
|2012-04-01|18.724            |Birmingham |United States|32.95N  

73

In [46]:
# 5. Clean immigration data
imm_table = clean_immigration(df_imm, i94_ports)
imm_table.show(truncate=False, n=10)
imm_table.count()

+---------+------+------+------+-------+-------+-------+-------+
|cicid    |i94yr |i94mon|i94cit|i94port|arrdate|depdate|i94visa|
+---------+------+------+------+-------+-------+-------+-------+
|5748517.0|2016.0|4.0   |245.0 |LOS    |20574.0|20582.0|1.0    |
|5748518.0|2016.0|4.0   |245.0 |LOS    |20574.0|20591.0|1.0    |
|5748519.0|2016.0|4.0   |245.0 |LOS    |20574.0|20582.0|1.0    |
|5748520.0|2016.0|4.0   |245.0 |LOS    |20574.0|20588.0|1.0    |
|5748521.0|2016.0|4.0   |245.0 |LOS    |20574.0|20588.0|1.0    |
|5748522.0|2016.0|4.0   |245.0 |HHW    |20574.0|20579.0|2.0    |
|5748523.0|2016.0|4.0   |245.0 |HHW    |20574.0|20586.0|2.0    |
|5748524.0|2016.0|4.0   |245.0 |HHW    |20574.0|20586.0|2.0    |
|5748525.0|2016.0|4.0   |245.0 |HOU    |20574.0|20581.0|2.0    |
|5748526.0|2016.0|4.0   |245.0 |LOS    |20574.0|20581.0|2.0    |
+---------+------+------+------+-------+-------+-------+-------+
only showing top 10 rows



3096242

In [47]:
# 6. Run data quality checks
check_data_quality(tempa_table, "temperature table")

Data quality checks passed: DataFrame temperature table contains data and has no null/Nan values


True

In [48]:
check_duplicates(tempa_table)

No duplicate records found


True

In [49]:
check_data_quality(imm_table, "immigration table", ["depdate"])

Data quality checks passed: DataFrame immigration table contains data and has no null/Nan values


True

In [50]:
check_duplicates(imm_table)

No duplicate records found

True

In [51]:
# 7. Write cleaned and checked tables (immigration and temperature) to standardized storage as parquet files 
tempa_table.coalesce(1).write.mode("overwrite").parquet("./standardized/temperature_standardized")
imm_table.coalesce(1).write.mode("overwrite").parquet("./standardized/immigration_standardized")

Note that we currently use "overwrite" mode and write each table out to a single file with coalesce. The dataset is not large yet, and parquet reads are most efficient when files are between roughly 512MB and 1GB (or one HDFS block). Many small files actually slow down reading the dataset and can also result in higher costs, because each file adds listing, opening, and task-scheduling overhead when Spark executes compute on its nodes. \
We are overwriting because no new data is coming in yet and the project is still in the development stage. Once the solution goes to production with regular updates, the mode will be changed to "append".
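
The file-size reasoning above can be turned into a small calculation. The sketch below is an illustration under assumptions: `target_file_count` is a hypothetical helper, the 512MB target is taken from the range mentioned above, and the dataset sizes are invented. The result could be passed to `coalesce` or `repartition` instead of a hard-coded 1 once the tables grow.

```python
import math

MB = 1024 * 1024


def target_file_count(total_bytes, target_file_bytes=512 * MB):
    """Number of output files so that each file is at most about target_file_bytes."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("total_bytes must be >= 0 and target_file_bytes must be > 0")
    # Always write at least one file, even for a tiny dataset
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Invented sizes, for illustration only
small_dataset = 200 * MB
large_dataset = 5 * 1024 * MB

print(target_file_count(small_dataset))
print(target_file_count(large_dataset))
```

For the small dataset a single file is enough, which matches the current use of `coalesce(1)`; the larger one would be split into several files of roughly the target size.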

In [52]:
# 8. Combine standardized immigration and temperature tables using a left join from the immigration table on i94port
imm_fact_table = imm_table.join(tempa_table, on="i94port", how="left")
imm_fact_table.count()

3096797
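
The joined count (3096797) is higher than the immigration count from step 5 (3096242). A left join can only add rows when a key on the right-hand side matches more than once, so this suggests that at least one i94port value appears on several temperature rows. The plain-Python sketch below illustrates the effect and a key-level check on invented rows; `duplicate_keys` and `left_join` are hypothetical helpers, not part of the notebook. In Spark, a comparable check would be `tempa_table.groupBy("i94port").count().filter(F.col("count") > 1)`.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the key values that occur on more than one row."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


def left_join(left, right, key):
    """Naive left join: every left row is paired with each matching right row."""
    joined = []
    for l_row in left:
        matches = [r for r in right if r[key] == l_row[key]]
        if not matches:
            joined.append(dict(l_row))  # unmatched rows are kept without right-hand columns
        for r_row in matches:
            joined.append({**l_row, **r_row})
    return joined


# Invented rows: port "AAA" appears twice on the temperature side
immigration = [{"cicid": 1, "i94port": "AAA"}, {"cicid": 2, "i94port": "BBB"}]
temperature = [
    {"i94port": "AAA", "AverageTemperature": 10.0},
    {"i94port": "AAA", "AverageTemperature": 12.0},
]

joined_rows = left_join(immigration, temperature, "i94port")
print(duplicate_keys(temperature, "i94port"))
print(len(immigration), len(joined_rows))
```

Note that a full-row duplicate check would pass on these temperature rows, because they differ in temperature; only a check on the join key reveals the fan-out.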

In [53]:
imm_fact_table.summary().show()

+-------+-------+------------------+--------------------+-------+------------------+-----------------+------------------+------------------+----------+------------------+-----------+-------------+--------+---------+
|summary|i94port|             cicid|               i94yr| i94mon|            i94cit|          arrdate|           depdate|           i94visa|        dt|AverageTemperature|       City|      Country|Latitude|Longitude|
+-------+-------+------------------+--------------------+-------+------------------+-----------------+------------------+------------------+----------+------------------+-----------+-------------+--------+---------+
|  count|3096797|           3096797|             3096797|3096797|           3096797|          3096797|           2954310|           3096797|   1389943|           1389943|    1389943|      1389943| 1389943|  1389943|
|   mean|   null| 3078504.781015998|              2016.0|    4.0|304.91112849825157|20559.84772686101|20573.952449133638|1.8453980031626

                                                                                

In [54]:
imm_fact_table.coalesce(1).write.mode("overwrite").parquet("./optimized/optimized_immigration_fact")

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Creating value through Agile development
The Capstone project provides potential end-users (immigration officers, travel agencies, government officials, social science researchers, ESG reporting companies) with a fact table that links temperatures at US ports of arrival to the influx of arrivals into the US. In keeping with agile development, the aim is to deliver a preliminary solution quickly so that users can determine whether more value can be generated. To that end, the notebook can be executed on a local machine, provided that the input datasets have been downloaded from the repository. The functions and pipeline have nevertheless been set up to be efficient and to scale with little amendment. As it stands, the data should be updated yearly. \
The data model defined in 3.1, together with the pipeline, follows a robust and scalable staging protocol: raw -> standardized -> optimized. In a following iteration the fact table would reside, and be updated, in an enriched stage from which several different optimized tables are derived. \
A data dictionary with descriptions of the columns can also be found in 3.1. 
##### Limitations
The current setup has several limitations, as discussed throughout the notebook. The main issues are the completeness of the datasets and the availability of data across years. \
To improve the standardized tables and the optimized fact table, more reliable datasets could be sourced that expose an API from which updates can be scheduled directly. This would also allow Airflow to be introduced to schedule and orchestrate updates. 
##### Performance
The standardized tables and the optimized table are currently stored locally as parquet files written from Spark DataFrames. For the several million records currently ingested and written out, this runs very efficiently. Once the solution needs to scale to multiple years of temperature and immigration data, a next step would be to containerize the solution in Docker and provide enough compute power for regular updates. The data would also be partitioned by i94port and by year/month for organised access. \
A step after this would be to create a virtual machine (VM) with enough storage to hold many years of data. This will work efficiently for up to 100 users if the storage disk is large enough, say 128GB, and if the core count is at least 32. With this setup, even a large fact table with many concurrent read operations can serve 100 users. \
However, once the optimized dataset becomes larger, with more countries, more historic years, more frequent (say monthly) updates, and over 100 users, the solution will need to be scaled to the Cloud. The VM setup can still provide the compute power for updates and ingestion, but the storage should preferably be handled by a data lake. The optimized table can be migrated to a dedicated Redshift cluster, where a much larger number of users can execute read operations for downstream analysis, such as dashboarding or reports. 
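
The partitioning mentioned above (by year/month and port) usually maps to Hive-style `column=value` directories, which is the layout Spark produces with `DataFrameWriter.partitionBy`. The sketch below only builds such a relative path in plain Python to show the resulting structure; `partition_path` is a hypothetical helper, and both the partition order and the integer formatting are assumptions.

```python
def partition_path(record, partition_cols=("i94yr", "i94mon", "i94port")):
    """Build a Hive-style relative directory for a record: col=value/col=value/..."""
    parts = []
    for col in partition_cols:
        value = record[col]
        # Year and month appear as floats in the notebook output (2016.0, 4.0);
        # casting to int here is an assumption made for readable directory names.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        parts.append(f"{col}={value}")
    return "/".join(parts)


# Record shaped like the immigration rows shown in step 5
row = {"cicid": 5748517.0, "i94yr": 2016.0, "i94mon": 4.0, "i94port": "LOS"}
print(partition_path(row))
```

With this layout, a query that filters on year, month, or port only needs to read the matching directories.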
##### Scaling
The fully scaled solution (large datasets, 100+ people, and daily updates) would look something like this:
- Ingest the immigration "transaction" data daily, as it becomes available, using Airflow on EC2 (self-maintained and optimized for Airflow and the raw data API origins, as well as for network and data centre locations).
- Store the raw immigration data in S3. 
- Update the standardized tables through event-driven scheduling when the raw daily immigration data arrives in the S3 raw stage, using an EMR cluster (managed Spark) orchestrated with Airflow.
- Update the enriched stage table in a dedicated Redshift cluster. 
- Update the various optimized tables in the Redshift cluster, to be served to end-users through dashboarding tools and semantic/output layer tools such as dbt, Supergrain, Hex. Ensure that data monitoring checks are in place before updating the optimized tables to ensure constant uptime. 
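
The stage dependencies in this list can be written down as a small task graph. The sketch below uses Python's standard-library `graphlib` as a stand-in for an orchestrator; it is not Airflow code, and the task names are assumptions chosen to mirror the bullets above.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it can start
dependencies = {
    "store_raw_in_s3": {"ingest_daily_immigration"},
    "update_standardized": {"store_raw_in_s3"},
    "update_enriched": {"update_standardized"},
    "run_monitoring_checks": {"update_enriched"},
    "update_optimized": {"run_monitoring_checks"},
}

# Resolve an execution order that respects every dependency
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Placing the monitoring checks as their own task before the optimized tables means a failed check stops the run before anything served to end-users is touched.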