# Immigration and Temperature Data Warehouse
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build a data warehouse that lets data analysts run analyses and produce reports on data from several sources.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

This project integrates data from different sources so that data analysts can consume it and extract value from it. The resulting tables are written to local storage. As a future improvement, they could be loaded into a database such as Amazon Redshift or Postgres.

The data is read from datasets available locally on the workstation, transformed with pandas and Spark, and then written back to local storage.

The final schema of this data will be optimized so that data analysis can be performed.

It is part of the scope of this project to define and implement the ETL (extract, transform and load) that will make the data available in a schema for data analysis. Two data quality checks will be implemented.

#### Describe and Gather Data 

Four data sources will be used:

1. [Immigration Data: data from US National Tourism and Trade Office I94](https://www.trade.gov/national-travel-and-tourism-office)

Immigration records with information such as departure date, arrival date, mode of entry (air, sea, land) and gender.


2. [World Temperature Data](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data)

Average temperature measurements for different cities over time.

3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

Demographic information about US cities.

4. [Airport Code Table](https://datahub.io/core/airport-codes#data)

Airport information such as name, location and type.

### Step 2: Explore and Assess the Data
#### Explore the Data 

In this section, the available datasets will be explored. Some consistency checks will be carried out that will provide information for the construction of the ETL.

##### Exploring immigration dataset

In [45]:
import pandas as pd
immigration_df = pd.read_csv("immigration_data_sample.csv")
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [46]:
immigration_df.describe()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,dtadfile,entdepu,biryear,insnum,admnum
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0,1000.0,1000.0,0.0,1000.0,35.0,1000.0
mean,1542097.0,3040461.0,2016.0,4.0,302.928,298.262,20559.68,1.078,20575.037855,42.382,1.859,1.0,20160420.0,,1973.618,3826.857143,69372370000.0
std,915287.9,1799818.0,0.0,0.0,206.485285,202.12039,8.995027,0.485955,24.211234,17.903424,0.386353,0.0,49.51657,,17.903424,221.742583,23381340000.0
min,10925.0,13208.0,2016.0,4.0,103.0,103.0,20545.0,1.0,20547.0,1.0,1.0,1.0,20160400.0,,1923.0,3468.0,0.0
25%,721442.2,1412170.0,2016.0,4.0,135.0,131.0,20552.0,1.0,20561.0,30.75,2.0,1.0,20160410.0,,1961.0,3668.0,55993010000.0
50%,1494568.0,2941176.0,2016.0,4.0,213.0,213.0,20560.0,1.0,20570.0,42.0,2.0,1.0,20160420.0,,1974.0,3887.0,59314770000.0
75%,2360901.0,4694151.0,2016.0,4.0,438.0,438.0,20567.25,1.0,20580.0,55.0,2.0,1.0,20160420.0,,1985.25,3943.0,93436230000.0
max,3095749.0,6061994.0,2016.0,4.0,746.0,696.0,20574.0,9.0,20715.0,93.0,3.0,1.0,20160800.0,,2015.0,4686.0,95021510000.0


In [47]:
immigration_df.isna().sum()

Unnamed: 0       0
cicid            0
i94yr            0
i94mon           0
i94cit           0
i94res           0
i94port          0
arrdate          0
i94mode          0
i94addr         59
depdate         49
i94bir           0
i94visa          0
count            0
dtadfile         0
visapost       618
occup          996
entdepa          0
entdepd         46
entdepu       1000
matflag         46
biryear          0
dtaddto          0
gender         141
insnum         965
airline         33
admnum           0
fltno            8
visatype         0
dtype: int64

In [48]:
any(immigration_df.duplicated())

False

##### Exploring World Temperature Dataset

In [49]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


As the other data sources only include data from the United States, we will exclude temperature data from other countries.

In [50]:
df_temperature = df_temperature.loc[df_temperature["Country"] == "United States"]
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [51]:
df_temperature.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,661524.0,661524.0
mean,13.949335,1.08955
std,9.173337,1.15068
min,-25.163,0.04
25%,7.787,0.3
50%,14.922,0.524
75%,21.081,1.646
max,34.379,10.519


In [52]:
df_temperature.isna().sum()

dt                                   0
AverageTemperature               25765
AverageTemperatureUncertainty    25765
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64

Some rows have missing temperature values (25,765 in each of the two temperature columns). We will remove them too.

In [53]:
df_temperature = df_temperature.loc[~df_temperature["AverageTemperature"].isna()]
df_temperature.isna().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
dtype: int64

In [54]:
any(df_temperature.duplicated())

False

##### Exploring U.S. City Demographic Data

In [55]:
demographics_df = pd.read_csv("us-cities-demographics.csv", sep=";")
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [56]:
demographics_df.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [57]:
demographics_df.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [58]:
any(demographics_df.duplicated())

False

##### Exploring Airport Code Table

In [59]:
airport_df = pd.read_csv("airport-codes_csv.csv")
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [60]:
print(len(airport_df))

55075


In [61]:
print(airport_df.groupby("type").count()["name"])

type
balloonport          24
closed             3606
heliport          11287
large_airport       627
medium_airport     4550
seaplane_base      1016
small_airport     33965
Name: name, dtype: int64


In [62]:
airport_df.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [63]:
any(airport_df.duplicated())

False

#### Cleaning Steps

After an initial exploration of the data, the following steps were defined for cleaning:

1. Remove NaN temperatures
2. Remove temperature data from countries other than the United States
3. Remove rows with NaN depdate on immigration dataset
4. Transform date columns on immigration dataset to the same pattern as the temperature dataset
5. Standardize city and state data across all datasets

##### 1. Remove NaN temperatures

In [64]:
df_temperature = df_temperature.loc[~df_temperature["AverageTemperature"].isna()]

##### 2. Remove temperature data from countries other than the United States

In [65]:
df_temperature = df_temperature.loc[df_temperature["Country"] == "United States"]

##### 3. Remove rows with NaN depdate on immigration dataset

In [66]:
immigration_df = immigration_df.loc[~immigration_df["depdate"].isna()]

##### 4. Transform date columns on immigration dataset to the same pattern as the temperature dataset

In [67]:
immigration_df["arrdate"] = pd.Timestamp('1960-1-1') + pd.to_timedelta(immigration_df["arrdate"], unit='D')
immigration_df["depdate"] = pd.Timestamp('1960-1-1') + pd.to_timedelta(immigration_df["depdate"], unit='D')
immigration_df[["arrdate", "depdate"]].head()

Unnamed: 0,arrdate,depdate
0,2016-04-22,2016-04-29
1,2016-04-23,2016-04-24
2,2016-04-07,2016-04-27
3,2016-04-28,2016-05-07
4,2016-04-06,2016-04-09
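The offset from 1960-01-01 is used because SAS stores dates as the number of days since that day. As a minimal sketch of the same conversion using only the standard library (the helper name `sas_days_to_date` is illustrative and not part of the project code), which also passes missing values through as `None`:

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None  # keep missing values missing instead of raising
    return SAS_EPOCH + timedelta(days=int(days))


# 20566.0 is the first arrdate value in the immigration sample
print(sas_days_to_date(20566.0))  # 2016-04-22
```

Returning `None` for missing input means a function like this could be applied before rows with a missing `depdate` are dropped.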


##### 5. Standardize city and state data across all datasets

City and state names will be lowercased and stored in separate columns in the temperature, immigration, demographic and airport data.

In [68]:
df_temperature["city"] = df_temperature["City"].str.lower()
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,city
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,abilene
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,abilene
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,abilene
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W,abilene
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W,abilene


In [69]:
demographics_df["city"] = demographics_df["City"].str.lower()
demographics_df["state"] = demographics_df["State"].str.lower()
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count,city,state
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924,silver spring,maryland
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723,quincy,massachusetts
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759,hoover,alabama
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437,rancho cucamonga,california
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402,newark,new jersey


In [70]:
airport_df["city"] = airport_df["municipality"].str.lower()
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,city
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",bensalem
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",leoti
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",anchor point
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",harvest
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087",newport


To apply the same transformation to the immigration data, we will have to read and parse the file *I94_SAS_Labels_Descriptions.SAS*, which maps the country, state and port codes to their names.

In [71]:
# The labels file is a SAS program (plain text), not a SAS dataset, so it is read line by line as text
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    lines = f.readlines()

# COUNTRY

star_line_idx = lines.index("  value i94cntyl\n") + 1
end_line_idx = lines.index("   996 =  'No Country Code (996)' ;\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "")
    line = line.split("=")
    line = [int(line[0]), line[1].lower()]
    formated_lines.append(line)
country_codes = pd.DataFrame(formated_lines, columns=["country_code", "country"])
display(country_codes.head())
        
# STATE
        
star_line_idx = lines.index("\t'AL'='ALABAMA'\n")
end_line_idx = lines.index("\t'99'='All Other Codes' ;\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "").replace("\t", "")
    line = line.split("=")
    line = [line[0], line[1].lower()]
    formated_lines.append(line)
state_codes = pd.DataFrame(formated_lines, columns=["state_code", "state"])
display(state_codes.head())

# City
   
star_line_idx = lines.index("   'ALC'\t=\t'ALCAN, AK             '\n")
end_line_idx = lines.index("   'OSN' \t=\t'No PORT Code (OSN)'\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "").replace("\t", "")
    line = line.split("=")
    city, state_code = line[1].split(",") if len(line[1].split(",")) == 2 else [line[1], ""]
    line = [line[0], city.lower(), state_code]
    formated_lines.append(line)
city_codes = pd.DataFrame(formated_lines, columns=["city_code", "city", "state_code"])
display(city_codes.head())

city_state_info = city_codes.merge(state_codes, left_on="state_code", right_on="state_code", how="left")[[
    "city_code", "city", "state"
]]

Unnamed: 0,country_code,country
0,582,"mexicoairsea,andnotreported(i-94,nolandarrivals)"
1,236,afghanistan
2,101,albania
3,316,algeria
4,102,andorra


Unnamed: 0,state_code,state
0,AL,alabama
1,AK,alaska
2,AZ,arizona
3,AR,arkansas
4,CA,california


Unnamed: 0,city_code,city,state_code
0,ALC,alcan,AK
1,ANC,anchorage,AK
2,BAR,bakeraaf-bakerisland,AK
3,DAC,daltonscache,AK
4,PIZ,dewstationptlaydew,AK


Now let's replace the codes with names in the immigration DataFrame.

In [72]:
immigration_df = immigration_df.merge(city_state_info, left_on="i94port", right_on="city_code", how="left")
immigration_df = immigration_df.merge(country_codes, left_on="i94cit", right_on="country_code", how="left")
immigration_df = immigration_df.merge(country_codes, left_on="i94res", right_on="country_code", how="left")
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,admnum,fltno,visatype,city_code,city,state,country_code_x,country_x,country_code_y,country_y
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,...,56582670000.0,00782,WT,HHW,honolulu,hawaii,209.0,japan,209,japan
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,...,94362000000.0,XBLNG,B2,MCA,mcallen,texas,582.0,"mexicoairsea,andnotreported(i-94,nolandarrivals)",582,"mexicoairsea,andnotreported(i-94,nolandarrivals)"
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,...,55780470000.0,00464,WT,OGG,kahului-maui,hawaii,,,112,germany
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,...,94789700000.0,00739,B2,LOS,losangeles,california,297.0,qatar,297,qatar
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,...,42322570000.0,LAND,WT,CHM,champlain,newyork,111.0,france,111,france


In [73]:
immigration_df.rename(columns={'country_x':'citzen_country', 'country_y': 'residence_country'}, inplace=True)
immigration_df = immigration_df.drop(columns=["country_code_x", "country_code_y", "city_code"])
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,gender,insnum,airline,admnum,fltno,visatype,city,state,citzen_country,residence_country
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,...,F,,JL,56582670000.0,00782,WT,honolulu,hawaii,japan,japan
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,...,M,,*GA,94362000000.0,XBLNG,B2,mcallen,texas,"mexicoairsea,andnotreported(i-94,nolandarrivals)","mexicoairsea,andnotreported(i-94,nolandarrivals)"
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,...,M,,LH,55780470000.0,00464,WT,kahului-maui,hawaii,,germany
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,...,M,,QR,94789700000.0,00739,B2,losangeles,california,qatar,qatar
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,...,F,,,42322570000.0,LAND,WT,champlain,newyork,france,france
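The left joins above leave `citzen_country` empty when a code has no entry in the labels file, as in the row with `i94cit` 148. A small sketch of how such codes could be listed with the `indicator` option of `merge`. The two DataFrames are toy stand-ins built from values shown above, not the real data:

```python
import pandas as pd

# Toy stand-ins: three sample rows and a lookup table that lacks code 148
immigration = pd.DataFrame({
    "cicid": [4084316, 1195600, 985523],
    "i94cit": [209.0, 148.0, 111.0],
})
country_codes = pd.DataFrame({
    "country_code": [209, 111],
    "country": ["japan", "france"],
})

# Cast the float codes to integers so both join keys share a dtype
immigration["i94cit"] = immigration["i94cit"].astype("int64")

# indicator=True adds a "_merge" column telling where each row found a match
merged = immigration.merge(
    country_codes, left_on="i94cit", right_on="country_code", how="left", indicator=True
)
unmatched_codes = sorted(merged.loc[merged["_merge"] == "left_only", "i94cit"].unique().tolist())
print(unmatched_codes)
```

A list like this makes it easier to decide whether unmatched codes should be dropped, mapped to a placeholder, or added to the lookup table.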


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The Star Schema data model will be adopted so that data analysts can perform analysis easily and efficiently.
Two independent fact tables will be created:

* temperature_fact
* immigration_fact

Each fact table will have a relationship with the following dimension tables:

* city_dim
* country_dim
* date_dim (by day)

In addition, the city dimension table will be related to the airports dimension table.
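To illustrate how analysts could query this model, here is a sketch that loads a few toy rows into an in-memory SQLite database and counts arrivals per city and month. SQLite is used only to keep the example self-contained, the rows are made up, and only a subset of the columns is shown:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE city_dim (city_code TEXT PRIMARY KEY, city TEXT, state TEXT);
CREATE TABLE date_dim (date_id TEXT PRIMARY KEY, year INTEGER, month INTEGER, weekday INTEGER);
CREATE TABLE immigration_fact (
    immigration_id  INTEGER PRIMARY KEY,
    arrival_date_id TEXT REFERENCES date_dim(date_id),
    city_id         TEXT REFERENCES city_dim(city_code),
    visatype        TEXT
);
""")

# Toy rows, for illustration only
conn.executemany("INSERT INTO city_dim VALUES (?, ?, ?)", [
    ("HHW", "honolulu", "hawaii"),
    ("LOS", "los angeles", "california"),
])
conn.executemany("INSERT INTO date_dim VALUES (?, ?, ?, ?)", [
    ("2016-04-22", 2016, 4, 4),
    ("2016-04-28", 2016, 4, 3),
])
conn.executemany("INSERT INTO immigration_fact VALUES (?, ?, ?, ?)", [
    (1, "2016-04-22", "HHW", "WT"),
    (2, "2016-04-28", "LOS", "B2"),
    (3, "2016-04-28", "LOS", "WT"),
])

# The fact table joins to each dimension through its key
query = """
SELECT c.city, d.year, d.month, COUNT(*) AS arrivals
FROM immigration_fact AS f
JOIN city_dim AS c ON c.city_code = f.city_id
JOIN date_dim AS d ON d.date_id = f.arrival_date_id
GROUP BY c.city, d.year, d.month
ORDER BY c.city
"""
rows = conn.execute(query).fetchall()
print(rows)
```

Each join goes from the fact table to one dimension, which is what keeps star-schema queries short.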


#### 3.2 Mapping Out Data Pipelines
1. Clean all datasets
2. Build city_dim table
3. Build date_dim table
4. Build country_dim table
5. Create airport_dim (mapping city_id to city_dim)
6. Create the temperature fact table mapping the corresponding dimensions
7. Create the immigration fact table mapping the corresponding dimensions

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### Loading datasets

In [74]:
# Loading datasets on a spark cluster

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [75]:
load_all_data = False

if load_all_data:
    immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/*.sas7bdat')
else:
    immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)
demographics_df = pd.read_csv("us-cities-demographics.csv", sep=";")
airport_df = pd.read_csv("airport-codes_csv.csv")

#### 1. Clean all datasets

In [76]:
import pyspark.sql.functions as F
from pyspark.sql.types import DateType

# Cleaning data

df_temperature = df_temperature.loc[~df_temperature["AverageTemperature"].isna()]
df_temperature = df_temperature.loc[df_temperature["Country"] == "United States"]

immigration_df = immigration_df.na.drop(subset=["depdate"])


# lowercase city, state and country
df_temperature["city"] = df_temperature["City"].str.lower()
demographics_df["city"] = demographics_df["City"].str.lower()
demographics_df["state"] = demographics_df["State"].str.lower()
airport_df["city"] = airport_df["municipality"].str.lower()


# Converting dates on immigration_df
def convert_dates_immigration(date):
    return pd.Timestamp('1960-1-1') + pd.to_timedelta(date, unit='D')
convert_dates_immigration_udf = F.udf(convert_dates_immigration, DateType())

immigration_df = immigration_df.withColumn('arrdate', convert_dates_immigration_udf(F.col('arrdate')))
immigration_df = immigration_df.withColumn('depdate', convert_dates_immigration_udf(F.col('depdate')))

with open("I94_SAS_Labels_Descriptions.SAS") as f:
    lines = f.readlines()

# COUNTRY

star_line_idx = lines.index("  value i94cntyl\n") + 1
end_line_idx = lines.index("   996 =  'No Country Code (996)' ;\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "")
    line = line.split("=")
    line = [int(line[0]), line[1].lower()]
    formated_lines.append(line)
country_codes = pd.DataFrame(formated_lines, columns=["country_code", "country"])
        
# STATE
        
star_line_idx = lines.index("\t'AL'='ALABAMA'\n")
end_line_idx = lines.index("\t'99'='All Other Codes' ;\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "").replace("\t", "")
    line = line.split("=")
    line = [line[0], line[1].lower()]
    formated_lines.append(line)
state_codes = pd.DataFrame(formated_lines, columns=["state_code", "state"])

# City
   
star_line_idx = lines.index("   'ALC'\t=\t'ALCAN, AK             '\n")
end_line_idx = lines.index("   'OSN' \t=\t'No PORT Code (OSN)'\n") + 1

formated_lines = []
for line in lines[star_line_idx:end_line_idx]:
    line = line.replace(" ","").replace("\n", "").replace("'", "").replace("\t", "")
    line = line.split("=")
    city, state_code = line[1].split(",") if len(line[1].split(",")) == 2 else [line[1], ""]
    line = [line[0], city.lower(), state_code]
    formated_lines.append(line)
city_codes = pd.DataFrame(formated_lines, columns=["city_code", "city", "state_code"])

city_state_info = city_codes.merge(state_codes, left_on="state_code", right_on="state_code", how="left")[[
    "city_code", "city", "state"
]]

# Build plain dict lookups once; .get() returns None for unknown codes instead of raising IndexError
city_name_by_code = dict(zip(city_state_info["city_code"], city_state_info["city"]))
state_name_by_code = dict(zip(city_state_info["city_code"], city_state_info["state"]))
country_name_by_code = dict(zip(country_codes["country_code"], country_codes["country"]))

# Add city name
def add_city_name(city_code):
    return city_name_by_code.get(city_code)


add_city_name_udf = F.udf(add_city_name)
immigration_df = immigration_df.withColumn('city_name', add_city_name_udf(F.col('i94port')))

# Add state name
def add_state_name(city_code):
    return state_name_by_code.get(city_code)


add_state_name_udf = F.udf(add_state_name)
immigration_df = immigration_df.withColumn('state_name', add_state_name_udf(F.col('i94port')))


# add 'citzen_country' and 'residence_country'
def add_country_name(country_code):
    return country_name_by_code.get(country_code)
add_country_name_udf = F.udf(add_country_name)
immigration_df = immigration_df.withColumn('citzen_country', add_country_name_udf(F.col('i94cit')))
immigration_df = immigration_df.withColumn('residence_country', add_country_name_udf(F.col('i94res')))

In [77]:
display(immigration_df.head())
display(df_temperature.head())
display(airport_df.head())
display(demographics_df.head())

Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=datetime.date(2016, 4, 1), i94mode=1.0, i94addr='MI', depdate=datetime.date(2016, 8, 25), i94bir=55.0, i94visa=2.0, count=1.0, dtadfile='20160401', visapost=None, occup=None, entdepa='T', entdepd='O', entdepu=None, matflag='M', biryear=1961.0, dtaddto='09302016', gender='M', insnum=None, airline='OS', admnum=666643185.0, fltno='93', visatype='B2', city_name='washingtondc', state_name='NaN', citzen_country='albania', residence_country='albania')

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,city
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,abilene
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,abilene
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,abilene
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W,abilene
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W,abilene


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,city
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",bensalem
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",leoti
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",anchor point
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",harvest
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087",newport


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count,city,state
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924,silver spring,maryland
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723,quincy,massachusetts
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759,hoover,alabama
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437,rancho cucamonga,california
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402,newark,new jersey


#### 2. Build city dimension table

In [78]:
city_dim = city_codes.merge(demographics_df, left_on="city", right_on="city")
city_dim = city_dim[["city_code", "city", "state", "Median Age", "Male Population", "Female Population", "Foreign-born", "Average Household Size", "Race"]]
# Apply the snake_case names so the table matches the data dictionary
city_dim.columns = ["city_code", "city", "state", "median_age", "male_population", "female_population", "foreing_born", "average_household_size", "race"]
display(city_dim.head())

Unnamed: 0,city_code,city,state,median_age,male_population,female_population,foreing_born,average_household_size,race
0,ANC,anchorage,alaska,32.2,152945.0,145750.0,33258.0,2.77,Hispanic or Latino
1,ANC,anchorage,alaska,32.2,152945.0,145750.0,33258.0,2.77,White
2,ANC,anchorage,alaska,32.2,152945.0,145750.0,33258.0,2.77,American Indian and Alaska Native
3,ANC,anchorage,alaska,32.2,152945.0,145750.0,33258.0,2.77,Black or African-American
4,ANC,anchorage,alaska,32.2,152945.0,145750.0,33258.0,2.77,Asian
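Because the demographics source has one row per city and race, the merge above repeats each city once per race, and any join from another table to `city_dim` repeats rows in the same way. One possible way to keep a single row per city, sketched on toy data (the counts are made up and the column names are simplified), is to separate the city-level attributes from the per-race counts and pivot the counts into columns:

```python
import pandas as pd

# Toy stand-in for the demographics data: one row per city and race
demographics = pd.DataFrame({
    "city": ["anchorage", "anchorage", "newark"],
    "state": ["alaska", "alaska", "new jersey"],
    "median_age": [32.2, 32.2, 34.6],
    "race": ["White", "Asian", "White"],
    "count": [1000, 200, 500],  # made-up values
})

# City-level attributes are identical across the race rows, so keep one row per city
city_level = (
    demographics.drop(columns=["race", "count"])
    .drop_duplicates(subset=["city", "state"])
    .reset_index(drop=True)
)

# Turn the per-race counts into one column per race
race_counts = demographics.pivot_table(
    index=["city", "state"], columns="race", values="count", aggfunc="sum", fill_value=0
).reset_index()
race_counts.columns.name = None

city_dim_sketch = city_level.merge(race_counts, on=["city", "state"], how="left")
print(city_dim_sketch)
```

With one row per city, joins from the fact tables and from the airport data would no longer multiply rows.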


#### 3. Build date_dim table

In [79]:
from datetime import datetime, timedelta
min_year = int(min(df_temperature["dt"])[:4])
min_date = datetime(min_year,1,1)
max_date = datetime(2017,1,1)
delta = int((max_date - min_date).days)
date_list = [min_date + timedelta(days=x) for x in range(delta)]

date_dim = pd.DataFrame({"date_id": date_list})
date_dim["year"] = pd.DatetimeIndex(date_dim["date_id"]).year
date_dim["month"] = pd.DatetimeIndex(date_dim["date_id"]).month
date_dim["weekday"] = pd.DatetimeIndex(date_dim["date_id"]).weekday
date_dim.head()

Unnamed: 0,date_id,year,month,weekday
0,1743-01-01,1743,1,1
1,1743-01-02,1743,1,2
2,1743-01-03,1743,1,3
3,1743-01-04,1743,1,4
4,1743-01-05,1743,1,5
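The same table can also be built without a Python loop. A sketch using `pd.date_range`, assuming the same bounds as above (the first day of 1743 through the last day of 2016):

```python
from datetime import datetime

import pandas as pd

# One entry per calendar day, both endpoints included
dates = pd.date_range(start="1743-01-01", end="2016-12-31", freq="D")

date_dim_sketch = pd.DataFrame({
    "date_id": dates,
    "year": dates.year,
    "month": dates.month,
    "weekday": dates.weekday,  # Monday=0 ... Sunday=6
})
print(len(date_dim_sketch))
print(date_dim_sketch.head())
```

In pandas, `weekday` counts from Monday as 0, which is how the `weekday` column should be read.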


#### 4. Build country_dim table

In [80]:
country_dim = country_codes.copy()
# rename() returns a new DataFrame, so the result has to be assigned back
country_dim = country_dim.rename(columns={"country": "country_name"})
country_dim.head()

Unnamed: 0,country_code,country_name
0,582,"mexicoairsea,andnotreported(i-94,nolandarrivals)"
1,236,afghanistan
2,101,albania
3,316,algeria
4,102,andorra


#### 5. Create airport_dim (mapping city_id to city_dim)

In [81]:
airport_df = airport_df.merge(city_dim, left_on="city", right_on="city")
# .copy() makes airport_dim an independent DataFrame, so adding a column does not raise SettingWithCopyWarning
airport_dim = airport_df[["city_code", "name", "type"]].copy()
airport_dim["airport_id"] = airport_df.index
display(airport_dim.head())


Unnamed: 0,city_code,name,type,airport_id
0,GAR,Homan Field,small_airport,0
1,GAR,Homan Field,small_airport,1
2,GAR,Homan Field,small_airport,2
3,GAR,Homan Field,small_airport,3
4,GAR,Homan Field,small_airport,4


#### 6. Create the temperature fact table mapping the corresponding dimensions

In [82]:
df_temperature = df_temperature.merge(city_dim, left_on="city", right_on="city")

In [83]:
temperature_fact = df_temperature[["dt", "city_code", "AverageTemperature", "AverageTemperatureUncertainty"]]
columns = ["date_id", "city_code", "average_temperature", "average_temperature_uncertainty"]
temperature_fact.columns = columns
temperature_fact.head()

Unnamed: 0,date_id,city_code,average_temperature,average_temperature_uncertainty
0,1743-11-01,AKR,3.209,1.961
1,1743-11-01,AKR,3.209,1.961
2,1743-11-01,AKR,3.209,1.961
3,1743-11-01,AKR,3.209,1.961
4,1743-11-01,AKR,3.209,1.961


#### 7. Create the immigration fact table mapping the corresponding dimensions

In [84]:
immigration_fact = immigration_df.select(
    "depdate", "arrdate", "i94port", "i94cit", "i94res", "i94mode", "i94bir", "gender", "i94visa", "visapost", "visatype", "airline"
).withColumn("immigration_id", F.monotonically_increasing_id())

# rename columns
new_columns = [
    "departure_date_id", 
    "arrival_date_id", 
    "city_id", 
    "citzen_country_id", 
    "residence_country_id", 
    "mode",
    "age",
    "gender",
    "visa",
    "visapost",
    "visatype",
    "airline"
]
for current_column, new_column in zip(immigration_fact.columns, new_columns):
    immigration_fact = immigration_fact.withColumnRenamed(current_column, new_column)

##### Export files

In [85]:
immigration_fact.write.mode("overwrite").parquet("output_files/immigration_fact")

In [86]:
temperature_fact.to_csv("output_files/temperature_fact.csv")
airport_dim.to_csv("output_files/airport_dim.csv")
country_dim.to_csv("output_files/country_dim.csv")
date_dim.to_csv("output_files/date_dim.csv")
city_dim.to_csv("output_files/city_dim.csv")

#### 4.2 Data Quality Checks

We will set two data quality checks:

1. Check if there is any empty dimension table
2. Check for nan on temperature fact table

In [87]:
# Check if there is any empty dimension table

test_result = "There is no dimension table empty" if not any(
    [airport_dim.empty,
    date_dim.empty,
    country_dim.empty,
    city_dim.empty],
) else "There is some dimension table empty!"
print(test_result)

There is no dimension table empty


In [88]:
nan_temperature_fact = temperature_fact.loc[
    (temperature_fact["average_temperature"].isna())
    | (temperature_fact["average_temperature_uncertainty"].isna())
]
test_result = "No nan values on temperature_fact" if nan_temperature_fact.empty else "Nan values on temperature_fact!"
print(test_result)

No nan values on temperature_fact
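The two checks above only print a message, so a failing check would not stop the pipeline. As a sketch, assuming a failed check should raise an error instead, they could be wrapped in small helper functions, together with a key-uniqueness check. The function names are illustrative, and the tables are toy stand-ins:

```python
import pandas as pd


def check_not_empty(tables):
    """Raise if any of the named DataFrames has no rows."""
    empty = [name for name, df in tables.items() if df.empty]
    if empty:
        raise ValueError(f"Empty tables: {empty}")


def check_no_nan(df, columns):
    """Raise if any of the given columns contains NaN."""
    nan_counts = df[columns].isna().sum()
    bad = nan_counts[nan_counts > 0]
    if not bad.empty:
        raise ValueError(f"NaN values found: {bad.to_dict()}")


def check_unique_key(df, key):
    """Raise if the key column contains duplicated values."""
    if df[key].duplicated().any():
        raise ValueError(f"Duplicated values in key column {key!r}")


# Toy tables standing in for the real ones
date_dim_toy = pd.DataFrame({"date_id": ["2016-04-22", "2016-04-23"]})
temperature_fact_toy = pd.DataFrame({"average_temperature": [2.101, 6.926]})

check_not_empty({"date_dim": date_dim_toy, "temperature_fact": temperature_fact_toy})
check_no_nan(temperature_fact_toy, ["average_temperature"])
check_unique_key(date_dim_toy, "date_id")
print("all checks passed")
```

Raising an exception lets a scheduler or a calling script mark the run as failed when a check does not pass.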


#### 4.3 Data dictionary 

##### city_dim

Contains city data.

* city_code: city id
* city: city name
* state: state name
* median_age: median age in the city
* male_population: male population in the city
* female_population: female population in the city
* foreing_born: foreign-born population in the city
* average_household_size: average household size in the city
* race: race category (the source data has one row per city and race)

##### country_dim

Contains the country names

* country_code: country id
* country_name: country name

##### date_dim

Contains date dimensions

* date_id: date id (YYYY-MM-DD)
* year
* month
* weekday

##### airport_dim

Contains airport information

* airport_id
* city_code
* name: airport name
* type: airport type

##### temperature_fact

Contains temperature facts

* date_id
* city_code
* average_temperature: average temperature on the date
* average_temperature_uncertainty: uncertainty of the average temperature on the date

##### immigration_fact

Contains immigration info

* departure_date_id: departure date id
* arrival_date_id: arrival date id
* city_id: city id
* citzen_country_id: id of the country of citizenship
* residence_country_id: residence country id
* mode: immigration mode (air, sea, land, not reported)
* age: age of the immigrant
* gender: gender of the immigrant
* visa: immigrant visa
* visapost: immigrant visa post
* visatype: immigrant visa type
* airline: airline used on immigration

### Step 5: Complete Project Write Up
* Spark was chosen to process the large dataset (immigration data), and pandas was chosen for the small ones. Input and output files are stored locally. In a real application, this data could be saved in cloud object storage (like S3). With the data on S3, it is much easier to feed data warehouses built on different technologies (like Redshift). The ETL itself could also load the data straight into Redshift.
* Ideally, this data should be updated annually, mainly because immigration data can vary significantly within one year. However, not all of the data sources used provide an annual update. In this context, the data should be refreshed whenever new data becomes available, and at least once a year. Data analysts can contribute to this decision as well.

* How the approach would change under the following scenarios:
 * The data was increased by 100x:
     * All data processing should run on a Spark cluster. The cluster can be deployed with cloud tools like Amazon EMR.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * An ETL orchestrator (like Airflow) must be implemented. Airflow will manage the pipelines and provide details about ETL runs (DAG runs).
 * The database needed to be accessed by 100+ people.
     * The data should be served from a database that supports this number of connections. We could use Amazon Redshift (a managed service) or run Postgres on managed VMs, for example.