# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [23]:
# Do all imports and installs here
import os
import glob
import configparser
import datetime as dt
from datetime import datetime

import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import (udf, col, lit, avg, asc, desc, count, when, isnan,
                                   year, month, dayofmonth, dayofweek, hour, weekofyear,
                                   date_format, to_timestamp, unix_timestamp,
                                   monotonically_increasing_id)
from pyspark.sql.types import *
from pyspark.sql.window import Window

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### Preparing AWS credentials

In [2]:
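config = configparser.ConfigParser()
config.read('dl.cfg')

# configparser values are read as config[section][key]; the section name 'AWS' is an assumption about dl.cfg
#os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']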

['dl.cfg']

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

## US 2016 immigration data

This data is in SAS (`.sas7bdat`) format and is already stored locally.

In [3]:
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [None]:
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

In [4]:
df_spark.printSchema()
df_spark.limit(5).toPandas()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

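|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |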


In [111]:
df_spark.count()

3096313

#### Data explanation

- cicid: Unique id of each entry
- i94yr: 4-digit year
- i94mon: Numeric month
- i94cit: 3-digit code of the immigrant's country of origin
- i94res: 3-digit code of the immigrant's country of residence
- i94port: Port of admission
- arrdate: Arrival date in the USA (SAS numeric date: days since 1960-01-01)
- i94mode: Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- i94addr: USA state of arrival
- depdate: Departure date from the USA (SAS numeric date)
- i94bir: Age of respondent in years
- i94visa: Visa codes collapsed into three categories
- count: Field used for summary statistics
- dtadfile: Character date field - date added to I-94 files
- visapost: Department of State where the visa was issued
- occup: Occupation that will be performed in the U.S.
- entdepa: Arrival flag - admitted or paroled into the U.S.
- entdepd: Departure flag - departed, lost I-94, or deceased
- entdepu: Update flag - apprehended, overstayed, or adjusted to permanent residence
- matflag: Match flag - match of arrival and departure records
- biryear: 4-digit year of birth
- dtaddto: Character Date Field - Date to which admitted to U.S. (allowed to stay until)
- gender: Non-immigrant sex
- insnum: INS number
- airline: Airline used to arrive in U.S.
- admnum: Admission Number
- fltno: Flight number of Airline used to arrive in U.S.
- visatype: Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
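The `arrdate` and `depdate` fields are SAS numeric dates, i.e. a count of days from 1960-01-01. The stdlib-only sketch below shows the same conversion that the Spark UDF in Step 4 performs, checked against `arrdate` values from the sample rows above (20545.0 and 20573.0). The helper name `sas_to_iso` is hypothetical and not part of the notebook.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_iso(sas_days):
    """Convert a SAS numeric date such as 20545.0 to an ISO-8601 string.

    Missing values (None or NaN) map to None. Unlike a plain `if x` test,
    a value of 0 is kept and maps to the epoch itself.
    """
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return (SAS_EPOCH + timedelta(days=int(sas_days))).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
print(sas_to_iso(20573.0))  # 2016-04-29
print(sas_to_iso(None))     # None
```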

## Twitter US Airline Sentiment data

As the original source describes it:

> A sentiment analysis job about the problems of each major U.S. airline. Twitter data was scraped from February of 2015 and contributors were asked to first classify positive, negative, and neutral tweets, followed by categorizing negative reasons (such as "late flight" or "rude service").

Each tweet is labeled with whether its sentiment toward one of six US airlines is positive, neutral, or negative.


In [4]:
tweet = spark.read.csv('Tweets.csv', header=True, inferSchema=True)
tweet.limit(5).toPandas()

|   | tweet_id | airline_sentiment | airline_sentiment_confidence | negativereason | negativereason_confidence | airline | airline_sentiment_gold | name | negativereason_gold | retweet_count | text | tweet_coord | tweet_created | tweet_location | user_timezone |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 570306133677760513 | neutral | 1.0 |  |  | Virgin America |  | cairdin |  | 0 | @VirginAmerica What @dhepburn said. |  | 2015-02-24 11:35:52 -0800 |  | Eastern Time (US & Canada) |
| 1 | 570301130888122368 | positive | 0.3486 |  | 0.0 | Virgin America |  | jnardino |  | 0 | @VirginAmerica plus you've added commercials t... |  | 2015-02-24 11:15:59 -0800 |  | Pacific Time (US & Canada) |
| 2 | 570301083672813571 | neutral | 0.6837 |  |  | Virgin America |  | yvonnalynn |  | 0 | @VirginAmerica I didn't today... Must mean I n... |  | 2015-02-24 11:15:48 -0800 | Lets Play | Central Time (US & Canada) |
| 3 | 570301031407624196 | negative | 1.0 | Bad Flight | 0.7033 | Virgin America |  | jnardino |  | 0 | "@VirginAmerica it's really aggressive to blas... |  | 2015-02-24 11:15:36 -0800 |  | Pacific Time (US & Canada) |
| 4 | 570300817074462722 | negative | 1.0 | Can't Tell | 1.0 | Virgin America |  | jnardino |  | 0 | @VirginAmerica and it's a really big bad thing... |  | 2015-02-24 11:14:45 -0800 |  | Pacific Time (US & Canada) |


In [6]:
tweet.groupby('airline_sentiment_gold').count().show()
# to sort by frequency, append .orderBy(desc('count')) before .show()

+----------------------+-----+
|airline_sentiment_gold|count|
+----------------------+-----+
|              positive|    5|
|        Rossford, Ohio|    1|
|  Central Time (US ...|    2|
|                  null|14788|
|  Atlantic Time (Ca...|    2|
|                London|    1|
|               neutral|    3|
|              negative|   32|
|  Eastern Time (US ...|    3|
+----------------------+-----+
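Values such as "Rossford, Ohio" or time-zone names in `airline_sentiment_gold` suggest that some records were split across lines: tweet texts can contain newlines and commas, and Spark's CSV reader parses one physical line per record unless `multiLine=True` is set (`escape='"'` additionally handles doubled quotes inside fields). The sketch below uses Python's `csv` module on a small made-up input to show the effect; the sample rows are hypothetical.

```python
import csv
import io

# Hypothetical CSV: the second tweet's text spans two physical lines and
# contains a comma, both inside a quoted field.
RAW = (
    "tweet_id,airline_sentiment,text,airline\n"
    '1,neutral,"What @dhepburn said.",Virgin America\n'
    '2,negative,"late again,\nsee you in Rossford, Ohio",United\n'
)

# A line-oriented reader sees one "row" per physical line,
# so the second tweet turns into two broken fragments.
physical_rows = RAW.splitlines()[1:]

# A record-aware parser keeps the quoted newline inside the field.
records = list(csv.DictReader(io.StringIO(RAW)))

print(len(physical_rows), len(records))  # 3 2
print(records[1]["airline"])             # United

# A likely PySpark equivalent (not executed here):
# spark.read.csv('Tweets.csv', header=True, inferSchema=True, multiLine=True, escape='"')
```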



### Data Explanation

- tweet_id: tweet identifier
- airline_sentiment: sentiment toward one airline (negative, positive and neutral)
- airline_sentiment_confidence: the probability of predicting the sentiment of the tweet.
- negativereason: reason to give when the comment is negative
- negativereason_confidence: the probability of predicting the reason of the negative sentiment
- airline: airline name 
- airline_sentiment_gold
- name: name of user
- negativereason_gold
- retweet_count: the number the tweet has been retweeted
- text: text content of the comment
- tweet_coord: geographic coordination where the tweets were posted
- tweet_created: date the tweet has been created
- tweet_location: location where the tweets have been posted
- user_timezone: user timezone

## Airlines Code data

The airline list with the corresponding IATA and ICAO codes. This data is joined with the tweet data so that each airline can be identified by its codes.

In [5]:

#airlines = pd.read_csv('airlines.dat', sep=',')
#airlines.columns =['index', 'airline', 'none', 'IATA','ICAO','unknown','COUNTRY','NONE2']
#airlines.head()
airlines = spark.read.csv('airlines.dat', header=True, inferSchema=True)
airlines.printSchema()

root
 |-- -1: integer (nullable = true)
 |-- Unknown: string (nullable = true)
 |-- \N2: string (nullable = true)
 |-- -: string (nullable = true)
 |-- N/A: string (nullable = true)
 |-- \N5: string (nullable = true)
 |-- \N6: string (nullable = true)
 |-- Y: string (nullable = true)



In [6]:
# Rename the columns by position: with header=True, Spark used the first data row as the header,
# producing names such as '\N2' that renames like withColumnRenamed("N2", ...) silently do not match
airlines = airlines.toDF('index', 'airline', 'none', 'IATA', 'ICAO', 'unknown', 'COUNTRY', 'NONE2')
airlines = airlines.select('airline','IATA','ICAO')
airlines.show(5)

+--------------------+----+----+
|             airline|IATA|ICAO|
+--------------------+----+----+
|      Private flight|   -| N/A|
|         135 Airways|null| GNL|
|       1Time Airline|  1T| RNX|
|2 Sqn No 1 Elemen...|null| WYT|
|     213 Flight Unit|null| TFU|
+--------------------+----+----+
only showing top 5 rows
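`airlines.dat` appears to have no header row: its first line is a data record, which is why the schema above shows column names such as `-1` and `Unknown`, and it marks missing values with `\N`. The sketch below parses a short excerpt with Python's `csv` module. The column names follow the OpenFlights airline data layout (airline ID, name, alias, IATA, ICAO, callsign, country, active), which is an assumption about this file, and the excerpt rows are illustrative.

```python
import csv
import io

# Assumed OpenFlights column layout for airlines.dat (the file has no header).
COLUMNS = ["airline_id", "name", "alias", "iata", "icao", "callsign", "country", "active"]

# Illustrative excerpt; the raw string keeps the literal \N null markers.
EXCERPT = r'''-1,"Unknown",\N,"-","N/A",\N,\N,"Y"
1,"Private flight",\N,"-","N/A",,,"Y"
2,"135 Airways",\N,,"GNL","GENERAL","United States","N"
'''


def parse_airlines(text):
    """Parse headerless airline rows, mapping \\N and empty fields to None."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        rows.append({name: (None if value in ("\\N", "") else value)
                     for name, value in zip(COLUMNS, fields)})
    return rows


airlines = parse_airlines(EXCERPT)
print(len(airlines))        # 3: the first line is kept as data
print(airlines[2]["iata"])  # None
print(airlines[2]["icao"])  # GNL
```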



In [96]:
tweet_airline = tweet.join(airlines, on='airline', how='left')
tweet_airline.limit(5).toPandas()

|   | airline | tweet_id | airline_sentiment | airline_sentiment_confidence | negativereason | negativereason_confidence | airline_sentiment_gold | name | negativereason_gold | retweet_count | text | tweet_coord | tweet_created | tweet_location | user_timezone | IATA | ICAO |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Virgin America | 570306133677760513 | neutral | 1.0 |  |  |  | cairdin |  | 0 | @VirginAmerica What @dhepburn said. |  | 2015-02-24 11:35:52 -0800 |  | Eastern Time (US & Canada) | VX | VRD |
| 1 | Virgin America | 570301130888122368 | positive | 0.3486 |  | 0.0 |  | jnardino |  | 0 | @VirginAmerica plus you've added commercials t... |  | 2015-02-24 11:15:59 -0800 |  | Pacific Time (US & Canada) | VX | VRD |
| 2 | Virgin America | 570301083672813571 | neutral | 0.6837 |  |  |  | yvonnalynn |  | 0 | @VirginAmerica I didn't today... Must mean I n... |  | 2015-02-24 11:15:48 -0800 | Lets Play | Central Time (US & Canada) | VX | VRD |
| 3 | Virgin America | 570301031407624196 | negative | 1.0 | Bad Flight | 0.7033 |  | jnardino |  | 0 | "@VirginAmerica it's really aggressive to blas... |  | 2015-02-24 11:15:36 -0800 |  | Pacific Time (US & Canada) | VX | VRD |
| 4 | Virgin America | 570300817074462722 | negative | 1.0 | Can't Tell | 1.0 |  | jnardino |  | 0 | @VirginAmerica and it's a really big bad thing... |  | 2015-02-24 11:14:45 -0800 |  | Pacific Time (US & Canada) | VX | VRD |


## Border Entry Data
This data comes from Kaggle.

The Bureau of Transportation Statistics (BTS) Border Crossing Data provide summary statistics for inbound crossings at the U.S.-Canada and the U.S.-Mexico border at the port level. Data are available for trucks, trains, containers, buses, personal vehicles, passengers, and pedestrians. Border crossing data are collected at ports of entry by U.S. Customs and Border Protection (CBP). The data reflect the number of vehicles, containers, passengers or pedestrians entering the United States. CBP does not collect comparable data on outbound crossings. Users seeking data on outbound counts may therefore want to review data from individual bridge operators, border state governments, or the Mexican and Canadian governments.

COVERAGE: Incoming vehicle, container, passenger, and pedestrian counts at U.S.-Mexico and U.S.-Canada land border ports.

In [8]:
border_entry = spark.read.csv('Border_Crossing_Entry_Data.csv',header=True, inferSchema=True)
border_entry.limit(16).toPandas()

|   | Port Name | State | Port Code | Border | Date | Measure | Value |
|---|---|---|---|---|---|---|---|
| 0 | Alcan | AK | 3104 | US-Canada Border | 2/1/2020 00:00 | Personal Vehicle Passengers | 1414 |
| 1 | Alcan | AK | 3104 | US-Canada Border | 2/1/2020 00:00 | Personal Vehicles | 763 |
| 2 | Alcan | AK | 3104 | US-Canada Border | 2/1/2020 00:00 | Truck Containers Empty | 412 |
| 3 | Alcan | AK | 3104 | US-Canada Border | 2/1/2020 00:00 | Truck Containers Full | 122 |
| 4 | Alcan | AK | 3104 | US-Canada Border | 2/1/2020 00:00 | Trucks | 545 |
| 5 | Alexandria Bay | NY | 708 | US-Canada Border | 2/1/2020 00:00 | Bus Passengers | 1174 |
| 6 | Alexandria Bay | NY | 708 | US-Canada Border | 2/1/2020 00:00 | Buses | 36 |
| 7 | Alexandria Bay | NY | 708 | US-Canada Border | 2/1/2020 00:00 | Personal Vehicle Passengers | 68630 |
| 8 | Alexandria Bay | NY | 708 | US-Canada Border | 2/1/2020 00:00 | Personal Vehicles | 31696 |
| 9 | Alexandria Bay | NY | 708 | US-Canada Border | 2/1/2020 00:00 | Truck Containers Empty | 1875 |


In [11]:
border_entry.groupby('Border').count().show()

+----------------+------+
|          Border| count|
+----------------+------+
|US-Mexico Border| 82673|
|US-Canada Border|272838|
+----------------+------+



In [11]:
border_entry.printSchema()

root
 |-- Port Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Port Code: integer (nullable = true)
 |-- Border: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Measure: string (nullable = true)
 |-- Value: integer (nullable = true)



In [12]:
border_entry.count()

355511

#### Data explanation:

- Bus Crossings: Number of arriving buses at a particular port, whether or not they are carrying passengers.
- Container: A Container is defined as any conveyance entering the U.S. used for commercial purposes, either full or empty. Includes containers moving in-bond for the port initiating the bonded movements.
- Types of Containers: The following are examples of a Container: Stakebed truck, truck with a car carrier, van, pickup truck/car, flatbed truck, piggyback truck with two linked trailers/containers = 2 containers, straight truck, bobtail truck, railcar, rail flatbed car stacked with four containers = 4 containers (if a rail car carries multiple box containers, count each container and the flatbed car), and tri-level boxcar with multiple containers inside = 3 containers
- Passengers Crossing In Buses: Number of persons arriving by bus requiring U.S. Customs and Border Protection (CBP) processing.
- Passengers Crossing In Privately Owned Vehicles: Persons entering the United States at a particular port by private automobiles, pick-up trucks, motorcycles, recreational vehicles, taxis, ambulances, hearses, tractors, snowmobiles and other motorized private ground vehicles.
- Passengers Crossing In Trains: Number of passengers and crew arriving by train and requiring CBP processing.
- Pedestrian Crossings: The number of persons arriving on foot or by certain conveyances (such as bicycles, mopeds, or wheelchairs) requiring CBP processing.
- Privately Owned Vehicle Crossings: Number of privately owned vehicles (POVs) arriving at a particular port. Includes pick-up trucks, motorcycles, recreational vehicles, taxis, snowmobiles, ambulances, hearses, and other motorized private ground vehicles.
- Rail Container Crossings (loaded and empty): A container is any conveyance entering the U.S. used for commercial purposes, full or empty. In this case, it is the number of full or empty rail containers arriving at a port. This series includes containers moving as inbound shipments.
- Train Crossings: Number of arriving trains at a particular port.
- Truck Container Crossings (loaded and empty): A container is any conveyance entering the U.S. used for commercial purposes, full or empty. In this case, it is the number of full or empty truck containers arriving at a port. This series includes containers moving as inbound shipments.
- Truck Crossings: Number of arriving trucks; does not include privately owned pick-up trucks.

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [13]:
def missing_values(df):
    """Count missing (null or NaN) values per column of a Spark DataFrame
    
    :param df: Spark DataFrame
    :return: pandas DataFrame with the missing count and percentage per column,
             sorted by percentage in descending order
    """
    # count the null/NaN values of every column in a single pass
    nan_count_df = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
    
    # convert the one-row result from wide to long format (one row per column)
    nan_count_df = pd.melt(nan_count_df, var_name='cols', value_name='values')
    
    # count total records in df
    total = df.count()
    
    # add the percentage of missing values per column
    nan_count_df['% missing values'] = 100*nan_count_df['values']/total
    
    nan_count_df = nan_count_df.sort_values('% missing values', ascending=False)
    
    return nan_count_df
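The Spark expression `isnan(c) | col(c).isNull()` counts both NaN and null as missing. A plain-Python sketch of the same per-column report, run on a few hypothetical records, makes the calculation explicit; the helpers `is_missing` and `missing_report` are illustrative only.

```python
import math


def is_missing(value):
    """Treat None and float NaN as missing, like isnan(c) | col(c).isNull()."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def missing_report(rows, columns):
    """Return (column, missing_count, pct_missing) sorted by pct, highest first."""
    total = len(rows)
    report = []
    for c in columns:
        n_missing = sum(1 for row in rows if is_missing(row.get(c)))
        report.append((c, n_missing, 100 * n_missing / total))
    # sorted() is stable, so ties keep the original column order
    return sorted(report, key=lambda item: item[2], reverse=True)


rows = [
    {"cicid": 6.0, "gender": None, "depdate": float("nan")},
    {"cicid": 7.0, "gender": "M", "depdate": None},
    {"cicid": 15.0, "gender": "M", "depdate": 20691.0},
    {"cicid": 16.0, "gender": None, "depdate": 20567.0},
]
for line in missing_report(rows, ["cicid", "gender", "depdate"]):
    print(line)
```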

## US 2016 immigration data

In [106]:
missing_values(df_spark)

|    | cols | values | % missing values |
|---|---|---|---|
| 18 | entdepu | 3095921 | 99.98734 |
| 15 | occup | 3088187 | 99.737559 |
| 23 | insnum | 2982605 | 96.327632 |
| 14 | visapost | 1881250 | 60.757746 |
| 22 | gender | 414269 | 13.379429 |
| 8 | i94addr | 152592 | 4.928184 |
| 9 | depdate | 142457 | 4.600859 |
| 19 | matflag | 138429 | 4.470769 |
| 17 | entdepd | 138429 | 4.470769 |
| 24 | airline | 83627 | 2.700857 |


The missing-value report shows that the columns 'entdepu', 'occup' and 'insnum' are almost entirely null (over 96% missing), so we drop them.
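Rather than hard-coding the list, the columns can be selected with a cut-off on the reported percentages. The sketch below uses the percentages from the report directly; the 90% threshold is an assumption, not a value from the original analysis. On the pandas result of `missing_values`, the equivalent would be `nan_df.loc[nan_df['% missing values'] > 90, 'cols'].tolist()`, where `nan_df` is a hypothetical name for that result.

```python
# Percentages of missing values copied from the missing_values() output.
missing_pct = {
    "entdepu": 99.98734,
    "occup": 99.737559,
    "insnum": 96.327632,
    "visapost": 60.757746,
    "gender": 13.379429,
    "i94addr": 4.928184,
    "depdate": 4.600859,
}

THRESHOLD = 90.0  # assumed cut-off for "almost entirely null"

# Keep the report's ordering (highest percentage first).
to_drop = [column for column, pct in missing_pct.items() if pct > THRESHOLD]
print(to_drop)  # ['entdepu', 'occup', 'insnum']
```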

In [9]:
to_drop = ['entdepu','occup','insnum']
immigration_data = df_spark.drop(*to_drop)
immigration_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



Next, check for duplicate records. Since 'cicid' is the unique identifier, we drop duplicates on 'cicid'. The row count stays at 3,096,313, so the data contains no duplicate IDs.

In [10]:
immigration_data = immigration_data.dropDuplicates(['cicid'])
immigration_data.count()

3096313

## Twitter Data

We repeat the cleaning steps used for the immigration dataset.

In [112]:
missing_values(tweet_airline)

|    | cols | values | % missing values |
|---|---|---|---|
| 8 | negativereason_gold | 14805 | 99.784323 |
| 6 | airline_sentiment_gold | 14788 | 99.669745 |
| 11 | tweet_coord | 13768 | 92.795039 |
| 15 | IATA | 11420 | 76.969738 |
| 16 | ICAO | 11420 | 76.969738 |
| 4 | negativereason | 5573 | 37.561502 |
| 14 | user_timezone | 5103 | 34.393745 |
| 13 | tweet_location | 5010 | 33.766934 |
| 5 | negativereason_confidence | 4229 | 28.503067 |
| 12 | tweet_created | 389 | 2.621824 |
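After the left join, about 77% of rows have no IATA or ICAO code. A likely cause is that the join matches airline names exactly, while the two files may name the same carrier differently (for example a short name such as "United" versus a full name such as "United Airlines"); both spellings are assumptions about the files, not values shown above. The sketch normalizes names before the lookup, using a hypothetical mapping and illustrative codes. In PySpark, the same normalization could be applied before the join with `tweet.replace(NAME_MAP, subset=['airline'])`.

```python
# Hypothetical mapping from short tweet names to full airline-list names.
NAME_MAP = {
    "United": "United Airlines",
    "Southwest": "Southwest Airlines",
    "Delta": "Delta Air Lines",
    "American": "American Airlines",
}

# Illustrative (IATA, ICAO) lookup keyed by full airline name.
CODES = {
    "Virgin America": ("VX", "VRD"),
    "United Airlines": ("UA", "UAL"),
}


def lookup_codes(tweet_airline_name):
    """Return (IATA, ICAO) after normalizing the name, or (None, None)."""
    full_name = NAME_MAP.get(tweet_airline_name, tweet_airline_name)
    return CODES.get(full_name, (None, None))


tweet_names = ["Virgin America", "United", "United"]
exact = [CODES.get(name, (None, None)) for name in tweet_names]
normalized = [lookup_codes(name) for name in tweet_names]
print(sum(code != (None, None) for code in exact))       # 1
print(sum(code != (None, None) for code in normalized))  # 3
```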


In [97]:
to_drop = ['negativereason_gold','airline_sentiment_gold','tweet_coord']
tweet_airline = tweet_airline.drop(*to_drop)
tweet_airline.printSchema()

root
 |-- airline: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- name: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)



In [12]:
tweet_airline = tweet_airline.dropDuplicates(['tweet_id'])
tweet_airline.count()

14657

In [46]:
tweet_airline.groupby('airline_sentiment').count().show()

+--------------------+-----+
|   airline_sentiment|count|
+--------------------+-----+
|   ubetter do smth!"|    1|
|[35.23185283, -80...|    1|
|            positive| 2334|
| we had a good ru...|    1|
| never submits. F...|    1|
|[40.7740308, -73....|    1|
|                   0|    6|
|     please????????"|    1|
|                null|  136|
| flight AA1469 2/...|    1|
|[51.44284934, -0....|    1|
|[40.65062011, -73...|    1|
| or just days tha...|    1|
|             neutral| 3069|
|            negative| 9082|
| and I might choo...|    1|
|            Virginia|    1|
|          [0.0, 0.0]|    1|
| you've been rath...|    1|
| gave ticks away ...|    1|
+--------------------+-----+
only showing top 20 rows



In [98]:
tweet_airline = tweet_airline.where(col("airline_sentiment").isin("positive", "neutral", "negative"))

In [67]:
tweet_airline.groupby('airline_sentiment').count().show()

+-----------------+-----+
|airline_sentiment|count|
+-----------------+-----+
|         positive| 2363|
|          neutral| 3099|
|         negative| 9178|
+-----------------+-----+



## Border entry data

In [117]:
missing_values(border_entry)

|   | cols | values | % missing values |
|---|---|---|---|
| 0 | Port Name | 0 | 0.0 |
| 1 | State | 0 | 0.0 |
| 2 | Port Code | 0 | 0.0 |
| 3 | Border | 0 | 0.0 |
| 4 | Date | 0 | 0.0 |
| 5 | Measure | 0 | 0.0 |
| 6 | Value | 0 | 0.0 |


In [13]:
# 'M/d/yyyy' accepts single-digit months and days such as '2/1/2020 00:00'
border_entry = border_entry.withColumn("datetype_timestamp", to_timestamp("Date", "M/d/yyyy HH:mm"))
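The `Date` values use single-digit months and days (e.g. `2/1/2020 00:00`), so the parse pattern must accept one- or two-digit fields. As a stdlib illustration, `datetime.strptime` with `%m/%d/%Y %H:%M` accepts both forms; the first sample comes from the rows shown earlier and the second is hypothetical.

```python
from datetime import datetime

# Same layout as the border crossing "Date" column; the second value is hypothetical.
samples = ["2/1/2020 00:00", "12/31/2019 00:00"]

# %m and %d accept one or two digits when parsing.
parsed = [datetime.strptime(s, "%m/%d/%Y %H:%M") for s in samples]
for p in parsed:
    print(p.year, p.month, p.day)
# 2020 2 1
# 2019 12 31
```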

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The data model is a **star schema** made up of four dimension tables and one fact table. A star schema keeps analytical queries simple, because each query joins the central fact table only to the dimensions it needs.

<img src="star_schema2.JPG"> 
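Since the diagram is an image, a textual sketch of the same star schema can help, built from the column names of the tables created in Step 4. The join keys are an interpretation of those names (for example, that `arrival_airline` holds IATA codes matching `airline_IATA_code`), not taken from the diagram, and the hypothetical helper only checks that each assumed key column exists on both sides.

```python
# Key columns of the Step 4 tables (abbreviated).
FACT_COLUMNS = ["visit_id", "arrival_airline", "arrival_state",
                "arrival_airport", "arrival_date", "airport_entry_count"]

DIMENSIONS = {
    "visit": ["visit_id", "origin_country", "residence_country", "visa_type"],
    "time": ["arrival_date_time", "arrival_year", "arrival_month"],
    "tweet_airline": ["airline_IATA_code", "airline_name", "negative_sentiment"],
    "border_entry": ["border_state", "border_port_code", "border_entry_number"],
}

# Assumed join keys: dimension -> (fact column, dimension column).
JOIN_KEYS = {
    "visit": ("visit_id", "visit_id"),
    "time": ("arrival_date", "arrival_date_time"),
    "tweet_airline": ("arrival_airline", "airline_IATA_code"),
    "border_entry": ("arrival_state", "border_state"),
}


def missing_keys(fact_columns, dimensions, join_keys):
    """List (dimension, column) pairs whose assumed join column does not exist."""
    problems = []
    for dim, (fact_col, dim_col) in join_keys.items():
        if fact_col not in fact_columns:
            problems.append((dim, fact_col))
        if dim_col not in dimensions[dim]:
            problems.append((dim, dim_col))
    return problems


print(missing_keys(FACT_COLUMNS, DIMENSIONS, JOIN_KEYS))  # []
```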

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Visit dimension table

In [14]:
# select the columns for the visit dimension table
visit = immigration_data.select(col('cicid').alias('visit_id'),\
                               col('i94cit').alias('origin_country'),\
                               col('i94res').alias('residence_country'),\
                               col('gender').alias('gender'),\
                               col('i94bir').alias('age'),\
                               col('i94port').alias('arrival_airport'),\
                               col('i94addr').alias('arrival_state'),\
                               col('i94mode').alias('transportation_mode'),\
                               col('I94visa').alias('visa_category'),\
                               col('visatype').alias('visa_type'),\
                               col('airline').alias('arrival_airline'),\
                               col('fltno').alias('flight_number'))
visit.printSchema()

root
 |-- visit_id: double (nullable = true)
 |-- origin_country: double (nullable = true)
 |-- residence_country: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- arrival_state: string (nullable = true)
 |-- transportation_mode: double (nullable = true)
 |-- visa_category: double (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- arrival_airline: string (nullable = true)
 |-- flight_number: string (nullable = true)



In [18]:
visit.limit(5).toPandas()

|   | visit_id | origin_country | residence_country | gender | age | arrival_airport | arrival_state | transportation_mode | visa_category | visa_type | arrival_airline | flight_number |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 299.0 | 103.0 | 103.0 |  | 54.0 | NYC | NY | 1.0 | 2.0 | WT | OS | 87 |
| 1 | 305.0 | 103.0 | 103.0 |  | 63.0 | NYC | NY | 1.0 | 2.0 | WT | OS | 87 |
| 2 | 496.0 | 103.0 | 103.0 |  | 64.0 | CHI | IL | 1.0 | 1.0 | WB | OS | 65 |
| 3 | 558.0 | 103.0 | 103.0 | M | 42.0 | SFR | CA | 1.0 | 1.0 | WB | LH | 454 |
| 4 | 596.0 | 103.0 | 103.0 | M | 24.0 | NAS | FL | 1.0 | 2.0 | WT | UP | 221 |


In [18]:
# Save the table as Parquet, partitioned by 'arrival_state' and 'arrival_airport'
partitions = ['arrival_state','arrival_airport']

visit.write.parquet("visit", partitionBy=partitions, mode="overwrite")

### Time Dimension Table

In [15]:
# select the columns for the time dimension table
time = immigration_data.select(col('arrdate').alias('arrival_date'),\
                               col('depdate').alias('departure_date'))
time.printSchema()

root
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)



In [16]:
# Convert SAS numeric date into normal datetime
get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

time = time.withColumn('arrival_date_time', get_datetime(time['arrival_date']))

time = time.withColumn('arrival_year', year('arrival_date_time'))
time = time.withColumn('arrival_month', month('arrival_date_time'))
time = time.withColumn('arrival_day', dayofmonth('arrival_date_time'))
time = time.withColumn('arrival_week', weekofyear('arrival_date_time'))
time = time.withColumn('arrival_day_of_week', dayofweek('arrival_date_time'))

# departure attributes must be derived from departure_date, not arrival_date
time = time.withColumn('departure_date_time', get_datetime(time['departure_date']))
time = time.withColumn('departure_year', year('departure_date_time'))
time = time.withColumn('departure_month', month('departure_date_time'))
time = time.withColumn('departure_day', dayofmonth('departure_date_time'))
time = time.withColumn('departure_week', weekofyear('departure_date_time'))
time = time.withColumn('departure_day_of_week', dayofweek('departure_date_time'))
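The derived columns follow Spark's conventions: `weekofyear` returns the ISO-8601 week number, and `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday). A stdlib sketch of the same attributes for one ISO date string (the function name `calendar_attributes` is hypothetical):

```python
from datetime import date


def calendar_attributes(iso_date):
    """Derive year/month/day/week/day-of-week the way the Spark columns do."""
    d = date.fromisoformat(iso_date)
    _, iso_week, iso_weekday = d.isocalendar()  # iso_weekday: Monday=1 ... Sunday=7
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,                    # like weekofyear()
        "day_of_week": iso_weekday % 7 + 1,  # like dayofweek(): Sunday=1 ... Saturday=7
    }


print(calendar_attributes("2016-04-01"))
# {'year': 2016, 'month': 4, 'day': 1, 'week': 13, 'day_of_week': 6}
```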

In [29]:
time.printSchema()

root
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- arrival_date_time: string (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_week: integer (nullable = true)
 |-- arrival_day_of_week: integer (nullable = true)
 |-- departure_date_time: string (nullable = true)
 |-- departure_year: integer (nullable = true)
 |-- departure_month: integer (nullable = true)
 |-- departure_day: integer (nullable = true)
 |-- departure_week: integer (nullable = true)
 |-- departure_day_of_week: integer (nullable = true)



In [30]:
partitions = ['arrival_year','arrival_month']

time.write.parquet("time", partitionBy=partitions, mode="overwrite")

### Tweet_airline dimension table

In [99]:
from pyspark.sql.functions import when

tweet_airline = tweet_airline.withColumn("positive_sentiment", when(tweet_airline.airline_sentiment == "positive", 1).otherwise(0)).\
                                      withColumn("negative_sentiment", when(tweet_airline.airline_sentiment == "negative", 1).otherwise(0)).\
                                      withColumn("neutral_sentiment", when(tweet_airline.airline_sentiment == "neutral", 1).otherwise(0))      

In [100]:
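# use column expressions: in the SQL string "'airline' is not null", 'airline' is a string literal, so it filters nothing
tweet_airline = tweet_airline.filter(col('airline').isNotNull()).filter(col('airline_sentiment').isNotNull())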

tweet_airline = tweet_airline.select(col('ICAO').alias('airline_ICAO_code'),\
                               col('IATA').alias('airline_IATA_code'),\
                               col('airline').alias('airline_name'),\
                               col('positive_sentiment'),\
                               col('negative_sentiment'),\
                               col('neutral_sentiment'))     
tweet_airline.printSchema()

root
 |-- airline_ICAO_code: string (nullable = true)
 |-- airline_IATA_code: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- positive_sentiment: integer (nullable = false)
 |-- negative_sentiment: integer (nullable = false)
 |-- neutral_sentiment: integer (nullable = false)



In [103]:
tweet_airline = tweet_airline.groupby('airline_ICAO_code','airline_IATA_code','airline_name').sum()

In [106]:
tweet_airline.printSchema()

root
 |-- airline_ICAO_code: string (nullable = true)
 |-- airline_IATA_code: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- sum(positive_sentiment): long (nullable = true)
 |-- sum(negative_sentiment): long (nullable = true)
 |-- sum(neutral_sentiment): long (nullable = true)



In [108]:
tweet_airline = tweet_airline.withColumnRenamed("sum(positive_sentiment)","positive_sentiment").\
                              withColumnRenamed("sum(negative_sentiment)","negative_sentiment").\
                              withColumnRenamed("sum(neutral_sentiment)","neutral_sentiment")

In [109]:
tweet_airline.printSchema()

root
 |-- airline_ICAO_code: string (nullable = true)
 |-- airline_IATA_code: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- positive_sentiment: long (nullable = true)
 |-- negative_sentiment: long (nullable = true)
 |-- neutral_sentiment: long (nullable = true)



In [110]:
partitions = ['airline_name']

tweet_airline.write.parquet("tweet_airline", partitionBy=partitions, mode="overwrite")
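Summing the one-hot sentiment flags per airline is equivalent to counting tweets by (airline, sentiment). The stdlib sketch below does this on hypothetical rows and also skips null airlines and unexpected sentiment labels, as the filters above do. In PySpark, `groupBy(...).agg(F.sum('positive_sentiment').alias('positive_sentiment'), ...)` would produce the final column names directly, without the separate rename step.

```python
from collections import defaultdict

SENTIMENTS = ("positive", "negative", "neutral")


def sentiment_counts(rows):
    """Count positive/negative/neutral tweets per airline from (airline, sentiment) pairs."""
    totals = defaultdict(lambda: {f"{s}_sentiment": 0 for s in SENTIMENTS})
    for airline, sentiment in rows:
        if airline is None or sentiment not in SENTIMENTS:
            continue  # mirrors the null / invalid-label filters
        totals[airline][f"{sentiment}_sentiment"] += 1
    return dict(totals)


rows = [
    ("Virgin America", "neutral"),
    ("Virgin America", "positive"),
    ("Virgin America", "negative"),
    ("Virgin America", "negative"),
    (None, "negative"),  # dropped: no airline
    ("United", "0"),     # dropped: not a valid sentiment label
]
print(sentiment_counts(rows))
# {'Virgin America': {'positive_sentiment': 1, 'negative_sentiment': 2, 'neutral_sentiment': 1}}
```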

### Border_entry dimension table

In [107]:
border_entry.printSchema()

root
 |-- Port Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Port Code: integer (nullable = true)
 |-- Border: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Measure: string (nullable = true)
 |-- Value: integer (nullable = true)
 |-- datetype_timestamp: timestamp (nullable = true)



In [18]:
border_entry = border_entry.select(col('Port Code').alias('border_port_code'),\
                               col('Port Name').alias('border_port_name'),\
                               col('State').alias('border_state'),\
                               col('Border').alias('border'),\
                               col('datetype_timestamp').alias('border_entry_date'),\
                               col('Measure').alias('border_transportation_measure'),\
                               col('Value').alias('border_entry_number'))

In [109]:
partitions = ['border_port_code','border_port_name']

border_entry.write.parquet("border_entry", partitionBy=partitions, mode="overwrite")

### Immigration fact table

In [27]:
immigration_fact = immigration_data.select(col('cicid').alias('visit_id'),\
                               col('airline').alias('arrival_airline'),\
                               col('arrdate').alias('arrival_date_original'),\
                               col('i94addr').alias('arrival_state'),\
                               col('i94port').alias('arrival_airport'),\
                               col('I94visa').alias('visa_category'),\
                               col('visatype').alias('visa_type'),\
                               col('i94cit').alias('origin_country'),\
                               col('i94res').alias('residence_country'),\
                               col('gender').alias('gender'),\
                               col('i94bir').alias('age'))

get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)

immigration_fact = immigration_fact.withColumn('arrival_date', get_datetime(immigration_fact['arrival_date_original']))

immigration_fact = immigration_fact.drop('arrival_date_original')

immigration_fact = immigration_fact.withColumn('airport_entry_count', lit(1))

immigration_fact.printSchema()

root
 |-- visit_id: double (nullable = true)
 |-- arrival_airline: string (nullable = true)
 |-- arrival_state: string (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- visa_category: double (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- origin_country: double (nullable = true)
 |-- residence_country: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- airport_entry_count: integer (nullable = false)



In [26]:
immigration_fact.limit(5).toPandas()

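| | visit_id | arrival_airline | arrival_state | arrival_airport | visa_category | visa_type | origin_country | residence_country | gender | age | arrival_date | count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 299.0 | OS | NY | NYC | 2.0 | WT | 103.0 | 103.0 | | 54.0 | 2016-04-01 | 1 |
| 1 | 305.0 | OS | NY | NYC | 2.0 | WT | 103.0 | 103.0 | | 63.0 | 2016-04-01 | 1 |
| 2 | 496.0 | OS | IL | CHI | 1.0 | WB | 103.0 | 103.0 | | 64.0 | 2016-04-01 | 1 |
| 3 | 558.0 | LH | CA | SFR | 1.0 | WB | 103.0 | 103.0 | M | 42.0 | 2016-04-01 | 1 |
| 4 | 596.0 | UP | FL | NAS | 2.0 | WT | 103.0 | 103.0 | M | 24.0 | 2016-04-01 | 1 |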


In [None]:
# Partition the fact table by arrival date
partitions = ['arrival_date']

immigration_fact.write.parquet("immigration_fact", partitionBy=partitions, mode="overwrite")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [111]:
# Perform quality checks here
# Source/Count checks to ensure completeness
def completeness_check(df, table_name):
    """Source/count check: the table must contain at least one row."""
    # count() triggers a Spark job, so run it only once
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed. {table_name} contained 0 rows")
        return False
    print(f"Data quality on table {table_name} check passed with {total_count} records")
    return True

# Map each table name to its DataFrame
tables = {
    'immigration_fact_table': immigration_fact,
    'visit_dimension_table': visit,
    'time_dimension_table': time,
    'tweet_airline_dimension_table': tweet_airline,
    'border_entry_dimension_Table': border_entry
}

for table_name, df in tables.items():
    completeness_check(df, table_name)

Data quality on table immigration_fact_table check passed with 3096313 records
Data quality on table visit_dimension_table check passed with 3096313 records
Data quality on table time_dimension_table check passed with 3096313 records
Data quality on table tweet_airline_dimension_table check passed with 6 records
Data quality on table border_entry_dimension_Table check passed with 355511 records
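The completeness check only confirms that each table is non-empty. The integrity-constraint bullet above also calls for key checks, for example that `visit_id` is unique and never null in the fact table. The sketch below shows that logic in plain Python on a few hypothetical rows; the helper names are made up for illustration. On the Spark DataFrames the same duplicate test could be written as `immigration_fact.groupBy('visit_id').count().filter(col('count') > 1).count() == 0`, which keeps the work distributed instead of collecting rows to the driver.

```python
from collections import Counter
from typing import List, Mapping, Sequence


def duplicate_keys(rows: Sequence[Mapping[str, object]], key: str) -> List[object]:
    """Return the non-null key values that occur more than once."""
    counts = Counter(row.get(key) for row in rows if row.get(key) is not None)
    return [value for value, n in counts.items() if n > 1]


def null_key_count(rows: Sequence[Mapping[str, object]], key: str) -> int:
    """Count rows whose key is missing or null."""
    return sum(1 for row in rows if row.get(key) is None)


def integrity_check(rows: Sequence[Mapping[str, object]], key: str, table_name: str) -> bool:
    """Primary-key check: the key must be unique and non-null."""
    dupes = duplicate_keys(rows, key)
    nulls = null_key_count(rows, key)
    if dupes or nulls:
        print(f"Integrity check failed on {table_name}: "
              f"{len(dupes)} duplicated and {nulls} null values in {key}")
        return False
    print(f"Integrity check passed on {table_name}: {key} is unique and non-null")
    return True


# Hypothetical rows: 299.0 is repeated and one key is null
sample = [{"visit_id": 299.0}, {"visit_id": 305.0}, {"visit_id": 299.0}, {"visit_id": None}]
integrity_check(sample, "visit_id", "immigration_fact_table")
```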


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Visits dimension table

| **Data Point** | **Explanation** |
| --- | --- |
| visit_id | Unique ID of the visit |
| origin_country | Country the visitor comes from (numeric I94 code) |
| residence_country | Country the visitor lives in (numeric I94 code) |
| gender | Gender of the visitor |
| age | Age of the visitor |
| arrival_airport | Airport where the visitor arrives (I94 port code) |
| arrival_state | US state where the visitor arrives |
| transportation_mode | Mode of transportation: 1 = Air, 2 = Sea, 3 = Land, 9 = Not reported |
| visa_category | Visa category: 1 = Business, 2 = Pleasure, 3 = Student |
| visa_type | Class of admission legally admitting the non-immigrant to temporarily stay in the US |
| arrival_airline | Code of the airline the visitor arrived on |
| flight_number | Flight number of the airline |
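Several columns store numeric codes rather than labels, and because the SAS source is read as doubles they arrive as values like `2.0`. A small lookup, sketched below with the code tables from the dictionary above, turns them into readable labels. The helper name `decode` and the `"Unknown"` fallback for unlisted codes are assumptions.

```python
import math
from typing import Dict, Optional

# Code tables taken from the data dictionary
TRANSPORTATION_MODE: Dict[int, str] = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
VISA_CATEGORY: Dict[int, str] = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code: Optional[float], table: Dict[int, str]) -> Optional[str]:
    """Map a numeric code stored as a double (e.g. 2.0) to its label.

    Missing values stay None; codes not in the table become "Unknown"
    (an assumed convention).
    """
    if code is None or math.isnan(code):
        return None
    return table.get(int(code), "Unknown")


print(decode(2.0, VISA_CATEGORY), decode(1.0, TRANSPORTATION_MODE))
```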


### Time dimension table

| **Data Point** | **Explanation** |
| --- | --- |
| arrival_date | Date when the visitor arrives in the US |
| arrival_year | Year of arrival |
| arrival_month | Month of arrival |
| arrival_day | Day of the month of arrival |
| arrival_week | Week of the year of arrival |
| arrival_day_of_week | Day of the week of arrival |
| departure_date | Date when the visitor departs from the US |
| departure_year | Year of departure |
| departure_month | Month of departure |
| departure_day | Day of the month of departure |
| departure_week | Week of the year of departure |
| departure_day_of_week | Day of the week of departure |
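The week and day-of-week columns depend on which numbering convention is used. Assuming the time table was built with Spark's `weekofyear` and `dayofweek` (both imported at the top of the notebook), weeks follow ISO 8601 and days run from 1 = Sunday to 7 = Saturday. The plain-Python sketch below reproduces those attributes for a single date, which is handy for spot-checking rows; the function name is hypothetical.

```python
import datetime as dt
from typing import Dict


def time_attributes(iso_date: str, prefix: str = "arrival") -> Dict[str, object]:
    """Derive the time-dimension columns for one ISO date string."""
    day = dt.date.fromisoformat(iso_date)
    return {
        f"{prefix}_date": iso_date,
        f"{prefix}_year": day.year,
        f"{prefix}_month": day.month,
        f"{prefix}_day": day.day,
        # ISO 8601 week number (weeks start on Monday), as in Spark's weekofyear
        f"{prefix}_week": day.isocalendar()[1],
        # Spark's dayofweek: 1 = Sunday, 2 = Monday, ..., 7 = Saturday;
        # Python's isoweekday uses 1 = Monday ... 7 = Sunday
        f"{prefix}_day_of_week": day.isoweekday() % 7 + 1,
    }


print(time_attributes("2016-04-01"))
```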


### Tweet airline dimension table

| **Data Point** | **Explanation** |
| --- | --- |
| airline_ICAO_code | ICAO code of the airline mentioned in the tweets |
| airline_IATA_code | IATA code of the airline mentioned in the tweets |
| airline_name | Name of the airline the tweets comment on |
| positive_sentiment | Number of positive tweets about the airline |
| negative_sentiment | Number of negative tweets about the airline |
| neutral_sentiment | Number of neutral tweets about the airline |
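The three sentiment columns are counts of tweets per airline. Assuming each tweet carries an airline name and exactly one sentiment label out of `positive`, `negative` and `neutral` (the tweet dataset's column names are not shown in this section), the aggregation can be sketched as follows. The function name and the sample tweets are invented for illustration.

```python
from collections import Counter, defaultdict
from typing import DefaultDict, Dict, Iterable, Tuple


def sentiment_counts(tweets: Iterable[Tuple[str, str]]) -> Dict[str, Dict[str, int]]:
    """Count positive/negative/neutral tweets per airline.

    Assumes each tweet is an (airline_name, sentiment) pair whose sentiment
    is one of "positive", "negative" or "neutral".
    """
    per_airline: DefaultDict[str, Counter] = defaultdict(Counter)
    for airline, sentiment in tweets:
        per_airline[airline][sentiment] += 1
    return {
        airline: {
            # Counter returns 0 for labels that never occurred
            "positive_sentiment": counts["positive"],
            "negative_sentiment": counts["negative"],
            "neutral_sentiment": counts["neutral"],
        }
        for airline, counts in per_airline.items()
    }


# Invented sample tweets, for illustration only
tweets = [("United", "negative"), ("United", "positive"), ("Delta", "neutral"), ("United", "negative")]
print(sentiment_counts(tweets))
```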


### Border entry dimension table

| **Data Point** | **Explanation** |
| --- | --- |
| border_state (Primary Key) | US state in which the border port is located |
| border_port_code | Code of the border port |
| border_port_name | Name of the border port |
| border | Border: US-Mexico or US-Canada |
| border_entry_date | Date of the recorded border entries |
| border_transportation_measure | Means of transportation used to enter at the border (source column `Measure`) |
| border_entry_number | Number of entries recorded (source column `Value`) |


### Immigration fact table

| **Data Point** | **Explanation** |
| --- | --- |
| visit_id (PK) | Identifier of the visit |
| arrival_airline (FK) | Code of the airline the visitor arrived on |
| arrival_date (FK) | Date when the visitor arrives in the US |
| arrival_state (FK) | US state where the visitor arrives |
| airport_entry_count | Entry count, always 1 per visit, so summing it counts entries |
| arrival_airport | Airport where the visitor arrives in the US |
| visa_category | Visa category: 1 = Business, 2 = Pleasure, 3 = Student |
| visa_type | Class of admission legally admitting the non-immigrant to temporarily stay in the US |
| origin_country | Country the visitor comes from |
| residence_country | Country the visitor lives in |
| gender | Gender of the visitor |
| age | Age of the visitor |
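Storing a constant `airport_entry_count` of 1 per visit makes the fact table additive: summing the column over any grouping counts entries. The sketch below does this in plain Python for the five sample rows shown earlier, reduced to the fields it needs; the function name is hypothetical. In Spark the equivalent would be `immigration_fact.groupBy('arrival_airport', 'arrival_date').sum('airport_entry_count')`.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Tuple


def entries_per_airport_day(rows: Iterable[Mapping[str, object]]) -> Dict[Tuple[str, str], int]:
    """Sum airport_entry_count per (arrival_airport, arrival_date)."""
    totals: Dict[Tuple[str, str], int] = defaultdict(int)
    for row in rows:
        totals[(row["arrival_airport"], row["arrival_date"])] += row["airport_entry_count"]
    return dict(totals)


# The five sample rows shown earlier, reduced to the fields used here
rows = [
    {"arrival_airport": airport, "arrival_date": "2016-04-01", "airport_entry_count": 1}
    for airport in ["NYC", "NYC", "CHI", "SFR", "NAS"]
]
print(entries_per_airport_day(rows))
```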



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

I used Apache Spark for this project. Spark handles large datasets efficiently, and the same code runs on a local machine during development. It also makes it easy to move the processing to the cloud later, for example to an AWS EMR cluster.

* Propose how often the data should be updated and why.

Ideally the data should be updated daily, because new visits and immigration records are added to the system every day. Since the fact table is partitioned by arrival date, each day's records are stored in their own small partition.
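One caveat for daily updates: with `mode="overwrite"`, Spark's default (`static`) partition-overwrite mode replaces the whole output directory, not only the dates being written. Setting `spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")` (available since Spark 2.3) makes an overwrite replace only the partitions present in the new batch; appending new dates with `mode="append"` is the other option. The plain-Python sketch below models the two overwrite behaviours on dictionaries keyed by arrival date. It is a simulation of the semantics with hypothetical names, not Spark code.

```python
from typing import Dict, List

Partitions = Dict[str, List[dict]]  # partition value (arrival_date) -> rows


def overwrite(existing: Partitions, batch: Partitions, dynamic: bool) -> Partitions:
    """Model an overwrite of a partitioned table with a new batch.

    dynamic=False: like Spark's default static mode for this write, every
                   existing partition is dropped and only the batch remains.
    dynamic=True:  like partitionOverwriteMode=dynamic, only the partitions
                   present in the batch are replaced; the others are kept.
    """
    result: Partitions = {p: list(r) for p, r in existing.items()} if dynamic else {}
    result.update({p: list(r) for p, r in batch.items()})
    return result


existing = {"2016-04-01": [{"visit_id": 1}], "2016-04-02": [{"visit_id": 2}]}
new_day = {"2016-04-03": [{"visit_id": 3}]}
print(sorted(overwrite(existing, new_day, dynamic=False)))
print(sorted(overwrite(existing, new_day, dynamic=True)))
```

Dynamic mode also makes re-running a single day idempotent: rewriting `2016-04-02` replaces just that partition.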

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
 I would move both the data and the processing to AWS: store the data in S3 and run the Spark job on an EMR cluster, adding worker nodes as the volume grows. This also removes the dependence on local disk space.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
 Whether the pipeline runs on a local machine or on AWS, it can be scheduled (for example with cron or Apache Airflow) to start early enough each morning that the dashboard is refreshed by 7 am.
 
 * The database needed to be accessed by 100+ people.
 
 I would store the output tables in an AWS S3 bucket and grant these users read-only access to it.