# Project: Migrations in the United States
## Data Engineering Capstone Project - WGA
### Project Summary
In this project, we explore United States immigration data and analyze it to find:

- The effects of temperature on the volume of travelers
- The seasonality of travel
- The connection between the volume of travel and the number of entry ports (i.e., airports)
- The connection between the volume of travel and the demographics of various cities


For the project we will use the following data:

- I94 Immigration Data: Data from form I-94, published by the U.S. National Travel and Tourism Office. A data dictionary is also included in the project.

    - countries.csv: Data containing the country codes and names.
    - i94portCodes.csv: Data containing the port-of-entry codes with their city and state.
    This information was extracted from:
    https://travel.trade.gov/research/reports/i94/historical/2016.html

- World Temperature Data: Data set containing temperatures for various cities around the world.
  This information was extracted from:
  https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

- U.S. City Demographic Data: Demographics of cities in the United States. This information comes from the US Census Bureau's 2015 American Community Survey.
- Airport Code Table: Information on airports and their cities.

In the project we work with the following:

- Data aggregated by date (year, month, day, etc.)
- Data aggregated by airport and city
- Temperatures at the time travelers enter and leave the United States
- Demographic data for the United States

The steps of this project are:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_add
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd

## Step 1: Scope the Project and Gather Data
### Scope
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

### Describe and Gather Data
Describe the data sets you're using. Where did it come from? What type of information is included?

### Immigration Dataset
The US immigration data set contains about 3 million rows. To explore it, we will first use a 1,000-row sample stored as a CSV file.

In [2]:
# Read in the data here
df_temp_sample = pd.read_csv('./data/immigration_data_sample.csv')
#us_spark=spark.read.csv("./data/us-cities-demographics.csv", sep=';', header=True)

##### Data set that describes the information of a traveler when entering the United States

In [3]:
df_temp_sample.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

##### The data is described as follows:

- i94cit : country of citizenship
- i94res : country of residence
- i94port: arrival airport
- arrdate: arrival date.
- i94mode: Mode of transportation (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
- i94addr: U.S. state code of the traveler's address
- depdate: Departure date from the USA. It is a SAS numeric date field to which a permanent format has not been applied.
- i94bir: Age of Respondent in Years.
- i94visa: Visa codes.
- occup: Occupation that will be performed in U.S.
- biryear: 4 digit year of birth.
- dtaddto: Character Date Field - Date to which admitted to U.S. (allowed to stay until)
- gender: Traveler's gender.
- insnum: INS number.
- airline: Airline used to arrive in U.S.
- admnum: Admission Number
- fltno: Flight number of Airline used to arrive in U.S.
- visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
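
As a small illustration of the character date field `dtaddto`, the sketch below parses it into a Python date. It assumes the values follow an `MMDDYYYY` layout, which is only inferred from sample values such as `7202016` and `10292016`; the helper name `parse_dtaddto` is hypothetical.

```python
from datetime import date, datetime
from typing import Optional, Union


def parse_dtaddto(raw: Union[str, int, float, None]) -> Optional[date]:
    """Parse an admitted-until value, ASSUMED to be MMDDYYYY, into a date.

    Returns None when the value is missing or does not look like a date.
    """
    if raw is None:
        return None
    text = str(raw).strip()
    if text.endswith(".0"):  # value was read as a float, e.g. 7202016.0
        text = text[:-2]
    if not text.isdigit() or len(text) > 8:
        return None
    text = text.zfill(8)  # restore a leading zero lost when parsed as a number
    try:
        return datetime.strptime(text, "%m%d%Y").date()
    except ValueError:
        return None


print(parse_dtaddto(7202016))     # 2016-07-20
print(parse_dtaddto("10292016"))  # 2016-10-29
print(parse_dtaddto("unknown"))   # None
```

Zero-padding matters here because pandas reads the CSV sample column as a number, which drops the leading zero of months before October.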

##### We will display the first rows, showing up to 50 columns

In [4]:
pd.set_option('display.max_columns', 50)
df_temp_sample.head(10)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
5,721257,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,51.0,2.0,1.0,20160408,,,T,N,,M,1965.0,10072016,M,,DL,736852600.0,910,B2
6,1072780,2197173.0,2016.0,4.0,245.0,245.0,SFR,20556.0,1.0,CA,20635.0,48.0,2.0,1.0,20160412,,,T,O,,M,1968.0,10112016,F,,CX,786312200.0,870,B2
7,112205,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,33.0,2.0,1.0,20160402,,,G,O,,M,1983.0,6302016,F,,BA,55474490000.0,00117,WT
8,2577162,5227851.0,2016.0,4.0,131.0,131.0,CHI,20572.0,1.0,IL,20575.0,39.0,2.0,1.0,20160428,,,O,O,,M,1977.0,7262016,,,LX,59413420000.0,00008,WT
9,10930,13213.0,2016.0,4.0,116.0,116.0,LOS,20545.0,1.0,CA,20553.0,35.0,2.0,1.0,20160401,,,O,O,,M,1981.0,6292016,,,AA,55449790000.0,00109,WT


##### We extend the data dictionary:

###### In this step, we will add detail to our data dictionary: lookup tables that let us replace the codes in the immigration data with readable values.
###### First, we will add the lookup table for the columns I94CIT and I94RES (countries.csv), which hold the country of citizenship and the country of residence of our travelers.

In [5]:
df_countries = pd.read_csv('./data/countries.csv')

In [6]:
df_countries.shape

(289, 2)

In [7]:
df_countries.head()

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
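
A minimal sketch of how such a lookup could be applied, assuming the codes in the immigration data are floats without nulls (as in the sample) and that unmatched codes should stay visible rather than be dropped. The `arrivals` frame and the column name `citizenship_country` are made up for illustration; the three lookup rows are copied from the preview above.

```python
import pandas as pd

# Lookup rows copied from the countries.csv preview.
countries = pd.DataFrame(
    {"code": [582, 236, 101], "country": ["MEXICO", "AFGHANISTAN", "ALBANIA"]}
)

# Toy arrivals (hypothetical values): codes stored as floats, 999 has no match.
arrivals = pd.DataFrame({"cicid": [1.0, 2.0, 3.0], "i94cit": [582.0, 101.0, 999.0]})

# Build a code -> country Series and map it onto the integer-cast codes.
code_to_country = countries.set_index("code")["country"]
arrivals["citizenship_country"] = arrivals["i94cit"].astype("int64").map(code_to_country)

# Codes without a match become NaN, so they can be inspected separately.
unmatched = arrivals[arrivals["citizenship_country"].isna()]
print(arrivals)
print(unmatched)
```

The same mapping could be reused for `i94res`, since both columns share the country code list.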


###### Then we will add a table that maps the i94port codes to the port-of-entry city and state (i94portCodes.csv).

In [8]:
df_i94portCodes = pd.read_csv('./data/i94portCodes.csv')

In [9]:
df_i94portCodes.shape

(660, 3)

In [10]:
df_i94portCodes.head()

Unnamed: 0,code,location,state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


##### Demographic data
###### This data set contains demographic information on cities in the United States. We will explore whether there is a relationship between the flow of immigration and the demographics of these cities.

In [11]:
# Read in the data here
df_demographics_cities = pd.read_csv('./data/us-cities-demographics.csv', sep=';')

In [12]:
df_demographics_cities.shape

(2891, 12)

In [13]:
df_demographics_cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [14]:
# we look at the list of available columns in the dataset
df_demographics_cities.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

##### Airport data
###### Now we load the airport code table. We include the airport information because airports are the entry point for immigrants.

In [15]:
# Read in the data here
df_airports_codes = pd.read_csv('./data/airport-codes_csv.csv')

In [17]:
df_airports_codes.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [18]:
df_airports_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### World Temperature data
###### This data will be used to link climate and tourism data: we add the world temperature information and explore whether the climate affects the volume of tourists.

In [19]:
df_temperature_city = pd.read_csv('./data/GlobalLandTemperaturesByCity.csv')

In [20]:
df_temperature_city.shape

(8599212, 7)

In [21]:
df_temperature_city.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### Full immigration dataset
###### Finally, the complete immigration database will be loaded

In [22]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_immigration_global = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [23]:
df_immigration_global.count()

3096313

In [24]:
df_immigration_global.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [25]:
# Write to Parquet (overwriting the output of any earlier run) and read it back
df_immigration_global.write.mode("overwrite").parquet("sas_data")
df_immigration_global = spark.read.parquet("sas_data")

## Step 2: Explore and Assess the Data
### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

### Cleaning Steps
Document steps necessary to clean the data

#### Temperature data
We will explore the temperature dataset

In [26]:
df_temperature_city.shape

(8599212, 7)

###### We check if we need all the data

In [27]:
df_temperature_city.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [28]:
df_temperature_city['Country'].nunique()

159

###### The dataframe contains temperature data for 159 countries. We will filter by country to make it easier to work with.

In [31]:
# We keep only the United States (copying the slice so that new columns can be added safely)
df_temperature_city = df_temperature_city[df_temperature_city['Country']=='United States'].copy()

In [33]:
# Convert the date to datetime objects
df_temperature_city['convertedDate'] = pd.to_datetime(df_temperature_city.dt)

###### This project focuses on air travelers, so we will only keep data from 1950 onward, since commercial air travel developed after the Second World War, in the 1950s.


In [34]:
# Keep only records dated after 1950-01-01 (the comparison is strict, so 1950-01-01 itself is excluded)
df_temperature_city=df_temperature_city[df_temperature_city['convertedDate']>"1950-01-01"].copy()

In [35]:
df_temperature_city.shape

(196348, 8)

In [152]:
#We explore the most recent date
df_temperature_city['convertedDate'].max()

Timestamp('2013-09-01 00:00:00')

###### The most recent temperature record is from September 2013, while the immigration data is from April 2016, so no temperature record matches our immigration dates directly. We will assume that the historical temperatures can still be linked to our immigration data.

###### Now we will explore the data

In [37]:
# We explore null values
df_temperature_city.isnull().sum()

dt                               0
AverageTemperature               1
AverageTemperatureUncertainty    1
City                             0
Country                          0
Latitude                         0
Longitude                        0
convertedDate                    0
dtype: int64

In [38]:
df_temperature_city[df_temperature_city.AverageTemperature.isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
287781,2013-09-01,,,Anchorage,United States,61.88N,151.13W,2013-09-01


###### To replace the null values, we will use the average of the historical data. We will also use the combination of city and date as the primary key, since we assume that each row represents a unique pair of city and date. We explore this assumption next.
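
The source does not show how the historical average is computed. One possible reading, sketched below on made-up values, is to fill a missing temperature with the mean of the same city and calendar month across the other years. The helper column `month` is an assumption introduced for this example.

```python
import pandas as pd

# Toy data (made-up values): three Septembers for one city, the latest one missing.
temps = pd.DataFrame(
    {
        "City": ["Anchorage"] * 4,
        "dt": ["2011-09-01", "2012-09-01", "2012-10-01", "2013-09-01"],
        "AverageTemperature": [8.0, 10.0, 1.0, None],
    }
)
temps["month"] = pd.to_datetime(temps["dt"]).dt.month

# Mean per city and calendar month; NaN values are ignored when averaging.
monthly_mean = temps.groupby(["City", "month"])["AverageTemperature"].transform("mean")

# Only the missing entries are replaced; existing measurements are kept as-is.
temps["AverageTemperature"] = temps["AverageTemperature"].fillna(monthly_mean)
print(temps)
```

Averaging per calendar month rather than over all records keeps the seasonal pattern, which matters if temperature is later compared with travel volume.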

In [39]:
df_temperature_city.shape

(196348, 8)

In [40]:
df_temperature_city[['City','convertedDate']].drop_duplicates().shape

(189472, 2)

###### There are multiple entries for the same city and date. We now explore these duplicates.

In [42]:
df_temperature_city[df_temperature_city[['City','convertedDate']].duplicated()].head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01
405837,1950-03-01,3.871,0.232,Arlington,United States,39.38N,76.99W,1950-03-01
405838,1950-04-01,9.678,0.191,Arlington,United States,39.38N,76.99W,1950-04-01
405839,1950-05-01,16.786,0.234,Arlington,United States,39.38N,76.99W,1950-05-01
405840,1950-06-01,21.548,0.222,Arlington,United States,39.38N,76.99W,1950-06-01


In [44]:
df_temperature_city[(df_temperature_city['City'] == 'Arlington') & (df_temperature_city.dt == '1950-02-01')]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,convertedDate
402597,1950-02-01,11.144,0.199,Arlington,United States,32.95N,96.70W,1950-02-01
405836,1950-02-01,1.655,0.057,Arlington,United States,39.38N,76.99W,1950-02-01


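###### We can see that the temperature is measured at several locations that share the same city name (note the different coordinates), so city and date alone do not identify a row.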
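
A small sketch of how a candidate key can be tested, using the Arlington rows shown above. The helper `is_unique_key` is hypothetical, and whether adding the coordinates yields a unique key on the full data set is not verified here.

```python
import pandas as pd

# Rows copied from the outputs above: two different Arlingtons on the same date.
rows = pd.DataFrame(
    {
        "dt": ["1950-02-01", "1950-02-01", "1950-03-01"],
        "AverageTemperature": [11.144, 1.655, 3.871],
        "City": ["Arlington", "Arlington", "Arlington"],
        "Latitude": ["32.95N", "39.38N", "39.38N"],
        "Longitude": ["96.70W", "76.99W", "76.99W"],
    }
)


def is_unique_key(df: pd.DataFrame, columns: list) -> bool:
    """Return True when no two rows share the same values in `columns`."""
    return not df.duplicated(subset=columns).any()


city_date_unique = is_unique_key(rows, ["City", "dt"])
with_coordinates_unique = is_unique_key(rows, ["City", "Latitude", "Longitude", "dt"])
print(city_date_unique, with_coordinates_unique)  # False True
```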

##### Airport data
###### Now, we will explore the airport dataset

In [45]:
df_airports_codes.shape

(55075, 12)

In [46]:
df_airports_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


###### We will explore airports by country

In [47]:
df_airports_codes.groupby('iso_country')['iso_country'].count()

iso_country
AD        2
AE       57
AF       64
AG        3
AI        1
AL       13
AM       13
AO      104
AQ       27
AR      848
AS        4
AT      145
AU     1963
AW        1
AZ       35
BA       15
BB        6
BD       16
BE      146
BF       51
BG      134
BH        4
BI        7
BJ       10
BL        1
BM        3
BN        2
BO      197
BQ        3
BR     4334
      ...  
TM       21
TN       15
TO        6
TR      124
TT        3
TV        3
TW       65
TZ      207
UA      191
UG       38
UM        6
US    22757
UY       54
UZ      176
VA        1
VC        6
VE      592
VG        3
VI        9
VN       50
VU       32
WF        2
WS        4
XK        6
YE       25
YT        1
ZA      489
ZM      103
ZW      138
ZZ        7
Name: iso_country, Length: 243, dtype: int64

###### The data set contains information for all countries, but in this project only United States airports will be used. First we check whether any country information is missing.

In [48]:
df_airports_codes[df_airports_codes['iso_country'].isna()].shape

(247, 12)

In [49]:
# We check which continents the rows with a missing country belong to
df_airports_codes[df_airports_codes['iso_country'].isna()].groupby('continent')['continent'].count()

continent
AF    247
Name: continent, dtype: int64

###### All rows with a missing country code are on the African continent, so they will not cause problems for our analysis of United States airports.
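
One plausible explanation for these gaps, which is not verified against the source file, is that pandas treats the literal string `NA` as a missing value by default. That would affect the ISO country code `NA` (Namibia) and the continent code `NA` (North America). The sketch below reproduces the behavior on two toy rows and shows how `keep_default_na=False` keeps the codes as text.

```python
import io

import pandas as pd

# Toy CSV (illustrative rows only): one Namibian and one U.S. airport.
csv_text = "ident,continent,iso_country\nFYWH,AF,NA\nKJFK,NA,US\n"

# Default parsing: the string "NA" is converted to NaN.
default = pd.read_csv(io.StringIO(csv_text))

# Only treat empty fields as missing, so "NA" survives as a real code.
literal = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])

print(default["iso_country"].isna().tolist())  # [True, False]
print(literal["iso_country"].tolist())         # ['NA', 'US']
```

If this is indeed the cause, it would also explain why the `continent` column is reported as null for so many United States airports further down.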

In [50]:
# We keep only United States airports (rows with a missing country are dropped as well)
df_airports_codes = df_airports_codes[df_airports_codes['iso_country'].fillna('').str.upper().str.contains('US')].copy()

###### We explore the columns

In [51]:
df_airports_codes.groupby('type')['type'].count()

type
balloonport          18
closed             1326
heliport           6265
large_airport       170
medium_airport      692
seaplane_base       566
small_airport     13720
Name: type, dtype: int64

###### For our project, we will only use commercial airports, which can be linked to the immigration data, and remove the airport types that are of no use to us.

In [53]:
excludedValues = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_airports_codes = df_airports_codes[~df_airports_codes['type'].str.strip().isin(excludedValues)].copy()

###### We explore the nulls

In [54]:
#We review the values
df_airports_codes.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

###### For the project, we cannot use the airport identification code to link to the immigration data set, so we explore another option: linking on the municipality name.
###### We explore the data to obtain the names of the municipalities.

In [56]:
# We check if the municipality column is found in all airports
df_airports_codes[df_airports_codes.municipality.isna()].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7653,6XA4,small_airport,Zadow Airstrip,,,US,US-TX,,6XA4,,,"-95.954353809, 29.991738550900003"
7887,74xa,small_airport,Gun Barrel City Airpark,385.0,,US,US-TX,,74XA,,,"-96.1456650496, 32.3551499558"
8082,79ID,small_airport,Kooskia (Clear Creek Int) Airport,1800.0,,US,US-ID,,79ID,,,"-115.869691372, 46.0488642914"
8114,79WT,small_airport,Ellensburg (Rotor Ranch) Airport,1962.0,,US,US-WA,,79WT,,,"-120.589778423, 47.091426059499994"
9055,8FA4,small_airport,Samsula / Coe Field,40.0,,US,US-FL,,8FA4,,,"-81.1328315735, 29.0102045831"


###### We will remove the nulls

In [57]:
df_airports_codes = df_airports_codes[~df_airports_codes['municipality'].isna()].copy()

###### We convert the municipality field to uppercase to be able to link it to our other data

In [58]:
df_airports_codes.municipality = df_airports_codes.municipality.str.upper()

In [59]:
df_airports_codes.groupby('iso_region')['iso_region'].count()

iso_region
US-AK      586
US-AL      197
US-AR      291
US-AZ      214
US-CA      551
US-CO      288
US-CT       56
US-DC        2
US-DE       36
US-FL      522
US-GA      365
US-HI       35
US-IA      232
US-ID      238
US-IL      579
US-IN      486
US-KS      372
US-KY      164
US-LA      281
US-MA       79
US-MD      157
US-ME      122
US-MI      379
US-MN      361
US-MO      411
US-MS      211
US-MT      255
US-NC      349
US-ND      297
US-NE      259
US-NH       54
US-NJ      116
US-NM      149
US-NV      113
US-NY      402
US-OH      492
US-OK      372
US-OR      357
US-PA      486
US-RI       10
US-SC      173
US-SD      162
US-TN      228
US-TX     1546
US-U-A       3
US-UT      103
US-VA      311
US-VT       66
US-WA      379
US-WI      457
US-WV       83
US-WY       95
Name: iso_region, dtype: int64

###### We can observe an invalid state code, "U-A". We will use the state field, together with the name of the city, to match airports to the city demographics.

In [60]:
# We use the iso_region field, which combines the country code and the state code (e.g. US-AK).
df_airports_codes['len'] = df_airports_codes["iso_region"].apply(len)
# We remove the invalid codes (anything that is not 5 characters long)
df_airports_codes = df_airports_codes[df_airports_codes['len']==5].copy()
# Finally, the state code is extracted.
df_airports_codes['state'] = df_airports_codes['iso_region'].str.strip().str.split("-", n = 1, expand = True)[1]
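
To illustrate how municipality and state could then link airports to port-of-entry codes, here is a plain-Python sketch on a few toy rows. It swaps the length check for a regular expression (a stricter, assumed alternative), and the names `state_from_region`, `airports_by_city` and `port_to_airports` are hypothetical.

```python
import re

REGION_PATTERN = re.compile(r"US-([A-Z]{2})")


def state_from_region(iso_region: str):
    """Return the two-letter state from a region such as 'US-AK', else None."""
    match = REGION_PATTERN.fullmatch(iso_region.strip())
    return match.group(1) if match else None


# Toy rows (illustrative only), shaped like the airport and port code tables.
airports = [
    {"ident": "PANC", "municipality": "Anchorage", "iso_region": "US-AK"},
    {"ident": "00AK", "municipality": "Anchor Point", "iso_region": "US-AK"},
    {"ident": "XXXX", "municipality": "Somewhere", "iso_region": "US-U-A"},
]
ports = [
    {"code": "ANC", "location": "ANCHORAGE", "state": "AK"},
    {"code": "ALC", "location": "ALCAN", "state": "AK"},
]

# Index airports by (uppercase municipality, state), skipping invalid regions.
airports_by_city = {}
for airport in airports:
    state = state_from_region(airport["iso_region"])
    if state is None:
        continue
    key = (airport["municipality"].strip().upper(), state)
    airports_by_city.setdefault(key, []).append(airport["ident"])

# Look up each port of entry; ports without a matching airport get an empty list.
port_to_airports = {
    port["code"]: airports_by_city.get((port["location"], port["state"]), [])
    for port in ports
}
print(port_to_airports)  # {'ANC': ['PANC'], 'ALC': []}
```

Keeping unmatched ports as empty lists makes it easy to count how many ports of entry cannot be linked by name.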

##### Demographic data

###### We will explore the demographic dataset

In [61]:
df_demographics_cities.shape

(2891, 12)

###### We convert the city name to uppercase and remove leading and trailing whitespace

In [63]:
df_demographics_cities.City = df_demographics_cities.City.str.upper().str.strip()

###### We explore the values

In [64]:
df_demographics_cities.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

###### Only a few values are missing, so the data set looks correct and we will not make any changes.

In [65]:
# Converting to uppercase and removing whitespace
df_demographics_cities.City = df_demographics_cities.City.str.strip().str.upper()

###### We explore whether city and race can function as a primary key

In [66]:
# Candidate primary key: city and race
df_demographics_cities[df_demographics_cities[['City','Race']].duplicated()].head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193
210,LAKEWOOD,California,39.9,41523.0,40069.0,81592,4094.0,18274.0,3.13,CA,Hispanic or Latino,24987
238,GLENDALE,California,42.1,98181.0,102844.0,201025,4448.0,111510.0,2.69,CA,White,146718
300,SPRINGFIELD,Massachusetts,31.8,74744.0,79592.0,154336,5723.0,16226.0,2.81,MA,Asian,5606
549,BLOOMINGTON,Indiana,23.5,40588.0,43227.0,83815,2368.0,10033.0,2.33,IN,Asian,9801


###### We check whether the key we defined is sufficient

In [67]:
df_demographics_cities[(df_demographics_cities.City == 'WILMINGTON') & (df_demographics_cities.Race == 'Asian')]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
102,WILMINGTON,North Carolina,35.5,52346.0,63601.0,115947,5908.0,7401.0,2.24,NC,Asian,3152
177,WILMINGTON,Delaware,36.4,32680.0,39277.0,71957,3063.0,3336.0,2.45,DE,Asian,1193


###### It is clear that the difference is the state, so we will add this column to our primary key

In [68]:
df_demographics_cities[df_demographics_cities[['City', 'State','Race']].duplicated()]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count


###### There are no duplicates for this combination, so it can serve as a primary key

##### Immigration data
###### We will explore the immigration data

In [69]:
df_immigration_global.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

###### Let's explore if we can use the column cicid as a primary key

In [70]:
# We will create a temporary view
df_immigration_global.createOrReplaceTempView("temp_table")

In [71]:
df_immigration_global.count()

3096313

In [72]:
spark.sql("""
SELECT COUNT (DISTINCT cicid)
FROM temp_table
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+



###### We will check the i94port codes

In [73]:
spark.sql("""
SELECT LENGTH (i94port) AS len
FROM temp_table
GROUP BY len
""").show()

+---+
|len|
+---+
|  3|
+---+



###### All port codes have the expected length of 3. The dates are SAS dates, stored as the number of days since 01/01/1960, so we calculate the arrival date by adding "arrdate" to 01/01/1960

In [74]:
df_immigration_global = spark.sql("SELECT *, date_add(to_date('1960-01-01'), CAST(arrdate AS INT)) AS arrival_date FROM temp_table")
df_immigration_global.createOrReplaceTempView("temp_table")
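
The same conversion can be checked in plain Python. This is a minimal sketch: the helper name `sas_to_date` is hypothetical, and the inputs are `arrdate` values from the sample rows, whose `dtadfile` values (20160401, 20160422) fall on the same days.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a Python date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(20574.0))  # 2016-04-30
```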

###### Now we will decode the column "I94VISA", where we have:
- 1 = Business
- 2 = Pleasure
- 3 = Student

In [75]:
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM temp_table""").createOrReplaceTempView("temp_table")

In [76]:
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), CAST(depdate AS INT))
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM temp_table""").createOrReplaceTempView("temp_table")

In [77]:
# We count the rows flagged 'N/A', i.e. rows with a non-null depdate below 1
spark.sql("SELECT count(*) FROM temp_table WHERE departure_date = 'N/A'").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



###### Then we check that the following holds:
- departure_date > arrival_date

In [78]:
spark.sql("""
SELECT COUNT(*)
FROM temp_table
WHERE departure_date <= arrival_date
""").show()

+--------+
|count(1)|
+--------+
|     375|
+--------+



###### We have 375 rows where "departure_date" is not later than "arrival_date". We look at some examples before deciding how to handle them

In [79]:
spark.sql("""
SELECT arrival_date, departure_date
FROM temp_table
WHERE departure_date <= arrival_date
""").show(10)

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-30|    2016-04-29|
|  2016-04-30|    2016-04-28|
|  2016-04-30|    2016-04-29|
|  2016-04-05|    2012-04-14|
|  2016-04-05|    2016-03-14|
|  2016-04-14|    2016-03-03|
|  2016-04-01|    2016-02-28|
|  2016-04-04|    2016-03-07|
|  2016-04-01|    2016-03-05|
|  2016-04-05|    2016-03-07|
+------------+--------------+
only showing top 10 rows



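###### We cannot correct these errors, so the most practical approach is to delete the rows. Note that the filter below keeps same-day departures and also removes rows with no departure date, because a comparison with NULL never evaluates to true. This is why the counts that follow total 2,953,481 rows rather than 3,096,313.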
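
A minimal sketch of how a `departure_date >= arrival_date` filter treats same-day and missing departure dates. It uses the standard-library `sqlite3` module as a stand-in for Spark SQL (both follow SQL's three-valued logic for NULL comparisons); the table `trips` and its rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (cicid INTEGER, arrival_date TEXT, departure_date TEXT)")
conn.executemany(
    "INSERT INTO trips VALUES (?, ?, ?)",
    [
        (1, "2016-04-05", "2016-04-12"),  # valid stay
        (2, "2016-04-05", "2016-03-14"),  # departure before arrival
        (3, "2016-04-05", None),          # no departure recorded
        (4, "2016-04-05", "2016-04-05"),  # same-day departure
    ],
)

# The plain comparison silently drops the row with a NULL departure date.
kept = [row[0] for row in conn.execute(
    "SELECT cicid FROM trips WHERE departure_date >= arrival_date ORDER BY cicid"
)]

# Keeping travelers without a recorded departure requires an explicit NULL check.
kept_with_open_stays = [row[0] for row in conn.execute(
    "SELECT cicid FROM trips "
    "WHERE departure_date IS NULL OR departure_date >= arrival_date ORDER BY cicid"
)]

print(kept)                  # [1, 4]
print(kept_with_open_stays)  # [1, 3, 4]
```

Which variant is appropriate depends on whether travelers who have not yet departed should stay in the analysis.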

In [81]:
spark.sql("""
SELECT *
FROM temp_table
WHERE departure_date >= arrival_date
""").createOrReplaceTempView("temp_table")

###### We verify the arrival and departure dates and then combine them.

In [82]:
# Count the distinct departure dates
spark.sql("SELECT COUNT (DISTINCT departure_date) FROM temp_table ").show()
# Count the distinct arrival dates
spark.sql("SELECT COUNT (DISTINCT arrival_date) FROM temp_table ").show()
# Count the departure dates that also appear as arrival dates
spark.sql("""   SELECT COUNT(DISTINCT departure_date) 
                FROM temp_table 
                WHERE departure_date IN (
                    SELECT DISTINCT arrival_date FROM temp_table
                ) 
                """).show()

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                           174|
+------------------------------+

+----------------------------+
|count(DISTINCT arrival_date)|
+----------------------------+
|                          30|
+----------------------------+

+------------------------------+
|count(DISTINCT departure_date)|
+------------------------------+
|                            29|
+------------------------------+



###### Since one arrival date does not appear among the departure dates, we will combine both sets so that our dates include every arrival and departure date.
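
Combining the two sets can be sketched with plain Python sets. The dates below are made up, and the row layout (year, month, day, weekday) is only an assumption about what a combined date table might hold.

```python
from datetime import date

# Toy date sets (hypothetical): one arrival date never appears as a departure date.
arrival_dates = {date(2016, 4, 1), date(2016, 4, 2), date(2016, 4, 30)}
departure_dates = {date(2016, 4, 2), date(2016, 4, 30), date(2016, 5, 15)}

# Arrival dates that are missing from the departure dates.
missing_from_departures = arrival_dates - departure_dates

# The union covers every date that either column can reference.
all_dates = sorted(arrival_dates | departure_dates)

date_rows = [
    {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "weekday": d.isoweekday(),  # 1 = Monday ... 7 = Sunday
    }
    for d in all_dates
]
print(missing_from_departures)
print(date_rows[0])
```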
###### We explore the unique values of "i94mode".

In [83]:
spark.sql("""
SELECT i94mode, count(*)
FROM temp_table
GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     238|
|    1.0| 2871184|
|    3.0|   61572|
|    2.0|   17970|
|    9.0|    2517|
+-------+--------+



###### According to our data dictionary:
- 1 = 'Air'
- 2 = 'Sea'
- 3 = 'Land'
- 9 = 'Not reported'
###### We will only keep air arrivals, since we are joining this data with the airport data set
###### We check for missing ages ("i94bir")

In [84]:
spark.sql("""
SELECT COUNT(*)
FROM temp_table
WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|      46|
+--------+



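###### Since the previous result shows missing ages, we will check the column "biryear"

In [85]:
# Caution: COUNT(biryear) skips NULLs, so under this WHERE clause it always returns 0; COUNT(*) is needed to count the missing values.
spark.sql("SELECT COUNT(biryear) FROM temp_table WHERE biryear IS NULL").show()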

+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+
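
The result above should be read with care: in SQL, `COUNT(column)` ignores NULL values, so counting `biryear` over rows where `biryear IS NULL` always yields 0. The sketch below shows the difference on a made-up table, using the standard-library `sqlite3` module as a stand-in for Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE travelers (cicid INTEGER, biryear REAL)")
conn.executemany(
    "INSERT INTO travelers VALUES (?, ?)",
    [(1, 1955.0), (2, None), (3, 1990.0), (4, None)],
)

# COUNT(biryear) only counts non-NULL values, so this is always 0.
count_column = conn.execute(
    "SELECT COUNT(biryear) FROM travelers WHERE biryear IS NULL"
).fetchone()[0]

# COUNT(*) counts rows, which is what a missing-value check needs.
count_rows = conn.execute(
    "SELECT COUNT(*) FROM travelers WHERE biryear IS NULL"
).fetchone()[0]

print(count_column, count_rows)  # 0 2
```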



###### We explore the range of birth years

In [86]:
spark.sql("SELECT MAX(biryear), MIN(biryear) FROM temp_table WHERE biryear IS NOT NULL").show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



###### Let's explore the distribution of travelers aged 80 or over to see whether the values look plausible.

In [87]:
# Number of travelers aged 80 or over (born in 1936 or earlier)
spark.sql("""
SELECT COUNT(*)
FROM temp_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
""").show()

# frequency of travellers by birth year
spark.sql("""
SELECT biryear, COUNT(*)
FROM temp_table 
WHERE biryear IS NOT NULL
AND biryear <= 1936
GROUP BY biryear
ORDER BY biryear ASC
""").show()

+--------+
|count(1)|
+--------+
|   24694|
+--------+

+-------+--------+
|biryear|count(1)|
+-------+--------+
| 1916.0|       8|
| 1917.0|      16|
| 1918.0|      21|
| 1919.0|      36|
| 1920.0|      34|
| 1921.0|      69|
| 1922.0|      89|
| 1923.0|     155|
| 1924.0|     209|
| 1925.0|     274|
| 1926.0|     414|
| 1927.0|     569|
| 1928.0|     792|
| 1929.0|    1073|
| 1930.0|    1442|
| 1931.0|    1794|
| 1932.0|    2239|
| 1933.0|    2688|
| 1934.0|    3442|
| 1935.0|    4194|
+-------+--------+
only showing top 20 rows



###### The year of birth is available, so we will use it to calculate the age. First we check that 2016 minus the birth year matches the reported age (i94bir).

In [88]:
spark.sql("SELECT (2016-biryear)-i94bir AS difference, count(*) FROM temp_table WHERE i94bir IS NOT NULL GROUP BY difference").show()

+----------+--------+
|difference|count(1)|
+----------+--------+
|       0.0| 2953435|
+----------+--------+
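
The check above groups rows by the gap between the derived age (2016 minus the birth year) and the reported age, so a single group with a difference of 0 means the two columns agree everywhere. A minimal pure-Python sketch of the same idea follows; the sample rows are invented, and `REFERENCE_YEAR` is a name introduced here for illustration.

```python
from collections import Counter

REFERENCE_YEAR = 2016  # year of the I94 extract used in this notebook

# Hypothetical (biryear, i94bir) pairs; None marks a missing reported age
rows = [(1967.0, 49.0), (1978.0, 38.0), (1936.0, 80.0), (1990.0, None)]

# Tally the gap between derived age and reported age, skipping missing ages
differences = Counter(
    (REFERENCE_YEAR - biryear) - age
    for biryear, age in rows
    if age is not None
)

print(differences)
```

Any key other than 0 in the tally would point to rows where the two columns disagree.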



###### We explore if we can use gender

In [89]:
spark.sql("""
SELECT gender, count(*) 
FROM temp_table
GROUP BY gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1228646|
|  null|  407456|
|     M| 1316305|
|     U|     238|
|     X|     836|
+------+--------+



###### We keep only the rows with a valid gender (F or M)

In [90]:
spark.sql("""SELECT * FROM temp_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("temp_table")

###### We check the citizenship, residence and address columns for missing values

In [91]:
#citizenship countries
spark.sql("""
SELECT count(*) 
FROM temp_table
WHERE i94cit IS NULL
""").show()

#residence countries
spark.sql("""
SELECT count(*) 
FROM temp_table
WHERE i94res IS NULL
""").show()

#reported address
spark.sql("""
SELECT count(*) 
FROM temp_table
WHERE i94addr IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|  114019|
+--------+



###### The address field (really the state of residence) is missing in 114,019 rows, so it will not be used in the project.

In [92]:
spark.sql("""
SELECT COUNT(*)
FROM temp_table
WHERE visatype IS NULL
""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



###### We explore the "visa_type"

In [93]:
spark.sql("""
SELECT visa_type, visatype, count(*)
FROM temp_table
GROUP BY visa_type, visatype
ORDER BY visa_type, visatype
""").show()

+---------+--------+--------+
|visa_type|visatype|count(1)|
+---------+--------+--------+
| Business|      B1|  186610|
| Business|      E1|    3182|
| Business|      E2|   16227|
| Business|     GMB|     132|
| Business|       I|    2962|
| Business|      I1|     214|
| Business|      WB|  185857|
| Pleasure|      B2|  967988|
| Pleasure|      CP|   11785|
| Pleasure|     CPL|       8|
| Pleasure|     GMT|   79454|
| Pleasure|     SBP|       2|
| Pleasure|      WT| 1060229|
|  Student|      F1|   27789|
|  Student|      F2|    1774|
|  Student|      M1|     708|
|  Student|      M2|      30|
+---------+--------+--------+



###### According to the types of visa we have:

- B1: business visitor visa, valid for up to a year
- B2: pleasure visitor visa, valid for up to a year
- CP: no definition found
- E2: investor visa, which allows foreign investors to enter and work in the United States based on a substantial investment
- F1: used by non-immigrant students for academic and language training courses
- F2: used by the dependents of F1 visa holders
- GMT: no definition found
- M1: for students enrolled in non-academic or "vocational" study (mechanical, language, cooking classes, etc.)
- WB: Visa Waiver Program, travel to the United States for business for stays of 90 days or less without obtaining a visa
- WT: Visa Waiver Program, travel to the United States for tourism for stays of 90 days or less without obtaining a visa

###### We explore the occupation field

In [94]:
spark.sql("""
SELECT occup, COUNT(*) AS n
FROM temp_table
GROUP BY occup
ORDER BY n DESC, occup
""").show()

+-----+-------+
|occup|      n|
+-----+-------+
| null|2538838|
|  STU|   3275|
|  OTH|    508|
|  NRR|    299|
|  MKT|    262|
|  EXA|    175|
|  ULS|    142|
|  ADM|    119|
|  GLS|    119|
|  TIE|    108|
|  MVC|     58|
|  ENO|     55|
|  CEO|     53|
|  TIP|     49|
|  LLJ|     45|
|  RET|     44|
|  CMP|     43|
|  PHS|     42|
|  UNP|     33|
|  HMK|     30|
+-----+-------+
only showing top 20 rows



###### Because the field is null in 2,538,838 of the 2,544,951 rows (about 99.8%), it will not be used in the project.
###### Next, we build the conceptual model.

In [95]:
df_immigration_global = spark.sql("""SELECT * FROM temp_table""")

## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
##### In our project we are interested in the flow of travelers into the United States. For this, the "i94" data set will serve as our fact table. The table will be called "FACT_IMMIGRATION_PJ_WGA" and will have the following fields:

- cicid,
- citizenship_country,
- residence_country,
- city,
- state,
- arrival_date,
- departure_date,
- age,
- visa_type,
- detailed_visa_type,

##### Our dataset contains information for only 1 month. We will use the following dimensions:

#### DIM_TIME_PJ_WGA:
##### This dimension will serve to store the dates, the columns will be:

- date,
- year,
- month,
- day,
- week,
- weekday,
- year_day

#### DIM_AIRPORTS_PJ_WGA: 
##### This dimension will serve to store the airports, the columns will be:

- ident,
- type,
- name,
- elevation_ft,
- state,
- municipality,
- iata_code

#### DIM_CITY_DEMOGRAPHICS_PJ_WGA: 
##### This dimension will store the demographic data of the traveler areas, the columns will be:

- City,
- State,
- median_age,
- male_population,
- female_population,
- total_population,
- foreign_born,
- average_household_size,
- state_code,
- Race,
- Count

#### DIM_TEMPERATURES_PJ_WGA: 
##### This dimension will store the temperatures of the cities, the columns will be:

- date,
- City,
- average temperature,
- average temperature uncertainty

### 3.2 Mapping Out Data Pipelines
##### Some data cleaning steps were identified in Step 2. We detail them below:

#### Data Extraction:
##### We load the data sets in "CSV" and "SAS" formats

#### Data Transformation and Loading:

#### FACT_IMMIGRATION_PJ_WGA:
- Drop rows where the mode of arrival is not air travel
- Drop rows with missing or invalid gender data
- Convert arrival and departure dates from SAS numeric dates
- Replace country codes with the character string equivalents
- Replace the visa code with a character string (visa_type)
- Replace port of entry with city and state
- Filter out any row where the port of entry is not in the US
- Compute age in a new column using the birth year and the year of the data (2016)
- Insert data into our fact table
- Write to parquet

#### DIM_TEMPERATURES_PJ_WGA:
- Drop all data for cities outside the United States
- Drop all data for dates before 1950, since commercial air travel was far less common before then
- Convert city to upper case
- Compute the average temperature and uncertainty over date+city partitions
- Insert into the temperature table as is, since our dataset may include new cities in future dates
- Write to parquet
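
The averaging step in the list above collapses repeated readings for the same date and city into a single row. A minimal sketch of that idea follows, using Python's built-in `sqlite3` instead of Spark and a plain `GROUP BY` (both are assumptions made so the example is self-contained). The sample readings are invented, and the alias `average_uncertainty` is illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE temperature (date TEXT, City TEXT, "
    "AverageTemperature REAL, AverageTemperatureUncertainty REAL)"
)
# Hypothetical readings: BOSTON appears twice for the same date
conn.executemany(
    "INSERT INTO temperature VALUES (?, ?, ?, ?)",
    [
        ("2013-01-01", "BOSTON", 1.0, 0.2),
        ("2013-01-01", "BOSTON", 3.0, 0.4),
        ("2013-01-01", "MIAMI", 20.0, 0.3),
    ],
)

# One output row per (date, City), with the measurements averaged
rows = conn.execute(
    """
    SELECT date, City,
           AVG(AverageTemperature) AS average_temperature,
           AVG(AverageTemperatureUncertainty) AS average_uncertainty
    FROM temperature
    GROUP BY date, City
    ORDER BY date, City
    """
).fetchall()

print(rows)
```

Grouping guarantees that the pair (date, city) is unique in the result, which is what allows it to act as the key of the temperature dimension.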

#### DIM_TIME_PJ_WGA:
- Get all the arrival and departure dates from the immigration data set
- Extract year, month, day, week, weekday and day of year from the date and insert all the values in the dim_time table
- Write to parquet
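
The date parts listed above can be illustrated with Python's standard `datetime` module. This is only a sketch of what the columns contain, not the notebook's Spark code, and the helper name `time_dimension_row` is hypothetical. It assumes the Spark conventions for `WEEKOFYEAR` (ISO week number) and `DAYOFWEEK` (1 = Sunday through 7 = Saturday).

```python
from datetime import date


def time_dimension_row(d):
    """Derive the time-dimension columns for one date."""
    return {
        "date": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],         # ISO week number
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
        "year_day": d.timetuple().tm_yday,  # day of the year, starting at 1
    }


row = time_dimension_row(date(2016, 4, 22))
print(row)
```

With these conventions, Friday 2016-04-22 falls in ISO week 16, has weekday 6 and is day 113 of the year.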

#### DIM_AIRPORTS_PJ_WGA:
- Remove all non-US airports
- Remove all airport types that are not valid ports of entry, i.e. ['closed', 'heliport', 'seaplane_base', 'balloonport']
- Remove all rows where the municipality is missing
- Convert municipality to upper case
- Insert into our table
- Write to parquet
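
When the airports dimension is built, the two-letter state is taken from the `iso_region` column (values such as `US-ME`) by keeping only five-character regions and dropping the `US-` prefix. The sketch below shows that extraction in plain Python. The function name is hypothetical, and the prefix check and whitespace handling are slightly stricter than the SQL used in this notebook.

```python
def state_from_iso_region(iso_region):
    """Return the two-letter state from a region like 'US-ME', else None."""
    if iso_region is None:
        return None
    region = iso_region.strip().upper()
    # Mirrors LENGTH(iso_region) = 5 followed by SUBSTR(iso_region, 4);
    # the 'US-' prefix test is an extra safeguard added in this sketch
    if len(region) != 5 or not region.startswith("US-"):
        return None
    return region[3:]


print(state_from_iso_region("US-ME"))
```

Regions that do not follow the `US-XX` pattern return `None`, which corresponds to the rows the length filter drops.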

#### DIM_CITY_DEMOGRAPHICS_PJ_WGA:

- Convert city names to upper case
- Insert to our table
- Write to parquet

## Step 4: Run Pipelines to Model the Data

### 4.1 Create the data model
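##### For this step we read the CSV files into pandas dataframes and then convert them to Spark dataframes. As the schemas below show, Spark reads every CSV column as a string, while pandas infers numeric types that are preserved when the dataframe is converted to Spark.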

In [96]:
df_demographics_cities_spark = spark.read.format("csv").option("header", "true").option("delimiter", ";").load('./data/us-cities-demographics.csv')

In [97]:
df_demographics_cities_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [98]:
df_demographics_cities.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [99]:
spark.createDataFrame(df_demographics_cities).printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: double (nullable = true)
 |-- Female Population: double (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Number of Veterans: double (nullable = true)
 |-- Foreign-born: double (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: long (nullable = true)



##### Staging the data

In [100]:
# load dictionary data from our project
df_countryCodes = pd.read_csv('./data/countries.csv')
df_i94portCodes = pd.read_csv('./data/i94portCodes.csv')

# load the various csv files into pandas dataframes from our project
df_demographics = pd.read_csv('./data/us-cities-demographics.csv', sep=';')
df_temperature = pd.read_csv('./data/GlobalLandTemperaturesByCity.csv')

# load the SAS data from our project
df_immigration=spark.read.parquet("sas_data")

##### Transforming the data

In [101]:
# Converting data dictionaries to Spark
spark_df_countryCodes = spark.createDataFrame(df_countryCodes)
spark_df_countryCodes.createOrReplaceTempView("countryCodes")

In [102]:
# Remove the nulls
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isna()].copy()

In [103]:
# Keep only the ports of entry located in the United States
nonUSstates = ['CANADA', 'Canada', 'NETHERLANDS', 'NETH ANTILLES', 'THAILAND', 'ETHIOPIA', 'PRC', 'BERMUDA', 'COLOMBIA', 'ARGENTINA', 'MEXICO', 
               'BRAZIL', 'URUGUAY', 'IRELAND', 'GABON', 'BAHAMAS', 'MX', 'CAYMAN ISLAND', 'SEOUL KOREA', 'JAPAN', 'ROMANIA', 'INDONESIA',
               'SOUTH AFRICA', 'ENGLAND', 'KENYA', 'TURK & CAIMAN', 'PANAMA', 'NEW GUINEA', 'ECUADOR', 'ITALY', 'EL SALVADOR']

In [None]:
df_i94portCodes = df_i94portCodes[~df_i94portCodes.state.isin(nonUSstates)].copy()

In [104]:
spark_df_i94portCodes = spark.createDataFrame(df_i94portCodes)
spark_df_i94portCodes.createOrReplaceTempView("i94portCodes")

In [105]:
df_immigration.createOrReplaceTempView("immig_table")

In [106]:
# Remove all arrivals to the United States that were not by air
spark.sql("""
SELECT *
FROM immig_table
WHERE i94mode = 1
""").createOrReplaceTempView("immig_table")

In [107]:
# Remove the rows with a missing or invalid gender
spark.sql("""SELECT * FROM immig_table WHERE gender IN ('F', 'M')""").createOrReplaceTempView("immig_table")

In [108]:
# Convert the SAS arrival date (days since 1960-01-01) to a date
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date FROM immig_table").createOrReplaceTempView("immig_table")
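
SAS stores dates as the number of days since 1960-01-01, which is why the query above adds `arrdate` to that base date. A minimal standalone sketch of the same conversion in plain Python follows; the helper name `sas_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_to_date(sas_days):
    """Convert a SAS numeric date to a datetime.date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))
```

For example, `sas_to_date(20545.0)` returns `datetime.date(2016, 4, 1)`.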

In [109]:
# Convert the departure dates to usable dates
spark.sql("""SELECT *, CASE 
                        WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate)
                        WHEN depdate IS NULL THEN NULL
                        ELSE 'N/A' END AS departure_date 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [110]:
# Replace the country codes with country names; the INNER JOIN drops rows with invalid codes
# Country of citizenship
spark.sql("""
SELECT im.*, cc.country AS citizenship_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94cit = cc.code
""").createOrReplaceTempView("immig_table")

In [111]:
spark.sql("""
SELECT im.*, cc.country AS residence_country
FROM immig_table im
INNER JOIN countryCodes cc
ON im.i94res = cc.code
""").createOrReplaceTempView("immig_table")

In [112]:
# Map the numeric i94visa code to a descriptive "visa_type" string
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE 'N/A' END AS visa_type 
                        
                FROM immig_table""").createOrReplaceTempView("immig_table")

In [113]:
# Add the city ("entry_port") and state ("entry_port_state") of the port of entry
spark.sql("""
SELECT im.*, pc.location AS entry_port, pc.state AS entry_port_state
FROM immig_table im 
INNER JOIN i94portCodes pc
ON im.i94port = pc.code
""").createOrReplaceTempView("immig_table")

In [114]:
# We calculate the age of the travelers and add it
spark.sql("""
SELECT *, (2016-biryear) AS age 
FROM immig_table
""").createOrReplaceTempView("immig_table")

In [115]:
# Build our fact table
fact_immigration = spark.sql("""
                        SELECT 
                            cicid, 
                            citizenship_country,
                            residence_country,
                            TRIM(UPPER (entry_port)) AS city,
                            TRIM(UPPER (entry_port_state)) AS state,
                            arrival_date,
                            departure_date,
                            age,
                            visa_type,
                            visatype AS detailed_visa_type

                        FROM immig_table
""")

In [116]:
# Extract all the arrival and departure dates to create our time dimension
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time_table")

In [117]:
# Extract year, month, day, weekofyear, dayofweek and dayofyear from the date so that we can insert them
dim_time = spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time_table
ORDER BY date ASC
""")

In [118]:
# We filter only data from the United States
df_temperature = df_temperature[df_temperature['Country']=='United States'].copy()

# Convert the date to datetime objects
df_temperature['date'] = pd.to_datetime(df_temperature.dt)

# Drop the dates on or before 1950-01-01
df_temperature=df_temperature[df_temperature['date']>"1950-01-01"].copy()

In [119]:
# Now, let's convert the city names to uppercase
df_temperature.City = df_temperature.City.str.strip().str.upper()

In [120]:
# convert the dataframes from pandas to spark
spark_df_temperature = spark.createDataFrame(df_temperature)
spark_df_temperature.createOrReplaceTempView("temperature")

In [121]:
dim_temperature = spark.sql("""
SELECT
    DISTINCT date, city,
    AVG(AverageTemperature) OVER (PARTITION BY date, City) AS average_temperature, 
    AVG(AverageTemperatureUncertainty)  OVER (PARTITION BY date, City) AS average_termperature_uncertainty
    
FROM temperature
""")

In [122]:

df_demographics.City = df_demographics.City.str.strip().str.upper()
df_demographics['State Code'] = df_demographics['State Code'].str.strip().str.upper()
df_demographics.Race = df_demographics.Race.str.strip().str.upper()

In [123]:
# convert the dataframes from pandas to spark
spark_df_demographics = spark.createDataFrame(df_demographics)
spark_df_demographics.createOrReplaceTempView("demographics")

In [124]:
dim_demographics = spark.sql("""
                                SELECT  City, 
                                        State, 
                                        `Median Age` AS median_age, 
                                        `Male Population` AS male_population, 
                                        `Female Population` AS female_population, 
                                        `Total Population` AS total_population, 
                                        `Foreign-born` AS foreign_born, 
                                        `Average Household Size` AS average_household_size, 
                                        `State Code` AS state_code, 
                                        Race, 
                                        Count
                                FROM demographics
""")

In [125]:
# The airports file contains many nulls, so we load it directly with Spark
spark_df_airports = spark.read.format("csv").option("header", "true").load('./data/airport-codes_csv.csv')
spark_df_airports.createOrReplaceTempView("airports")

In [126]:
spark.sql("""
SELECT *
FROM airports
WHERE iso_country IS NOT NULL
AND UPPER(TRIM(iso_country)) LIKE 'US'
""").createOrReplaceTempView("airports")

In [128]:
spark.sql("""
SELECT *
FROM airports
WHERE LOWER(TRIM(type)) NOT IN ('closed', 'heliport', 'seaplane_base', 'balloonport')
AND municipality IS NOT NULL
AND LENGTH(iso_region) = 5
""").createOrReplaceTempView("airports")

In [129]:
#Inserting the data in our airport dimension
dim_airports = spark.sql("""
SELECT TRIM(ident) AS ident, type, name, elevation_ft, SUBSTR(iso_region, 4) AS state, TRIM(UPPER(municipality)) AS municipality, iata_code
FROM airports
""")

In [131]:
# Finally, we save our data set in parquet format
dim_demographics.write.parquet("dim_demographics_pj_wga")
dim_time.write.parquet("dim_time_pj_wga")
dim_airports.write.parquet("dim_airports_pj_wga")
dim_temperature.write.parquet("dim_temperature_pj_wga")
fact_immigration.write.parquet("fact_immigration_pj_wga")

### 4.2 Data Quality Checks
#### Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:

- Integrity constraints on the relational database (e.g., unique key, data type, etc.)
- Unit tests for the scripts to ensure they are doing the right thing
- Source/Count checks to ensure completeness

#### Run Quality Checks

In [132]:
# Register the final tables as temporary views so that we can query them
dim_demographics.createOrReplaceTempView("dim_demographics_pj_wga")
dim_time.createOrReplaceTempView("dim_time_pj_wga")
dim_airports.createOrReplaceTempView("dim_airports_pj_wga")
dim_temperature.createOrReplaceTempView("dim_temperature_pj_wga")
fact_immigration.createOrReplaceTempView("fact_immigration_pj_wga")

###### First, let's make sure the columns used as primary keys don't contain any null values. We define a function that could be incorporated in an automated data pipeline

In [133]:
def nullValueCheck(spark_ctxt, tables_to_check):
    """
    This function performs null value checks on specific columns of given tables received as parameters and raises a ValueError exception when null values are encountered.
    It receives the following parameters:
    spark_ctxt: spark context where the data quality check is to be performed
    tables_to_check: A dictionary containing (table, columns) pairs specifying for each table, which column is to be checked for null values.   
    """  
    for table in tables_to_check:
        print(f"Performing data quality check on table {table}...")
        for column in tables_to_check[table]:
            returnedVal = spark_ctxt.sql(f"""SELECT COUNT(*) as nbr FROM {table} WHERE {column} IS NULL""")
            if returnedVal.head()[0] > 0:
                raise ValueError(f"Data quality check failed! Found NULL values in {column} column!")
        print(f"Table {table} passed.")

###### Now, we carry out quality control of our information

In [136]:
#Data to verify
tables_to_check = { 'fact_immigration_pj_wga' : ['cicid'], 'dim_time_pj_wga':['date'], 'dim_demographics_pj_wga': ['City','state_code'], 'dim_airports_pj_wga':['ident'], 'dim_temperature_pj_wga':['date','City']}

#We use our function
nullValueCheck(spark, tables_to_check)

Performing data quality check on table fact_immigration_pj_wga...
Table fact_immigration_pj_wga passed.
Performing data quality check on table dim_time_pj_wga...
Table dim_time_pj_wga passed.
Performing data quality check on table dim_demographics_pj_wga...
Table dim_demographics_pj_wga passed.
Performing data quality check on table dim_airports_pj_wga...
Table dim_airports_pj_wga passed.
Performing data quality check on table dim_temperature_pj_wga...
Table dim_temperature_pj_wga passed.


##### The data quality check was successful.

##### Then, we will perform a more detailed quality check.

In [138]:
#Data quality of the time dimension

#Check the number of rows, in our case the expected value is 192
spark.sql("""
SELECT COUNT(*) - 192
FROM dim_time_pj_wga
""").show()

# Check unique values, in our case the expected value is 192
spark.sql("""
SELECT COUNT(DISTINCT date) - 192
FROM dim_time_pj_wga
""").show()

# Finally, for a more detailed check, we subtract the source dates from the dimension; the expected result is an empty set
spark.sql("""
SELECT DISTINCT date
FROM dim_time_pj_wga

MINUS

(SELECT DISTINCT arrival_date AS date
FROM immig_table
UNION
SELECT DISTINCT departure_date AS date
FROM immig_table
WHERE departure_date IS NOT NULL)

""").show()

+--------------------------------+
|(count(1) - CAST(192 AS BIGINT))|
+--------------------------------+
|                               0|
+--------------------------------+

+--------------------------------------------+
|(count(DISTINCT date) - CAST(192 AS BIGINT))|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+

+----+
|date|
+----+
+----+



In [142]:
# Data quality of the immigration fact table
# Check the number of distinct cicid values in the source view, the expected value is 2244307
spark.sql("""
SELECT count(distinct cicid) - 2244307
FROM immig_table
""").show()

# Check the number of distinct cicid values in the fact table, the expected value is 2244307
spark.sql("""
SELECT count(distinct cicid) - 2244307
FROM fact_immigration_pj_wga
""").show()

# Finally, check the total number of rows in the fact table, the expected difference is 0
spark.sql("""
SELECT count(*) - 2244307
FROM fact_immigration_pj_wga
""").show()

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2244307 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+-------------------------------------------------+
|(count(DISTINCT cicid) - CAST(2244307 AS BIGINT))|
+-------------------------------------------------+
|                                                0|
+-------------------------------------------------+

+------------------------------------+
|(count(1) - CAST(2244307 AS BIGINT))|
+------------------------------------+
|                                   0|
+------------------------------------+



In [143]:
#Data quality of the dim_demographics
# Check the number of rows, in our case the expected value is 2891
spark.sql("""
SELECT count(*) - 2891
FROM dim_demographics_pj_wga
""").show()

spark.sql("""
SELECT COUNT(DISTINCT city, state, race) - 2891
FROM dim_demographics_pj_wga
""").show()

+---------------------------------+
|(count(1) - CAST(2891 AS BIGINT))|
+---------------------------------+
|                                0|
+---------------------------------+

+----------------------------------------------------------+
|(count(DISTINCT city, state, race) - CAST(2891 AS BIGINT))|
+----------------------------------------------------------+
|                                                         0|
+----------------------------------------------------------+



In [144]:
#Data quality of the dim_airports
# Check the number of rows, in our case the expected value is 14529
spark.sql("""
SELECT count(*) - 14529
FROM dim_airports_pj_wga
""").show()

spark.sql("""
SELECT COUNT(DISTINCT ident) - 14529
FROM dim_airports_pj_wga
""").show()

+----------------------------------+
|(count(1) - CAST(14529 AS BIGINT))|
+----------------------------------+
|                                 0|
+----------------------------------+

+-----------------------------------------------+
|(count(DISTINCT ident) - CAST(14529 AS BIGINT))|
+-----------------------------------------------+
|                                              0|
+-----------------------------------------------+



In [145]:
#Data quality of the dim_temperature
# Check the number of rows, in our case the expected value is 189472
spark.sql("""
SELECT count(*) - 189472
FROM dim_temperature_pj_wga
""").show()

spark.sql("""
SELECT COUNT(DISTINCT date, city) - 189472
FROM dim_temperature_pj_wga
""").show()

+-----------------------------------+
|(count(1) - CAST(189472 AS BIGINT))|
+-----------------------------------+
|                                  0|
+-----------------------------------+

+-----------------------------------------------------+
|(count(DISTINCT date, city) - CAST(189472 AS BIGINT))|
+-----------------------------------------------------+
|                                                    0|
+-----------------------------------------------------+
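
The row-count and distinct-count comparisons above follow the same pattern for every table, so they could be folded into one reusable check next to `nullValueCheck`. The sketch below shows one possible shape for such a helper. It is written against Python's built-in `sqlite3` rather than Spark so that it runs on its own, and the function name `unique_key_check` and the toy table are assumptions for illustration.

```python
import sqlite3


def unique_key_check(conn, table, key_columns, expected_rows):
    """Raise ValueError unless the table has expected_rows rows and a unique key."""
    keys = ", ".join(key_columns)
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    distinct = conn.execute(
        f"SELECT COUNT(*) FROM (SELECT DISTINCT {keys} FROM {table})"
    ).fetchone()[0]
    if total != expected_rows:
        raise ValueError(f"{table}: expected {expected_rows} rows, found {total}")
    if distinct != total:
        raise ValueError(f"{table}: key ({keys}) is not unique")
    return total


# Hypothetical toy table standing in for a dimension keyed on (date, city)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_temperature (date TEXT, city TEXT)")
conn.executemany(
    "INSERT INTO dim_temperature VALUES (?, ?)",
    [("2013-01-01", "BOSTON"), ("2013-01-01", "MIAMI"), ("2013-02-01", "BOSTON")],
)

checked_rows = unique_key_check(conn, "dim_temperature", ["date", "city"], 3)
print(checked_rows)
```

Raising an exception instead of printing a difference makes the check easier to use in an automated pipeline, where a failed check should stop the run.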



###### Next, we try joining the dimensions

In [147]:
# Now, we link the airports with the immigration
fact_immigration.show(2)
dim_airports.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-----+-------------+--------------------+------------+-----+------------+---------+
|ident|         type|                name|elevation_ft|state|municipality|iata_code|
+-----+-------------+--------------------+------------+

###### Because a city can have more than one airport, we check how many city and state combinations the two data sets have in common. This shows how well these 2 data sources can be joined.

In [148]:
# Count the distinct city and state combinations in our fact data set
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration_pj_wga
""").show()

# Count the city and state combinations found in both the fact data set and the airports dimension
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration_pj_wga
) fi
INNER JOIN 
(
SELECT DISTINCT municipality, state
FROM dim_airports_pj_wga
) da
ON fi.city = da.municipality
AND fi.state = da.state
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        165|
+---------------------------+

+--------+
|count(1)|
+--------+
|     102|
+--------+



###### About 62% (102 of 165) of the city and state combinations in our fact data set can be linked to the airports dimension (it should be noted that we only have one month of data), which is a good match rate.
###### Let's now explore the demographic dataset. The match rate is likely to be lower, since that table does not contain all the cities in the United States.

In [149]:
fact_immigration.show(2)
dim_demographics.show(2)

+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|    cicid|citizenship_country|residence_country|  city|state|arrival_date|departure_date| age|visa_type|detailed_visa_type|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
|4041803.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|    2016-05-07|49.0| Business|                B1|
|4041804.0|            GERMANY|          GERMANY|BANGOR|   ME|  2016-04-22|          null|38.0| Business|                B1|
+---------+-------------------+-----------------+------+-----+------------+--------------+----+---------+------------------+
only showing top 2 rows

+-------------+-------------+----------+---------------+-----------------+----------------+------------+----------------------+----------+------------------+-----+
|         City|        State|median_age|male_population|femal

In [150]:
#Explore the combinations
spark.sql("""
SELECT COUNT(DISTINCT city, state)
FROM fact_immigration_pj_wga
""").show()

# Let's look at the combinations of our fact data set and the "demographics" dimension
spark.sql("""
SELECT COUNT(*)
FROM
(
SELECT DISTINCT city, state
FROM fact_immigration_pj_wga
) fi
INNER JOIN 
(
SELECT DISTINCT City, state_code
FROM dim_demographics_pj_wga
) da
ON fi.city = da.City
AND fi.state = da.state_code
""").show(2)

+---------------------------+
|count(DISTINCT city, state)|
+---------------------------+
|                        165|
+---------------------------+

+--------+
|count(1)|
+--------+
|      69|
+--------+



###### Because the demographics table does not contain all the cities, the match rate is lower (69 of 165, about 42%), but it is still usable. Let's now count how many rows of the fact table fall on city and state combinations that match the airports dimension.

In [151]:
# Finally, count the fact rows whose city and state match an airport
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration_pj_wga
WHERE CONCAT(city, state) IN (
    SELECT CONCAT(fi.city, fi.state)
    FROM
    (
        SELECT DISTINCT city, state
        FROM fact_immigration_pj_wga
    ) fi
    INNER JOIN 
    (
        SELECT DISTINCT municipality, state
        FROM dim_airports_pj_wga 
    ) da
    ON fi.city = da.municipality
    AND fi.state = da.state
)
""").show(2)

+--------+
|count(1)|
+--------+
| 1983869|
+--------+



###### Of the 2,244,307 rows we started with, 1,983,869 (about 88%) have a city and state that match the airports dimension, which is still a good result. Keep in mind that our reference information is incomplete: the demographics data, for instance, only includes cities with more than 65,000 inhabitants. We eliminate the rows that fall outside this matched set from our final result.
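
The match rates discussed in this section can be recomputed from the counts shown in the query outputs above. The small helper below is a sketch; the function name `coverage` is hypothetical.

```python
def coverage(matched, total):
    """Return matched/total as a percentage rounded to one decimal place."""
    return round(100.0 * matched / total, 1)


# Counts taken from the query outputs in this section
airport_pairs = coverage(102, 165)         # city/state pairs matching an airport
demographic_pairs = coverage(69, 165)      # city/state pairs matching demographics
airport_rows = coverage(1983869, 2244307)  # fact rows on airport-matched pairs

print(airport_pairs, demographic_pairs, airport_rows)
```

The row-level rate is higher than the pair-level rate because the matched cities account for most of the traffic.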

### 4.3 Data dictionary

#### Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### DIM_CITY_DEMOGRAPHICS_PJ_WGA
##### Primary key: City, state_code, Race
- City (String): Name of the city
- State (String): Name of the state
- median_age (double): Median age for the city
- male_population (double): Size of the male population
- female_population (double): Size of the female population
- total_population (long) : Total population
- foreign_born (double): Number of foreign born residents
- average_household_size (double): Average size of a household
- state_code (String): Two letter state code
- Race (String): Racial category selected by respondents. Possible values are:
    - Hispanic or Latino
    - White
    - Asian
    - Black or African-American
    - American Indian and Alaska Native
- Count (long): Number of respondents (i.e. size) for the combination City, State, Race

#### DIM_TIME_PJ_WGA
##### Primary key: date
- date (string): date in the format yyyy-mm-dd
- year (integer): year for the given date, available for aggregation
- month (integer): month for the given date, available for aggregation
- day (integer): day for the given date, available for aggregation
- week (integer): week for the given date, available for aggregation (values between 0 and 52)
- weekday (integer): weekday for the given date, available for aggregation (values between 0 and 6)
- year_day (integer): day of the year for the given date, available for aggregation
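As an illustration of how these fields relate to a single date, here is a minimal sketch using only the Python standard library. It is not the notebook's Spark code: the helper name `time_fields` is hypothetical, `week` is assumed to follow the `%W` convention (weeks start on Monday, days before the first Monday fall in week 0), `weekday` is assumed to be 0 for Monday, and `year_day` is assumed to mean the day of the year. The conventions actually used when the table was built may differ.

```python
from datetime import date


def time_fields(d):
    """Return DIM_TIME_PJ_WGA-style fields for one date (illustrative only)."""
    return {
        "date": d.isoformat(),              # yyyy-mm-dd
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": int(d.strftime("%W")),      # assumed convention: Monday-based weeks
        "weekday": d.weekday(),             # assumed convention: Monday = 0
        "year_day": d.timetuple().tm_yday,  # day of the year, starting at 1
    }


row = time_fields(date(2016, 4, 1))
print(row)
```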

#### DIM_AIRPORTS_PJ_WGA
##### Primary key: ident
- ident (string): Airport identifier
- type (string): Type of airport. Possible values are:
    - large_airport
    - medium_airport
    - small_airport
- name (string): Name of the airport
- elevation_ft (string): Elevation of the airport, in feet
- state (string): State where the airport is located
- municipality (string): Name of the city closest to the airport
- iata_code (string): IATA code that appears on airplane tickets and baggage tags

#### DIM_TEMPERATURES_PJ_WGA
##### Primary key: date, city
- date (timestamp): date when the temperature was registered
- city (string): city where the temperature was registered
- average_temperature (double): average temperature recorded for the day in the specified city
- average_termperature_uncertainty (double): average uncertainty recorded for the temperature measurement

#### FACT_IMMIGRATION_PJ_WGA
##### Primary key: cicid
- cicid (double): Unique identifier for each traveller
- citizenship_country (string): Traveller's country of citizenship
- residence_country (string): Traveller's country of residence
- city (string): City where the entry port of the traveller is located
- state (string): State where the entry port of the traveller is located
- arrival_date (date): Traveller's arrival date
- departure_date (string): Traveller's departure date, if known
- age (double): Traveller's age
- visa_type (string): Aggregate visa type. Possible values are:
    - Business
    - Pleasure
    - Student
- detailed_visa_type (string): Detailed visa type. Numerous values are available, and not all could be identified:
    - B1: B1 visa is for business visits valid for up to a year
    - B2: B2 visa is for pleasure visits valid for up to a year
    - CP: could not find a definition
    - E2: E2 investor visas allow foreign investors to enter and work inside the United States based on a substantial investment
    - F1: F1 visas are used by non-immigrant students for academic and language training courses
    - F2: F2 visas are used by the dependents of F1 visa holders
    - GMT: could not find a definition
    - M1: for students enrolled in non-academic or “vocational study” (mechanical, language, cooking classes, etc.)
    - WB: Visa Waiver Program, business (WT/WB status): travel to the United States for business for stays of 90 days or less without obtaining a visa
    - WT: Visa Waiver Program, tourism (WT/WB status): travel to the United States for tourism for stays of 90 days or less without obtaining a visa

## Step 5: Complete Project Write Up

##### Given the size of our dataset (3 million rows for just one month) and the need to link it with airport data, city temperatures, and demographics, the best suited technology is Spark, even more so if it has to process data covering a longer period than the one analyzed.

##### At the beginning of this project we wanted to analyze the following:

- The impact of temperature on the number of travelers.
- The seasonality of travel.
- Links between travel volume and airports.
- Links between the number of trips and the demographics of various cities in the United States.

##### Since the data is stable, a monthly, bimonthly, or quarterly update would be sufficient for the requirements of this study.

#### Alternate requirement scenarios:

##### How would our project change under the following scenarios?

##### The data was increased by 100x:

###### Our source data would be stored in Amazon S3 rather than on the EMR cluster itself, along with the tables produced by the ETL process.
###### Since the data set would be even larger, we would continue to use Spark, as it is well suited to processing large data sets.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day:

###### In this scenario we would use Apache Airflow to schedule the ETL early enough each day for the dashboard to be refreshed by 7am, and to verify the data quality of each run.
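The kind of data quality check that such a scheduled run would perform can be sketched in plain Python. This is an illustration under stated assumptions, not the project's implementation: the function name `check_table`, its parameters, and the sample rows are hypothetical, and in Airflow the function would be wrapped in a task rather than called directly.

```python
def check_table(rows, key_fields, min_rows=1):
    """Return a list of problems found in `rows` (a list of dicts).

    An empty list means the table passed. The checks are: a minimum row
    count, no null values in the primary key, and no duplicate keys.
    """
    problems = []
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, found {len(rows)}")

    seen = set()
    for i, row in enumerate(rows):
        key = tuple(row.get(field) for field in key_fields)
        if any(value is None for value in key):
            problems.append(f"row {i}: null in primary key {key_fields}")
        elif key in seen:
            problems.append(f"row {i}: duplicate primary key {key}")
        seen.add(key)
    return problems


# Invented sample rows: one duplicate key and one null key.
sample = [
    {"cicid": 1.0, "city": "MIAMI"},
    {"cicid": 2.0, "city": "BOSTON"},
    {"cicid": 2.0, "city": "BOSTON"},
    {"cicid": None, "city": "MIAMI"},
]
problems = check_table(sample, ["cicid"])
for problem in problems:
    print(problem)
```

A scheduled task could fail the run whenever the returned list is not empty, so that a bad load never reaches the dashboard.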

##### The database needed to be accessed by 100+ people:

###### Once all the data has been processed and is ready to be consumed, it would be loaded into an Amazon Redshift cluster (a data warehouse based on PostgreSQL), which easily supports multiuser access.