# Capstone Project

## Purpose/Scope
This project imports, cleans, and links data sets supplied by Udacity so that data scientists can analyze immigration in relation to city demographic data, with additional data on the airport or port of arrival. Spark is the analysis tool of choice because of its ability to handle big data (many millions of rows) well, especially when deployed to a cluster environment.

## Data

Datasets were provided by Udacity from other open source repositories. 
* Airport Code Table - Table of Airport Codes and the corresponding city & basic city metadata - Source: [Our Airports](http://ourairports.com/data/) via [DataHub](https://datahub.io/core/airport-codes#data)
* World Temperature Data - Data back to the 1700s for many world cities. - Source: [Berkeley Earth](http://berkeleyearth.org/) via [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
* US City Demographic Data - Demographics/Population data on most US Cities and CDP's from [US Census Bureau](https://www.census.gov/data/developers/about/terms-of-service.html) via [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/table/?dataChart=eyJxdWVyaWVzIjpbeyJjb25maWciOnsiZGF0YXNldCI6InVzLWNpdGllcy1kZW1vZ3JhcGhpY3MiLCJvcHRpb25zIjp7fX0sImNoYXJ0cyI6W3siYWxpZ25Nb250aCI6dHJ1ZSwidHlwZSI6ImNvbHVtbiIsImZ1bmMiOiJBVkciLCJ5QXhpcyI6Im1lZGlhbl9hZ2UiLCJzY2llbnRpZmljRGlzcGxheSI6dHJ1ZSwiY29sb3IiOiIjRkY1MTVBIn1dLCJ4QXhpcyI6ImNpdHkiLCJtYXhwb2ludHMiOjUwLCJzb3J0IjoiIn1dLCJ0aW1lc2NhbGUiOiIiLCJkaXNwbGF5TGVnZW5kIjp0cnVlLCJhbGlnbk1vbnRoIjp0cnVlfQ%3D%3D)
* I94 Immigration Data - Data on immigration/visitation in April 2016 from the Department of Commerce. Includes some demographic data on each visitor. The original data set is no longer available online, but Udacity provides it as an archive in Parquet format. - Current program site source: [I-94 Arrivals Program](https://www.trade.gov/i-94-arrivals-program)


## Setup
Import requirements

In [2]:
import os  # needed if the AWS credential lines below are uncommented
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DecimalType, TimestampType
from datetime import date, datetime, timedelta

Set the output location and AWS credentials (alternatively, set them in a config file). For testing and cost savings, files are currently written to a local my_local_data folder; commenting out the local path and uncommenting the S3 lines sends the output to an Amazon S3 bucket instead.

In [3]:
output_location = 'my_local_data/'
#output_location = 's3a://AWSBUCKETNAME/AWSKEYNAME'
#os.environ["AWS_ACCESS_KEY_ID"]= 'YOURKEY'
#os.environ["AWS_SECRET_ACCESS_KEY"] = 'YOURSECRET'
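The config-file alternative mentioned above could look roughly like the sketch below. It uses Python's standard `configparser`; the section names (`AWS`, `OUTPUT`), the `LOCATION` key, and the inline example text are assumptions for illustration, not part of the original project.

```python
import configparser
import os

# Hypothetical config layout; the section and key names are assumptions.
EXAMPLE_CONFIG = """
[AWS]
AWS_ACCESS_KEY_ID = YOURKEY
AWS_SECRET_ACCESS_KEY = YOURSECRET

[OUTPUT]
LOCATION = my_local_data/
"""


def load_settings(config_text):
    """Parse config text, export the AWS keys, and return the output location."""
    config = configparser.ConfigParser()
    config.read_string(config_text)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        os.environ[key] = config["AWS"][key]
    return config["OUTPUT"]["LOCATION"]


output_location_from_config = load_settings(EXAMPLE_CONFIG)
print(output_location_from_config)  # my_local_data/
```

In practice the text would come from a file kept out of version control (for example via `config.read(path)`), so that keys never appear in the notebook itself.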

Build the initial Spark session

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

Set up a date function to convert SAS dates to regular dates
(a SAS date is the number of days since January 1, 1960)

In [5]:
@udf(TimestampType())
def sas_date(mydate):
    # SAS stores dates as a day count from 1960-01-01; nulls pass through unchanged.
    if mydate is None:
        return None
    return datetime(1960, 1, 1) + timedelta(days=int(mydate))
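As a quick sanity check of the conversion, the same arithmetic can be run in plain Python without Spark. This is a minimal sketch; the function name `sas_days_to_datetime` is introduced here for illustration, and the inputs are `arrdate`/`depdate` values that appear in the i94 preview in this notebook.

```python
from datetime import datetime, timedelta

SAS_EPOCH = datetime(1960, 1, 1)  # SAS day 0


def sas_days_to_datetime(sas_days):
    """Convert a SAS day count to a datetime; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_days_to_datetime(20574.0))  # 2016-04-30 00:00:00
print(sas_days_to_datetime(20582.0))  # 2016-05-08 00:00:00
print(sas_days_to_datetime(None))     # None
```

Because the SAS value carries no time of day, every converted timestamp falls on midnight.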

## Read In Data to Spark

Import i94 data and preview

In [6]:
i94data =spark.read.parquet("./sas_data")

In [7]:
i94data_revised = i94data.withColumn("departure_date",sas_date(col("depdate")))
i94data_revised = i94data_revised.withColumn("arrival_date",sas_date(col("arrdate")))


In [8]:
i94data_revised.limit(5).toPandas().head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,departure_date,arrival_date
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,1976.0,10292016,F,,QF,94953870000.0,11,B1,2016-05-08,2016-04-30
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,1984.0,10292016,F,,VA,94955620000.0,7,B1,2016-05-17,2016-04-30
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,1987.0,10292016,M,,DL,94956410000.0,40,B1,2016-05-08,2016-04-30
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,1987.0,10292016,F,,DL,94956450000.0,40,B1,2016-05-14,2016-04-30
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,1988.0,10292016,M,,DL,94956390000.0,40,B1,2016-05-14,2016-04-30


Import the city demographics, drop the race columns (they create duplicate records per city and are not needed for the current scope), and preview.

In [9]:
us_cities=spark.read.csv("./us-cities-demographics.csv", sep=';', header=True)

In [10]:
us_cities = us_cities.select('City',
 'State',
 'Median Age',
 'Male Population',
 'Female Population',
 'Total Population',
 'Number of Veterans',
 'Foreign-born',
 'Average Household Size',
 'State Code').dropDuplicates().orderBy("state","city")

In [11]:
us_cities.limit(5).toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code
0,Birmingham,Alabama,35.6,102122,112789,214911,13212,8258,2.21,AL
1,Dothan,Alabama,38.9,32172,35364,67536,6334,1699,2.59,AL
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL
3,Huntsville,Alabama,38.1,91764,97350,189114,16637,12691,2.18,AL
4,Mobile,Alabama,38.0,91275,103030,194305,11939,7234,2.4,AL


Import Airport data, fix state column (from US-AL to AL) and preview.

In [12]:
us_airports=spark.read.csv("./airport-codes_csv.csv", sep=',', header=True)

In [13]:
us_airports = us_airports.withColumn('state', regexp_replace("iso_region","US-","")).drop('iso_region')

In [14]:
us_airports.limit(5).toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,municipality,gps_code,iata_code,local_code,coordinates,state
0,00A,heliport,Total Rf Heliport,11,,US,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",PA
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,Leoti,00AA,,00AA,"-101.473911, 38.704022",KS
2,00AK,small_airport,Lowell Field,450,,US,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",AK
3,00AL,small_airport,Epps Airpark,820,,US,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",AL
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,Newport,,,,"-91.254898, 35.6087",AR


Import SAS Data Label file. 

The source of this function is Udacity mentor Anshul R, who posted it on the mentor help site (https://knowledge.udacity.com/questions/125439) for Udacity student use.
Decoding this file's irregular format was not in the scope of the course, so I used the provided code with minimal modifications to bring the data into proper Spark dataframes.

Preview the new dataframes.

In [15]:


schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
])

# Use a handle name other than `f`, which is already the alias for pyspark.sql.functions.
with open('./I94_SAS_Labels_Descriptions.SAS') as label_file:
    f_content = label_file.read()
    f_content = f_content.replace('\t', '')

def code_mapper(file, idx):
    # Take the text from the label name `idx` up to the next ';' and split it into lines.
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    # Keep only the lines that split cleanly into a code and a label around '='.
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([str(i[0].strip()), i[1].strip()] for i in dic if len(i) == 2)
    return dic

i94cit_res = spark.createDataFrame(list(code_mapper(f_content, "i94cntyl").items()), schema)
i94port = spark.createDataFrame(list(code_mapper(f_content, "i94prtl").items()), schema)
i94mode = spark.createDataFrame(list(code_mapper(f_content, "i94model").items()), schema)
i94addr = spark.createDataFrame(list(code_mapper(f_content, "i94addrl").items()), schema)
i94visa = spark.createDataFrame(list(({'1': 'Business',
                                       '2': 'Pleasure',
                                       '3': 'Student'}).items()), schema)
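To make the parsing logic easier to follow, here is a minimal standalone sketch that applies the same steps to a small inline string. The sample text is an assumption about what one block of the label file looks like (it is not copied from the file), and `parse_label_block` is a name introduced here for illustration.

```python
# Assumed shape of one label block; the real file may differ in detail.
SAMPLE_LABELS = """
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
"""


def parse_label_block(text, block_name):
    """Return {code: label} for the block that starts at `block_name`."""
    text = text.replace("\t", "")
    block = text[text.index(block_name):]   # from the block name onward
    block = block[:block.index(";")]        # up to the terminating ';'
    lines = [line.replace("'", "") for line in block.split("\n")]
    pairs = [line.split("=") for line in lines[1:]]  # skip the name line
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}


mode_labels = parse_label_block(SAMPLE_LABELS, "i94model")
print(mode_labels)
# {'1': 'Air', '2': 'Sea', '3': 'Land', '9': 'Not reported'}
```

Lines that do not contain exactly one `=` are silently skipped, which is how the approach tolerates headers and blank lines inside a block.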

Because the i94 table does not follow IATA naming, we use the port table (derived from the SAS label file above) to parse out the city and state. These can then be mapped with semi-reasonable accuracy to the airport table, provided we also filter on the mode table to keep air travel only.

In [16]:
i94port_revised = i94port.withColumn('City',split(i94port.Name,', ').getItem(0)).\
                                    withColumn('State',split(i94port.Name,', ').getItem(1)).\
                                    drop("Name")
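The split on `', '` can be illustrated in plain Python. This is a sketch only: `split_port_name` is a hypothetical helper, the `strip()` calls are an addition that the Spark version above does not perform, and the last input is a made-up value showing what happens when a name has no comma.

```python
def split_port_name(name):
    """Split 'CITY, ST' into (city, state); state is None without a ', '."""
    parts = name.split(", ")
    city = parts[0].strip()
    state = parts[1].strip() if len(parts) > 1 else None
    return city, state


print(split_port_name("ANCHORAGE, AK"))                 # ('ANCHORAGE', 'AK')
print(split_port_name("BAKER AAF - BAKER ISLAND, AK"))  # ('BAKER AAF - BAKER ISLAND', 'AK')
print(split_port_name("NO COMMA HERE"))                 # ('NO COMMA HERE', None)
```

Port names without a comma end up with an empty state, so those rows cannot be matched to the airport table by city and state.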

In [17]:
i94cit_res.limit(5).toPandas().head()

Unnamed: 0,ID,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [18]:
i94mode.limit(5).toPandas().head()

Unnamed: 0,ID,Name
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [19]:
i94addr.limit(5).toPandas().head()

Unnamed: 0,ID,Name
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [20]:
i94visa.limit(5).toPandas().head()

Unnamed: 0,ID,Name
0,1,Business
1,2,Pleasure
2,3,Student


## Data Model

### Concept

The model is made up of the previously defined dataframes:

|Name|Data Type|Description|Assumptions|
|---|----|---|----|
|i94data|Fact Data|Travel/Immigration Data from USNTTO|cicid is a unique identifier per person|
|i94visa|Dimension Data|Visa Type| |
|i94port|Dimension Data|Port of Entry| |
|i94mode|Dimension Data|Type of Travel Entry| |
|i94cit_res|Dimension Data|Origin/Country of Residence| |
|us_airports|Dimension Data|Listing of US Airports| |
|us_cities|Dimension Data|Listing of US Cities| |

<!--|us_temp|Fact Data|Temperature by Month by City|-->
<!--|i94addr|Dimension Data|State/Location of Destination|-->

Data will be loaded from these staging dataframes into a proper star schema. Because the data set has only two updating tables (immigration and, less frequently, person), the primary fact table is made up of actual immigration events, with dimension tables for airport, time, country, mode of transportation, i94 port names, and visa type.

<img src="dbSchema.png" size = '30%'>
<i><div style="text-align: right">  Graphic built with https://dbdiagram.io/d </div></i>

In a traditional ETL, we would perhaps be joining new data on some regular basis, which would require modifying the ETL process below to import only new data. As it stands, this is a static data set, so a single insert is implied.
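If the load did become incremental, one common pattern is to keep a "last loaded" date and import only rows that arrived after it. The sketch below shows the idea in plain Python on made-up rows; `select_new_rows` and the sample records are hypothetical and are not part of this project's pipeline.

```python
from datetime import date


def select_new_rows(rows, last_loaded):
    """Keep only rows whose arrival_date is later than the last loaded date."""
    return [row for row in rows if row["arrival_date"] > last_loaded]


# Hypothetical records standing in for new immigration data.
sample_rows = [
    {"cicid": 1, "arrival_date": date(2016, 4, 29)},
    {"cicid": 2, "arrival_date": date(2016, 4, 30)},
    {"cicid": 3, "arrival_date": date(2016, 5, 1)},
]

new_rows = select_new_rows(sample_rows, last_loaded=date(2016, 4, 30))
print([row["cicid"] for row in new_rows])  # [3]
```

In Spark the equivalent step would likely be a `filter` on `arrival_date`, combined with writing in `append` mode rather than `overwrite`.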


### Define Tables

In [22]:
person_table = i94data_revised.selectExpr("cicid as personID", "i94res as country", \
                                  "i94bir as age", "gender","admnum as admissionId", "occup as occupation")

In [23]:
time_table = i94data_revised.selectExpr("arrival_date as arrivalDate", "year(arrival_date) as year", \
                                        "month(arrival_date) as month", "day(arrival_date) as day", \
                                        "hour(arrival_date) as hour", "weekofyear(arrival_date) as week", \
                                        "dayofweek(arrival_date) as weekDay").dropDuplicates()

In [24]:
country_table = i94cit_res.selectExpr("ID as countryID", "Name")

In [25]:
# Keep only US airports whose type is not one of the excluded categories.
excluded_types = ["small_airport", "heliport", "closed", "balloonport", "seaplane_base"]
airport_table = us_airports.filter(~col("type").isin(excluded_types)).\
                        filter(col("iso_country") == "US").\
                        selectExpr("ident as airportID", "name as name", \
                                   "municipality as city", "state", "iata_code as iataCode")

In [26]:
visa_table = i94visa.selectExpr("ID as visaID", "Name")

In [27]:
port_table = i94port_revised.selectExpr("ID as portID", "City", "State")

In [28]:
mode_table = i94mode.selectExpr("ID as modeID", "Name")

In [29]:
immigration_fact_table = i94data_revised.selectExpr("cicid as personID", "departure_date", "arrival_date", \
                           "i94visa as visaTypeID", "i94port as portID", \
                           "airline as airlineName", "fltno as flightNumber", "admnum as admissionNumber",
                           "i94mode as modeID")

### Check Tables

In [30]:
visa_table.limit(5).toPandas().head()

Unnamed: 0,visaID,Name
0,1,Business
1,2,Pleasure
2,3,Student


In [31]:
mode_table.limit(5).toPandas().head()

Unnamed: 0,modeID,Name
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [32]:
airport_table.limit(5).toPandas().head()

Unnamed: 0,airportID,name,city,state,iataCode
0,5A8,Aleknagik / New Airport,Aleknagik,AK,WKK
1,K79J,South Alabama Regional At Bill Benton Field Ai...,Andalusia/Opp,AL,
2,KABE,Lehigh Valley International Airport,Allentown,PA,ABE
3,KABI,Abilene Regional Airport,Abilene,TX,ABI
4,KABQ,Albuquerque International Sunport,Albuquerque,NM,ABQ


In [33]:
country_table.limit(5).toPandas().head()

Unnamed: 0,countryID,Name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [34]:
time_table.limit(5).toPandas().head()

Unnamed: 0,arrivalDate,year,month,day,hour,week,weekDay
0,2016-04-07,2016,4,7,0,14,5
1,2016-04-05,2016,4,5,0,14,3
2,2016-04-20,2016,4,20,0,16,4
3,2016-04-27,2016,4,27,0,17,4
4,2016-04-25,2016,4,25,0,17,2
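The `week` and `weekDay` values in the preview above can be cross-checked in plain Python. Spark's `weekofyear` follows ISO 8601 week numbering and `dayofweek` counts from Sunday = 1 to Saturday = 7; the sketch below reproduces both with the standard library. The helper name `time_parts` is introduced here for illustration.

```python
from datetime import date


def time_parts(d):
    """Derive the time-table columns (except hour) from a date."""
    _, iso_week, iso_weekday = d.isocalendar()  # ISO weekday: Monday=1 .. Sunday=7
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,
        "weekDay": iso_weekday % 7 + 1,  # shift to Sunday=1 .. Saturday=7
    }


print(time_parts(date(2016, 4, 7)))
# {'year': 2016, 'month': 4, 'day': 7, 'week': 14, 'weekDay': 5}
```

The `hour` column is always 0 in this data because the SAS source values are whole-day counts with no time component.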


In [35]:
person_table.limit(5).toPandas().head()

Unnamed: 0,personID,country,age,gender,admissionId,occupation
0,5748517.0,438.0,40.0,F,94953870000.0,
1,5748518.0,438.0,32.0,F,94955620000.0,
2,5748519.0,438.0,29.0,M,94956410000.0,
3,5748520.0,438.0,29.0,F,94956450000.0,
4,5748521.0,438.0,28.0,M,94956390000.0,


In [36]:
port_table.limit(5).toPandas().head()

Unnamed: 0,portID,City,State
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [37]:
immigration_fact_table.limit(5).toPandas().head()

Unnamed: 0,personID,departure_date,arrival_date,visaTypeID,portID,airlineName,flightNumber,admissionNumber,modeID
0,5748517.0,2016-05-08,2016-04-30,1.0,LOS,QF,11,94953870000.0,1.0
1,5748518.0,2016-05-17,2016-04-30,1.0,LOS,VA,7,94955620000.0,1.0
2,5748519.0,2016-05-08,2016-04-30,1.0,LOS,DL,40,94956410000.0,1.0
3,5748520.0,2016-05-14,2016-04-30,1.0,LOS,DL,40,94956450000.0,1.0
4,5748521.0,2016-05-14,2016-04-30,1.0,LOS,DL,40,94956390000.0,1.0


## Load Parquet files in for processing (Write and Read)

Define the tables to write out as Parquet. Count the records going out, then read the data back in and count again to confirm that nothing was lost.

In [40]:
#This function provided on Stack Overflow to get a dataframe name when the object itself is passed
# as an argument. Written/provided by cors https://stackoverflow.com/a/54138398

def get_df_name(df):
    name =[x for x in globals() if globals()[x] is df][0]
    return name
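One caveat with the identity lookup above: when two names are bound to the same object, it returns whichever name was defined first. A loop variable such as `table` is itself a second global name for the current dataframe, so the lookup depends on definition order. The sketch below demonstrates this with placeholder objects and shows an explicit name-to-table mapping as one possible alternative; all names in it are hypothetical.

```python
def first_name_bound_to(obj, namespace):
    """Mimic the globals() lookup: first name in `namespace` bound to `obj`."""
    return [name for name, value in namespace.items() if value is obj][0]


placeholder = object()  # stands in for a Spark DataFrame
namespace = {"scratch": placeholder, "port_table": placeholder}  # two names, one object
print(first_name_bound_to(placeholder, namespace))  # scratch

# An explicit mapping keeps the intended name next to each table.
named_tables = {"port_table": placeholder}
for name, tbl in named_tables.items():
    print(name)  # port_table
```

With a dictionary like `named_tables`, the write and read loops could iterate over `.items()` and would not need to inspect `globals()` at all.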


In [41]:
tables = [immigration_fact_table, port_table, time_table, \
          mode_table, country_table, visa_table, \
          airport_table, person_table]


In [42]:

for table in tables:
        fullloc = output_location + get_df_name(table)
        table.write.mode("overwrite").parquet(fullloc)
        print("Exporting {} records from {}".format(table.count(),fullloc))
    

Exporting 3096313 records from my_local_data/immigration_fact_table
Exporting 660 records from my_local_data/port_table
Exporting 30 records from my_local_data/time_table
Exporting 4 records from my_local_data/mode_table
Exporting 289 records from my_local_data/country_table
Exporting 3 records from my_local_data/visa_table
Exporting 862 records from my_local_data/airport_table
Exporting 3096313 records from my_local_data/person_table


In [43]:
for table in tables:
        fullloc = output_location + get_df_name(table)
        table = spark.read.parquet(fullloc)
        print("{} imported. {} records found".format(fullloc, table.count()))
    


my_local_data/immigration_fact_table imported. 3096313 records found
my_local_data/port_table imported. 660 records found
my_local_data/time_table imported. 30 records found
my_local_data/mode_table imported. 4 records found
my_local_data/country_table imported. 289 records found
my_local_data/visa_table imported. 3 records found
my_local_data/airport_table imported. 862 records found
my_local_data/person_table imported. 3096313 records found
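The two sets of counts are compared by eye here. A small helper could make the check automatic; the following is a minimal sketch in which `find_count_mismatches` is a hypothetical function, the first three counts are taken from the output above, and the altered value is made up to show a failure.

```python
def find_count_mismatches(written, read_back):
    """Return {table: (written, read)} for tables whose counts differ or are missing."""
    return {
        name: (count, read_back.get(name))
        for name, count in written.items()
        if read_back.get(name) != count
    }


written = {"immigration_fact_table": 3096313, "port_table": 660, "time_table": 30}
read_back = dict(written)
print(find_count_mismatches(written, read_back))  # {}

read_back["port_table"] = 659  # made-up discrepancy for illustration
print(find_count_mismatches(written, read_back))  # {'port_table': (660, 659)}
```

An empty result means every table round-tripped with the same number of records.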


### Create Data Dictionary

In [71]:
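# Write each table's name and schema tree to a plain-text data dictionary.
# The `with` block closes the file even if a write fails.
with open("dictionary.txt", "w") as dictionary_file:
    for table in tables:
        dictionary_file.write(get_df_name(table) + '\n')
        dictionary_file.write(table._jdf.schema().treeString() + '\n')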

## Conclusion

The project required defining and running an ETL over data with more than 1 million rows. Data was imported, scrubbed for known bad types of data, transformed to deal with incomplete or poor naming, brought into new dataframes, and finally written to the output location (local or AWS). From here, the files can be read back into Spark and joined or queried as necessary. A common query would be to join the airport, port, immigration, and person tables to get the proper names of the airports the visitors arrived at.
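The shape of such a join can be sketched with Python's built-in `sqlite3`, used here only so the example runs without a cluster. The table and column names follow the tables defined in this notebook, but the column types and every inserted row are illustrative assumptions. Note that the port table stores city names in upper case while the airport table does not, so the join normalizes case.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration_fact_table (personID REAL, portID TEXT, modeID REAL);
CREATE TABLE person_table (personID REAL, gender TEXT, age REAL);
CREATE TABLE port_table (portID TEXT, City TEXT, State TEXT);
CREATE TABLE airport_table (airportID TEXT, name TEXT, city TEXT, state TEXT);

-- Illustrative rows only; not taken from the real data.
INSERT INTO immigration_fact_table VALUES (1.0, 'ANC', 1.0);
INSERT INTO person_table VALUES (1.0, 'F', 40.0);
INSERT INTO port_table VALUES ('ANC', 'ANCHORAGE', 'AK');
INSERT INTO airport_table VALUES
    ('PANC', 'Ted Stevens Anchorage International Airport', 'Anchorage', 'AK');
""")

query = """
SELECT p.personID, p.gender, a.name
FROM immigration_fact_table AS i
JOIN person_table AS p ON p.personID = i.personID
JOIN port_table AS pt ON pt.portID = i.portID
JOIN airport_table AS a ON UPPER(a.city) = pt.City AND a.state = pt.State
WHERE i.modeID = 1  -- air travel only
"""
joined_rows = conn.execute(query).fetchall()
print(joined_rows)
# [(1.0, 'F', 'Ted Stevens Anchorage International Airport')]
```

A city with several qualifying airports would produce one row per airport, which is part of why the city/state mapping is only approximately accurate.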

As this was a large data set, Spark was a clear choice for dealing with the number of records. Because the output is stored as Parquet files, the job could also be scaled out easily across multiple instances on AWS clusters.

This ETL should only be run once in its current form because the data sources are static. However, if the sources were updated regularly, running it once a month would likely suffice for new immigration data.

### Scenarios
* Data increased by 100x? Because this ETL process runs on Spark, I do not foresee any issues. The longest step is the read and write of Parquet data, which takes only a few minutes at the current size. Multiplying the data by 100 would require the job to run for several hours, but it could likely be broken down into two or more ETLs that run simultaneously.
* Pipeline run at 7 AM? This ETL only takes a few minutes to run, so this would not be a challenge as currently written. The only changes necessary would be accounting for new file names and potentially exporting files with a date component in the Parquet name.
* 100+ people need access to the data? Additional EMR clusters would need to be spun up to handle the additional load, but no other changes would need to be made.
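For the scheduled-run scenario, the date component in the Parquet name could be built as in the sketch below. The helper `dated_output_path` and the `run_date=` directory naming are assumptions chosen for illustration; any consistent naming scheme would work.

```python
from datetime import date


def dated_output_path(base, table_name, run_date):
    """Build an output path that includes the run date as a sub-directory."""
    return "{}{}/run_date={}".format(base, table_name, run_date.isoformat())


print(dated_output_path("my_local_data/", "port_table", date(2016, 4, 30)))
# my_local_data/port_table/run_date=2016-04-30
```

Writing each run to its own dated directory keeps earlier outputs intact, so a daily job would not overwrite the previous day's files.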