# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse with Spark. Internal customers and users can analyze the immigration data to generate monthly immigration reports or other business analyses. The main data source is the I94 immigration data from the US National Tourism and Trade Office. The project also integrates weather data, city demographic data and airport codes from other data sources.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [60]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
import boto3
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, LongType, FloatType, StringType, DateType
from pyspark.sql.window import Window

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [2]:
# Spark session without S3 or SAS support
spark = SparkSession.builder \
    .appName("ImmigrationDataLake") \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
This project extracts data from four data sources, then transforms and loads it into a star schema with Spark. The end result is one fact table of immigration records and four dimensional tables: persons, cities, dates and transports. Only Apache Spark is used and the tables are saved in Parquet format, so no database is needed.

#### Describe and Gather Data 
The data sources include I94 immigration data, world temperature data, U.S. city demographic data and airport codes.
The I94 immigration data comes from the US National Tourism and Trade Office, which keeps track of arrivals to and departures from the United States by people who are not US citizens or lawful permanent residents.
The world temperature data is from Kaggle.

#### Read in the immigration sample data

In [6]:
im_df = pd.read_csv('immigration_data_sample.csv', index_col=0)

In [7]:
im_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


#### Read in Weather data

In [15]:
weather_df = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")

In [9]:
weather_df.shape

(8599212, 7)

In [10]:
weather_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### Read in city demographics

In [11]:
demo_df = pd.read_csv("./us-cities-demographics.csv", sep=";")
print(demo_df.shape)
demo_df.head()

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### i94 immigration data cleaning
- First, columns are cast to the correct data types. All numeric columns are read in as double.
- Columns with almost no values, such as occup, entdepu and insnum, are dropped.
- SAS dates are converted to date format.

In [3]:
imm_df = spark.read.parquet('./sas_data/{}/{}'.format("i94yr=2016", "i94mon=4"))
imm_df.limit(5).toPandas()

Unnamed: 0,cicid,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,32.0,1.0,1.0,20160430,SYD,,G,O,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,29.0,1.0,1.0,20160430,SYD,,G,O,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,28.0,1.0,1.0,20160430,SYD,,G,O,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [4]:
imm_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [6]:
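@F.udf(DateType())
def sasToDate(x):
    # SAS dates are day counts starting at 1960-01-01
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # null, NaN or out-of-range input
        return None

@F.udf(DateType())
def strToDate(x):
    # dtaddto is stored as an MMDDYYYY string, e.g. '10292016'
    try:
        return datetime.strptime(x, '%m%d%Y').date()
    except (TypeError, ValueError):
        # null or unparseable input
        return None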
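
The date handling can be checked outside Spark. The following is a minimal plain-Python sketch of the same two conversions; the function names `sas_to_date` and `mmddyyyy_to_date` are illustrative and not part of the notebook. It assumes SAS dates are day counts from 1960-01-01, which agrees with the sample rows (`arrdate` 20574.0 is shown as 2016-04-30 after conversion).

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day zero for SAS date values


def sas_to_date(days):
    """Convert a SAS day count such as 20574.0 to a date, or None if unusable."""
    try:
        return SAS_EPOCH + timedelta(days=int(days))
    except (TypeError, ValueError, OverflowError):
        # None, NaN, infinity or a value outside the supported date range
        return None


def mmddyyyy_to_date(text):
    """Parse an MMDDYYYY string such as '10292016', or return None."""
    try:
        return datetime.strptime(text, "%m%d%Y").date()
    except (TypeError, ValueError):
        # None or a string that is not a valid MMDDYYYY date
        return None


print(sas_to_date(20574.0))          # arrdate from the first sample row
print(mmddyyyy_to_date("10292016"))  # dtaddto from the first sample row
print(mmddyyyy_to_date("not a date"))
```

Returning `None` for bad input keeps the row in the table with a null date instead of failing the whole job.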

In [7]:
# select relevant columns and cast them to the correct data types
# SAS day counts and MMDDYYYY strings are converted to dates

imm_df = imm_df.select(F.col('cicid').cast(LongType()),
              F.col('i94cit').cast(IntegerType()),
              F.col('i94res').cast(IntegerType()),
              F.col('i94port'),
              F.col('i94addr'),
              sasToDate('arrdate').alias('arrdate'),
              sasToDate('depdate').alias('depdate'),
              strToDate('dtaddto').alias('dtaddto'),
              F.col('biryear').cast(IntegerType()),
              F.col('i94bir').cast(IntegerType()),
              F.col('gender'),
              F.col('i94visa').cast(IntegerType()),
              F.col('visatype'),
              F.col('i94mode').cast(IntegerType()),
              F.col('airline'),
              F.col('fltno'),
              F.col('entdepa'),
              F.col('entdepd'),
              F.col('matflag'),
              F.col('admnum').cast(LongType()),
              F.col('count').cast(IntegerType())
             )

In [8]:
imm_df.limit(5).toPandas()

Unnamed: 0,cicid,i94cit,i94res,i94port,i94addr,arrdate,depdate,dtaddto,biryear,i94bir,gender,i94visa,visatype,i94mode,airline,fltno,entdepa,entdepd,matflag,admnum,count
0,5748517,245,438,LOS,CA,2016-04-30,2016-05-08,2016-10-29,1976,40,F,1,B1,1,QF,11,G,O,M,94953870030,1
1,5748518,245,438,LOS,NV,2016-04-30,2016-05-17,2016-10-29,1984,32,F,1,B1,1,VA,7,G,O,M,94955622830,1
2,5748519,245,438,LOS,WA,2016-04-30,2016-05-08,2016-10-29,1987,29,M,1,B1,1,DL,40,G,O,M,94956406530,1
3,5748520,245,438,LOS,WA,2016-04-30,2016-05-14,2016-10-29,1987,29,F,1,B1,1,DL,40,G,O,M,94956451430,1
4,5748521,245,438,LOS,WA,2016-04-30,2016-05-14,2016-10-29,1988,28,M,1,B1,1,DL,40,G,O,M,94956388130,1


#### Weather data cleaning
Rows with a null temperature are dropped. Only records from US cities after the year 2000 are kept.

In [38]:
weather_df = spark.read.csv('/data2/GlobalLandTemperaturesByCity.csv', inferSchema=True, header=True)

weather_df = weather_df.filter("AverageTemperature is not null")
weather_df = weather_df.withColumn("year", F.year('dt'))
weather_df = weather_df.withColumn("month", F.month('dt'))

weather_df = weather_df.filter("year>2000 and Country='United States'")
weather_df.count()

39320

In [39]:
weather_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month
0,2001-01-01,4.465,0.286,Abilene,United States,32.95N,100.53W,2001,1
1,2001-02-01,8.142,0.331,Abilene,United States,32.95N,100.53W,2001,2
2,2001-03-01,9.951,0.298,Abilene,United States,32.95N,100.53W,2001,3
3,2001-04-01,18.488,0.429,Abilene,United States,32.95N,100.53W,2001,4
4,2001-05-01,23.189,0.181,Abilene,United States,32.95N,100.53W,2001,5


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
There is one fact table, imm_facts, and four dimensional tables. The fact table includes most of the items from the I94 immigration records plus the keys that link to the dimensional tables. The dimensional tables are persons, cities, dates and transports. The fact table and the persons table can be joined to analyze the composition of visitors to the US, their age distribution and their visa types. The fact table and the cities table can be used to find the most popular ports of entry and how they relate to a city's population and temperature. Joining the fact table with the dates table shows which months are most popular for visiting the US and how arrivals trend over time. The transports table and the fact table together help to analyze the most popular airlines.
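
As a rough illustration of how the fact table and the dates dimension combine, the sketch below counts arrivals per month in plain Python. The three fact rows are a hand-made miniature (their values are borrowed from the sample outputs in this notebook), and the tuple layout is an assumption made for the example, not the stored schema.

```python
from collections import Counter
from datetime import date

# Hand-made miniature of imm_facts: (cicid, arrival_date, count)
fact_rows = [
    (5748517, date(2016, 4, 30), 1),
    (5748518, date(2016, 4, 30), 1),
    (985523, date(2016, 4, 6), 1),
]

# Miniature dates dimension keyed by date_key
dates_dim = {
    arrival: {"year": arrival.year, "month": arrival.month, "day": arrival.day}
    for _cicid, arrival, _count in fact_rows
}

# "Join" each fact row to its date row, then sum count per (year, month)
arrivals_per_month = Counter()
for _cicid, arrival, count in fact_rows:
    dim = dates_dim[arrival]
    arrivals_per_month[(dim["year"], dim["month"])] += count

print(dict(arrivals_per_month))
```

In Spark the same report would be a join on `arrival_date == date_key` followed by a `groupBy` on year and month.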

### Fact table: 
#### imm_facts
    cicid primary key
    person_key
    city_key
    transport_key
    arrival_date
    departure_date
    allowed_date
    arrival_flag
    depart_flag
    match_flag
    count
    
    
### Dimensional tables
#### persons
    person_key
    admnum
    birth_year
    gender
    age
    citizenship
    residence
    visa_category
    visa_type
    
#### cities
    city_key
    city
    state
    latitude
    longitude
    population
    foreign_born
    avg_temp

#### dates
    date_key
    year
    month
    day
    
#### transports
    transport_key
    type
    airline
    flight_no

#### 3.2 Mapping Out Data Pipelines
This project does not use S3 or a Redshift database. All data is processed in this workspace. The pipeline reads in the source tables, joins them and creates the fact and dimensional tables.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create dimension table persons

In [11]:
# read in i94cit_i94res_codes.csv which is manually created from SAS description file 
i94cit_i94res = spark.read.csv("./i94_codes/i94cit_i94res_codes.csv", sep='|', inferSchema=True, header=True)

In [12]:
i94cit_i94res.limit(5).toPandas()

Unnamed: 0,code,i94ctry
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [13]:
persons_df = imm_df.select(F.col("admnum").alias('person_key'),
                         F.col("admnum"),
                         F.col("biryear").alias("birth_year"),
                         F.col("gender"),
                         F.col("i94bir").alias("age"),
                         F.col("i94cit"),
                         F.col("i94res"),
                         F.col("i94visa").alias("visa_category"),
                         F.col("visatype").alias("visa_type"))

In [14]:
persons_df = persons_df.dropDuplicates()

In [32]:
# join country code, replace code of i94cit column with country names
persons_df = persons_df.join(i94cit_i94res, persons_df['i94cit']==i94cit_i94res['code'], 'left_outer') \
    .select(persons_df["*"], i94cit_i94res["i94ctry"].alias('citizenship')).drop('i94cit')

In [34]:
# join country code, replace code of i94res column with country name 
persons_df = persons_df.join(i94cit_i94res, persons_df['i94res']==i94cit_i94res['code'], 'left_outer') \
    .select(persons_df['*'], i94cit_i94res["i94ctry"].alias('residence')).drop('i94res')

In [35]:
persons_df.limit(5).toPandas()

Unnamed: 0,person_key,admnum,birth_year,gender,age,visa_category,visa_type,citizenship,residence
0,94953870030,94953870030,1976,F,40,1,B1,"CHINA, PRC",AUSTRALIA
1,94955622830,94955622830,1984,F,32,1,B1,"CHINA, PRC",AUSTRALIA
2,94956406530,94956406530,1987,M,29,1,B1,"CHINA, PRC",AUSTRALIA
3,94956451430,94956451430,1987,F,29,1,B1,"CHINA, PRC",AUSTRALIA
4,94956388130,94956388130,1988,M,28,1,B1,"CHINA, PRC",AUSTRALIA


In [36]:
# save the dimensional table as parquet file
persons_df.write.parquet("persons.parquet")

#### Create dimensional table cities

- city_key
- city
- state
- latitude
- longitude
- population
- foreign_born
- avg_temp

In [28]:
# read in i94port_data.csv which is manually created
i94port_data = spark.read.csv("./i94_codes/i94port_data.csv", inferSchema=True, header=True, sep=",")

In [53]:
# extract i94port from i94 immigration table
cities = imm_df.select('i94port').dropDuplicates()

In [55]:
# join i94port_data to append city and state columns
cities = cities.join(i94port_data, cities['i94port']==i94port_data['code'], 'left_outer') \
    .select(cities['i94port'].alias('city_key'), i94port_data['city'], i94port_data['state_name'].alias('state'))

In [56]:
cities = cities.dropna()

In [34]:
cities.limit(5).toPandas()

Unnamed: 0,city_key,city,state
0,FMY,Fort Myers,Florida
1,BGM,Bangor,Maine
2,HEL,Helena,Montana
3,DNS,Dunseith,North Dakota
4,MOR,Franklin,Vermont


In [64]:
# join weather information for cities
weather_avg = weather_df.selectExpr("City as city", "dt", "AverageTemperature as avg_temp", "Latitude as latitude", "Longitude as longitude") \
    .groupBy("city", "latitude", "longitude").agg(F.avg("avg_temp").alias("avg_temp"))

In [65]:
weather_avg = weather_avg.dropDuplicates()

In [66]:
win = Window().partitionBy('city').orderBy('avg_temp')
weather_avg = weather_avg.withColumn('nx', F.row_number().over(win)).where("nx=1").drop("nx")
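
The aggregation and window steps above can be read as: average the temperature per (city, latitude, longitude), then, where one city name maps to several locations, keep the location with the lowest average. Below is a plain-Python sketch of that logic under made-up readings; the city name, coordinates and temperatures are invented for illustration only.

```python
from collections import defaultdict
from statistics import mean

# Invented readings: (city, latitude, longitude, temperature)
readings = [
    ("Springfield", "10.00N", "20.00W", 10.0),
    ("Springfield", "10.00N", "20.00W", 14.0),
    ("Springfield", "30.00N", "40.00W", 20.0),
    ("Springfield", "30.00N", "40.00W", 22.0),
]

# Step 1: average temperature per (city, latitude, longitude)
groups = defaultdict(list)
for city, lat, lon, temp in readings:
    groups[(city, lat, lon)].append(temp)
averages = {key: mean(temps) for key, temps in groups.items()}

# Step 2: per city name, keep the location with the lowest average,
# mirroring row_number() over orderBy('avg_temp') with nx = 1
coldest = {}
for (city, lat, lon), avg in averages.items():
    if city not in coldest or avg < coldest[city][2]:
        coldest[city] = (lat, lon, avg)

print(coldest)
```

This guarantees one weather row per city name, but the row that is kept is not necessarily the one that matches the port's state, because the weather data carries no state column.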

In [67]:
cities = cities.join(weather_avg, cities['city']==weather_avg['city'], "left") \
    .select(cities['*'], weather_avg['latitude'], weather_avg['longitude'], weather_avg['avg_temp'])

In [69]:
# Read in demographics and combine demographics into cities
demo_df = spark.read.csv("us-cities-demographics.csv", inferSchema=True, header=True, sep=";")

In [70]:
demo_df = demo_df.select("City", "State", "Total Population", "Foreign-born").dropDuplicates()

In [71]:
cities = cities.join(demo_df, [cities['city']==demo_df['City'], cities['state']==demo_df['State']], 'left_outer') \
    .select(cities["*"], demo_df["Total Population"].alias('population'), demo_df["Foreign-born"].alias('foreign_born'))

In [73]:
cities.limit(5).toPandas()

Unnamed: 0,city_key,city,state,latitude,longitude,avg_temp,population,foreign_born
0,FRB,Fairbanks,Alaska,,,,,
1,CHS,Charleston,West Virginia,32.95N,79.47W,19.56715,,
2,CHL,Charleston,South Carolina,32.95N,79.47W,19.56715,135524.0,5767.0
3,MDT,Harrisburg,Pennsylvania,,,,,
4,LUB,Lubec,Maine,,,,,


In [74]:
# write cities table into parquet file
cities.write.parquet("cities.parquet")

#### Dates

In [79]:
# extract and merge all dates
dates = imm_df.select(F.col('arrdate').alias('date_key')).dropDuplicates() \
    .union(imm_df.select(F.col('depdate').alias('date_key')).dropDuplicates()) \
    .union(imm_df.select(F.col('dtaddto').alias('date_key')).dropDuplicates())

In [80]:
dates = dates.orderBy('date_key').dropDuplicates().dropna()

In [81]:
# Spark's built-in date functions give the same result without Python UDFs
dates_df = dates.select(F.col('date_key'),
                        F.year('date_key').alias('year'),
                        F.month('date_key').alias('month'),
                        F.dayofmonth('date_key').alias('day'))

In [82]:
dates_df.limit(5).toPandas()

Unnamed: 0,date_key,year,month,day
0,2001-07-20,2001,7,20
1,2012-04-12,2012,4,12
2,2012-04-14,2012,4,14
3,2014-04-22,2014,4,22
4,2014-04-24,2014,4,24


In [83]:
dates_df.write.parquet('dates.parquet')

#### Create transport dimensional table

- transport_key
- type
- airline
- flight_no

In [18]:
transport_df = imm_df.select(F.md5(F.concat('i94mode', 'airline', 'fltno')).alias('transport_key') \
                             , F.col('i94mode').alias('type'), F.col('airline'), F.col('fltno').alias('flight_no')) \
                    .dropDuplicates()

In [20]:
transport_df = transport_df.dropna()

In [21]:
transport_df.write.parquet('transports.parquet')
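
Two properties of the key above are worth keeping in mind. Spark's `concat` returns null as soon as one input is null, so records without an airline or flight number get a null `transport_key` and are removed by `dropna()`. In addition, concatenating without a separator can map different inputs to the same string. The sketch below shows one possible alternative in plain Python; the helper name `transport_key` and the `|` separator are assumptions for illustration, and the keys it produces differ from those stored in `transports.parquet`.

```python
import hashlib


def transport_key(mode, airline, flight_no, sep="|"):
    """Build an MD5 key from the three parts, treating None as an empty string."""
    parts = ["" if part is None else str(part) for part in (mode, airline, flight_no)]
    return hashlib.md5(sep.join(parts).encode("utf-8")).hexdigest()


# Without a separator both tuples would concatenate to '1AA12'
print(transport_key(1, "AA", "12"))
print(transport_key(1, "AA1", "2"))

# A land crossing without an airline still receives a key
print(transport_key(3, None, "LAND"))
```

Spark's `concat_ws` is the closest built-in, although it skips null inputs rather than replacing them with empty strings.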

#### Create fact table

- cicid primary key
- person_key
- city_key
- transport_key
- arrival_date
- departure_date
- allowed_date
- arrival_flag
- depart_flag
- match_flag
- count

In [27]:
imm_facts = imm_df.select(F.col('cicid'), F.col('admnum').alias('person_key'), F.col('i94port').alias('city_key'), \
             F.md5(F.concat('i94mode', 'airline', 'fltno')).alias('transport_key'), \
             F.col('arrdate').alias('arrival_date'), \
             F.col('depdate').alias('departure_date'), \
             F.col('dtaddto').alias('allowed_date'), \
             F.col('entdepa').alias('arrival_flag'), \
             F.col('entdepd').alias('depart_flag'), \
             F.col('matflag').alias('match_flag'), \
             F.col('count'))

In [28]:
imm_facts.limit(5).toPandas()

Unnamed: 0,cicid,person_key,city_key,transport_key,arrival_date,departure_date,allowed_date,arrival_flag,depart_flag,match_flag,count
0,5748517,94953870030,LOS,5832ca7bd5357a042d71c466f8fb8d87,2016-04-30,2016-05-08,2016-10-29,G,O,M,1
1,5748518,94955622830,LOS,c08e32c52ad9483d23693388d93e1f83,2016-04-30,2016-05-17,2016-10-29,G,O,M,1
2,5748519,94956406530,LOS,dc57ffd9fcf2b8592b188771a4da696b,2016-04-30,2016-05-08,2016-10-29,G,O,M,1
3,5748520,94956451430,LOS,dc57ffd9fcf2b8592b188771a4da696b,2016-04-30,2016-05-14,2016-10-29,G,O,M,1
4,5748521,94956388130,LOS,dc57ffd9fcf2b8592b188771a4da696b,2016-04-30,2016-05-14,2016-10-29,G,O,M,1


In [29]:
imm_facts.write.parquet('imm_facts.parquet')

#### 4.2 Data Quality Checks
Quality checks include:
 * The primary key of every table is unique.
 * The row count of every table is not zero.
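
Both checks reduce to comparing two numbers per table: the row count and the distinct primary key count. Here is a small helper that captures that rule in plain Python; `check_table` is a hypothetical name, and in Spark its inputs would come from `df.count()` and `df.select(key).dropDuplicates().count()`.

```python
def check_table(name, row_count, distinct_key_count):
    """Return a list of problems for one table; an empty list means it passed."""
    problems = []
    if row_count == 0:
        problems.append("{}: table is empty".format(name))
    if distinct_key_count != row_count:
        duplicates = row_count - distinct_key_count
        problems.append(
            "{}: primary key is not unique ({} surplus rows)".format(name, duplicates)
        )
    return problems


# Row counts below are the ones printed in this notebook for tables
# whose key check passed; 'demo' uses invented numbers
print(check_table("imm_facts", 3096313, 3096313))
print(check_table("cities", 299, 299))
print(check_table("demo", 5, 4))
```

Returning a list rather than printing lets a pipeline collect the problems from every table and fail once at the end.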
 
Run Quality Checks

In [3]:
# fact table quality check
imm_facts = spark.read.parquet('imm_facts.parquet')

In [4]:
print(imm_facts.count())

3096313


In [5]:
if imm_facts.count() == imm_facts.select('cicid').dropDuplicates().count():
    print("primary key cicid is unique")
else:
    print("primary key cicid is not unique")

primary key cicid is unique


In [6]:
# dimensional table persons quality check
persons = spark.read.parquet('persons.parquet')
print(persons.count())

3096313


In [7]:
if persons.count() == persons.select('person_key').dropDuplicates().count():
    print("primary key person_key is unique")
else:
    print("primary key person_key is not unique")

primary key person_key is not unique


In [75]:
# dimensional table cities quality check
cities = spark.read.parquet('cities.parquet')
print(cities.count())

299


In [76]:
if cities.count() == cities.select('city_key').dropDuplicates().count():
    print("primary key city_key is unique")
else:
    print("primary key city_key is not unique")

primary key city_key is unique


In [77]:
# dimensional table dates quality check
dates = spark.read.parquet('dates.parquet')
print(dates.count())

1037


In [84]:
if dates.count() == dates.select('date_key').dropDuplicates().count():
    print("primary key date_key is unique")
else:
    print("primary key date_key is not unique")

primary key date_key is unique


In [85]:
# dimensional table transports quality check
transports = spark.read.parquet('transports.parquet')
print(transports.count())

10369


In [86]:
if transports.count() == transports.select('transport_key').dropDuplicates().count():
    print("primary key transport_key is unique")
else:
    print('primary key transport_key is not unique')

primary key transport_key is unique


#### 4.3 Data dictionary 

| table_name | field          | data source              | description |
| ---------- | -------------- | ------------------------ | ----------- |
| imm_facts  | cicid          | I94 Immigration Data | Primary key of fact table |
| imm_facts  | person_key     | I94 Immigration Data | Foreign key of dimension table persons |
| imm_facts  | city_key       | I94 Immigration Data | Foreign key of dimension table cities |
| imm_facts  | transport_key  | MD5 of data from I94 | Foreign key of dimension table transports |
| imm_facts  | arrival_date   | I94 Immigration Data | Arrival date |
| imm_facts  | departure_date | I94 Immigration Data | Departure date |
| imm_facts  | allowed_date   | I94 Immigration Data | Allowed date |
| imm_facts  | arrival_flag   | I94 Immigration Data | Flag of arrival |
| imm_facts  | depart_flag    | I94 Immigration Data | Flag of departure |
| imm_facts  | match_flag     | I94 Immigration Data | Matching flag |
| imm_facts  | count          | I94 Immigration Data | Always 1 for count |
| persons | person_key | Same as admnum | Primary key of dimension table Persons |
| persons | admnum | I94 Immigration Data | Admission number |
| persons | birth_year | I94 Immigration Data | Birth year |
| persons | gender | I94 Immigration Data | Gender |
| persons | age | I94 Immigration Data | Age |
| persons | citizenship | I94 Immigration Data | Country of Citizenship |
| persons | residence | I94 Immigration Data | Country of residence |
| persons | visa_category | I94 Immigration Data | Visa Category |
| persons | visa_type | I94 Immigration Data | Visa Type |
| cities | city_key                | I94 Immigration Data | Primary key of dimension table cities |
| cities | city | SAS description | City |
| cities | state | SAS description | State |
| cities | latitude | World Temperature Data | Latitude |
| cities | longitude | World Temperature Data | Longitude |
| cities | population | U.S. City Demographic Data | Population of the city |
| cities | foreign_born | U.S. City Demographic Data | Population born in other countries |
| cities | avg_temp | World Temperature Data | Average temperature |
| dates | date_key | I94 Immigration Data | Date |
| dates | year | date_key | Year |
| dates | month | date_key | Month |
| dates | day | date_key | Day |
| transports | transport_key | MD5 of type, airline and flight_no | Primary key of dimension table transports |
| transports | type | I94 Immigration Data | Type |
| transports | airline | I94 Immigration Data | Airline |
| transports | flight_no | I94 Immigration Data | Flight Number |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### Tools and technologies 
The project was completed in the Udacity workspace using only Apache Spark. It can be moved to the Amazon EMR platform with S3 as the storage layer.

##### Data Update schedule
The data should be updated monthly. This matches the monthly partitioning of the I94 data (for example `i94yr=2016/i94mon=4`) and limits the amount of data that has to be processed in each run.
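
A monthly run only needs to know which partition to read. The sketch below builds the partition path in the layout used when the immigration data was loaded earlier (`./sas_data/i94yr=2016/i94mon=4`); the helper names `partition_path` and `previous_month` are illustrative, and the assumption is that future months follow the same directory layout.

```python
from datetime import date, timedelta


def previous_month(day):
    """Return the first day of the month before the given date."""
    first_of_month = day.replace(day=1)
    last_of_previous = first_of_month - timedelta(days=1)
    return last_of_previous.replace(day=1)


def partition_path(month_start, root="./sas_data"):
    """Build the partition directory for the month containing month_start."""
    return "{}/i94yr={}/i94mon={}".format(root, month_start.year, month_start.month)


# A job started in mid-May 2016 would process the April 2016 partition
run_date = date(2016, 5, 15)
print(partition_path(previous_month(run_date)))
```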

##### Changes required on solution if data changed by the following factors:

* The data was increased by 100x.

If the data were increased by 100x, Amazon EMR would be a good choice for processing it. Airflow could be used to author and schedule the workflow to run periodically, for example daily or hourly, so that each run processes only a small slice of the data.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

The best choice would be to use Airflow to author and schedule the workflow so that it finishes before the daily 7am deadline.

* The database needed to be accessed by 100+ people.

To support many concurrent users, the data model could be stored in HDFS instead of S3. Amazon Redshift is another option to consider, since it performs better under parallel access.