# Project Title
### Data Engineering Capstone Project

#### Project Summary
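This project combines four datasets (I94 immigration records, world temperature data, US city demographics, and airport codes) into a data platform on AWS. Slowly changing reference data is stored in a relational database on Amazon Redshift, and the high-volume I94 arrival records are stored in a non-relational database on Amazon Keyspaces. Users can then query arrivals by time period and attribute (for example, visa type) together with the related city and airport information.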

The project follows these steps:
* Step 0: Preparation and import data from s3
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Install the required Python packages before continuing.
import sys

!{sys.executable} -m pip install boto3
!{sys.executable} -m pip install s3fs
!{sys.executable} -m pip install pyspark
!{sys.executable} -m pip install cqlsh
!{sys.executable} -m pip install findspark
!{sys.executable} -m pip install pyarrow



In [1]:
# Do all imports and installs here
import configparser
import pandas as pd
import os
import boto3
import uuid
from pyspark.sql import types as T
from time import sleep

In [2]:
config = configparser.ConfigParser()
config.read('iam.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDS']['AWS_SECRET_ACCESS_KEY']

client=boto3.client('s3')


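# Point the PySpark workers and driver at this notebook's own interpreter (set before the
# SparkSession is created); Spark refuses to run Python code when the versions differ.
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable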

### Step 1: Scope the Project and Gather Data

#### Project description:

This project is split into several parts, and all four datasets are used.

Before going into the details, it helps to recall the characteristics of relational and non-relational databases.

Relational databases offer low redundancy and strong data integrity, which makes them well suited to small- or medium-sized data that does not change often. In our case, the temperature, airport code, and US city demographic data should be stored in a relational database normalized to third normal form (3NF), because these datasets change rarely and their volume is modest.
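To make the 3NF idea concrete, here is a hypothetical schema for the three reference datasets. It is a minimal sketch, not the project's final model: all table and column names are assumptions, and SQLite stands in for Amazon Redshift so the example runs locally. The main normalization step is visible in the demographics preview later in this notebook, where city-level columns (Median Age, Total Population, ...) are repeated on every Race row; splitting them into separate tables removes that redundancy.

```python
import sqlite3

# Hypothetical 3NF schema; SQLite stands in for Amazon Redshift so the sketch runs locally.
SCHEMA = """
CREATE TABLE state (
    state_code TEXT PRIMARY KEY,            -- e.g. 'TX'
    state_name TEXT NOT NULL
);
CREATE TABLE city (
    city_id    INTEGER PRIMARY KEY,
    city_name  TEXT NOT NULL,
    state_code TEXT NOT NULL REFERENCES state (state_code),
    UNIQUE (city_name, state_code)
);
-- City-level facts are stored once per city ...
CREATE TABLE city_demographics (
    city_id          INTEGER PRIMARY KEY REFERENCES city (city_id),
    median_age       REAL,
    total_population INTEGER
);
-- ... while the per-race counts, repeated alongside them in the source CSV, get their own table
CREATE TABLE city_race_count (
    city_id    INTEGER NOT NULL REFERENCES city (city_id),
    race       TEXT NOT NULL,
    race_count INTEGER,
    PRIMARY KEY (city_id, race)
);
CREATE TABLE airport (
    ident     TEXT PRIMARY KEY,             -- e.g. '00AA'
    name      TEXT NOT NULL,
    iata_code TEXT,
    city_id   INTEGER REFERENCES city (city_id)
);
CREATE TABLE monthly_temperature (
    city_id  INTEGER NOT NULL REFERENCES city (city_id),
    month    TEXT NOT NULL,                 -- first day of the month, 'YYYY-MM-DD'
    avg_temp REAL,
    PRIMARY KEY (city_id, month)
);
"""

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces REFERENCES only when this is on
conn.executescript(SCHEMA)

# Toy rows; the temperature value is the Abilene reading shown later in this notebook
conn.execute("INSERT INTO state VALUES ('TX', 'Texas')")
conn.execute("INSERT INTO city VALUES (1, 'Abilene', 'TX')")
conn.execute("INSERT INTO monthly_temperature VALUES (1, '2012-01-01', 7.996)")

row = conn.execute("""
    SELECT c.city_name, s.state_name, t.avg_temp
    FROM monthly_temperature AS t
    JOIN city  AS c ON c.city_id = t.city_id
    JOIN state AS s ON s.state_code = c.state_code
""").fetchone()
print(row)  # ('Abilene', 'Texas', 7.996)
```

Each non-key column depends only on its own table's key, so a change such as a corrected state name is made in exactly one place.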

The final solution works as a database query system. When a user enters a date or time period and the columns they are interested in (e.g., visa type), the system returns the related data as a DataFrame. For example, a user may want to know which airports were busiest for treaty-investor (E-2) visa holders in 2016, along with basic information such as the temperature and the demographics of the surrounding city (population, age, majority race, etc.), or when international students arrive in the United States at peak times and where they come from.

* Data is imported from Amazon S3.
* The relational database is implemented on Amazon Redshift.
* The non-relational database is implemented on Amazon Keyspaces, with backups stored in S3 in Parquet format.
* Data cleaning and the ETL process are implemented on Amazon EMR with Spark.
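The kind of query described above ("the busiest airports for a given visa type in a given year") can be sketched in pandas. This is a minimal illustration, assuming the I94 fields `i94yr`, `i94port`, and `visatype` and the airport fields `iata_code`, `name`, and `municipality`, joined on `i94port = iata_code` as noted in the airport-code cell of this notebook. The function name `busiest_ports` and the toy rows are hypothetical.

```python
import pandas as pd


def busiest_ports(i94, airports, visatype, year, top_n=5):
    """Count arrivals per port for one visa type and year, then attach airport details."""
    subset = i94[(i94["visatype"] == visatype) & (i94["i94yr"] == year)]
    counts = subset.groupby("i94port").size().rename("arrivals").reset_index()
    # Left join keeps ports that have no matching airport row
    merged = counts.merge(airports[["iata_code", "name", "municipality"]],
                          left_on="i94port", right_on="iata_code", how="left")
    return (merged.sort_values("arrivals", ascending=False)
                  .head(top_n)
                  .reset_index(drop=True))


# Hypothetical toy rows, for illustration only (SAS numerics such as i94yr are floats)
toy_i94 = pd.DataFrame({
    "i94yr": [2016.0, 2016.0, 2016.0, 2016.0],
    "i94port": ["AAA", "AAA", "BBB", "AAA"],
    "visatype": ["E2", "E2", "E2", "F1"],
})
toy_airports = pd.DataFrame({
    "iata_code": ["AAA", "BBB"],
    "name": ["Airport A", "Airport B"],
    "municipality": ["City A", "City B"],
})
result = busiest_ports(toy_i94, toy_airports, "E2", 2016.0)
print(result)
```

In the full system the same filter-aggregate-join shape would run in Spark SQL over the stored tables rather than in pandas.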

#### Datasets used in this project:

* I94 Immigration Data: This data comes from the US National Travel and Tourism Office (https://travel.trade.gov/research/reports/i94/historical/2016.html). A data dictionary is included in the workspace, and a sample file in CSV format lets you inspect the data before reading it all in. Only the portion of the dataset needed to accomplish the project goals is used.
* World Temperature Data: This dataset comes from Kaggle. You can read more about it here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.
* U.S. City Demographic Data: This data comes from OpenDataSoft. You can read more about it here: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/.
* Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from here: https://datahub.io/core/airport-codes#data.

In [110]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import types as T

# Create (or reuse) the Spark session: SAS reader package, S3A filesystem,
# a larger driver heap, and Hive support
spark = (SparkSession.builder
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.driver.memory", "15g")
         .enableHiveSupport()
         .getOrCreate())
# Use Apache Arrow to speed up pandas <-> Spark DataFrame conversion
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [111]:
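# Read the sample, drop duplicates, and renumber rows so new columns line up with every row
i94 = pd.read_sas('i94_jan16_sub.sas7bdat', format='sas7bdat', encoding="ISO-8859-1").drop_duplicates().reset_index(drop=True)
i94['id_'] = [str(uuid.uuid1()) for _ in range(len(i94))]
# SAS dates count days since 1960-01-01
i94['arrival_date'] = pd.to_timedelta(i94['arrdate'], unit='D') + pd.Timestamp('1960-01-01')
i94 = spark.createDataFrame(i94)
i94.createOrReplaceTempView('i94')  # register the view queried with SQL below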
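SAS stores dates as a number of days since 1960-01-01, which is why `arrdate` is added to that epoch. A small standalone sketch of the same conversion, useful for checking a few values by hand; the helper name `sas_days_to_timestamp` and the sample day counts are illustrative.

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # SAS date values count days from this date


def sas_days_to_timestamp(days):
    """Convert a Series of SAS day counts (floats, NaN allowed) to pandas Timestamps."""
    return pd.to_timedelta(days, unit="D") + SAS_EPOCH


toy_arrdate = pd.Series([20454.0, 20484.0, float("nan")])
ts = sas_days_to_timestamp(toy_arrdate)
print(ts)  # 2016-01-01, 2016-01-31, NaT
```

Missing day counts become `NaT`, so records without an arrival date survive the conversion and can be handled in the cleaning step.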

In [158]:
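def mapping_processor(names):
    """Parse mappings/<names>.txt, one "code = 'name'" pair per line, into a Spark
    DataFrame with columns <names>_code and <names>_name."""
    def clean(text):
        # Drop surrounding whitespace, trailing ';' or ',', and surrounding quotes
        return text.strip().rstrip(';,').strip().strip("'\"").strip()

    code = []
    name = []
    with open('mappings/{}.txt'.format(names), 'r') as origin:
        for each in origin:
            line = " ".join(each.split())  # collapse runs of whitespace
            if '=' not in line:
                continue  # skip blank or malformed lines
            raw_code, _, raw_name = line.partition('=')
            try:
                code.append(int(clean(raw_code)))  # numeric codes, e.g. country codes
            except ValueError:
                code.append(clean(raw_code))       # text codes, e.g. state abbreviations
            name.append(clean(raw_name))
    col_code = names + '_code'
    col_name = names + '_name'
    df = pd.DataFrame(list(zip(code, name)), columns=[col_code, col_name])
    return spark.createDataFrame(df)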

In [159]:
country=mapping_processor('country')
mode=mapping_processor('mode')
port=mapping_processor('port')
us_states=mapping_processor('us_states')
visacode=mapping_processor('visacode')

country.createOrReplaceTempView('country')
mode.createOrReplaceTempView('mode')
port.createOrReplaceTempView('port')
us_states.createOrReplaceTempView('us_states')
visacode.createOrReplaceTempView('visacode')

In [162]:
sql="""SELECT DISTINCT i.i94addr,
              u.us_states_name
       FROM i94 AS i JOIN us_states AS u 
       ON i.i94addr = u.us_states_code"""
temp=spark.sql(sql)
temp.show()

+-------+-----------------+
|i94addr|   us_states_name|
+-------+-----------------+
|     AZ|          ARIZONA|
|     SC|      S. CAROLINA|
|     LA|        LOUISIANA|
|     MN|        MINNESOTA|
|     NJ|       NEW JERSEY|
|     DC|DIST. OF COLUMBIA|
|     OR|           OREGON|
|     99|All Other Codes' |
|     VA|         VIRGINIA|
|     RI|     RHODE ISLAND|
|     KY|         KENTUCKY|
|     WY|          WYOMING|
|     NH|    NEW HAMPSHIRE|
|     MI|         MICHIGAN|
|     NV|           NEVADA|
|     WI|        WISCONSON|
|     ID|            IDAHO|
|     CA|       CALIFORNIA|
|     CT|      CONNECTICUT|
|     NE|         NEBRASKA|
+-------+-----------------+
only showing top 20 rows



In [11]:
sql="""SELECT i94yr AS year,
              i94mon AS month,
              i94cit AS citizenship,
              i94res AS resident,
              i94port AS port,
              arrival_date,
              i94mode AS mode,
              i94addr AS us_state,
              depdate AS depart_date,
              i94bir AS age,
              i94visa AS visa_category,
              dtadfile AS date_added,
              visapost AS visa_issued_by,
              occup AS occupation,
              entdepa AS arrival_flag,
              entdepd AS depart_flag,
              entdepu AS update_flag,
              matflag AS match_arrival_depart_flag,
              biryear AS birth_year,
              dtaddto AS allowed_date,
              gender,
              insnum AS ins_number,
              airline,
              admnum AS admission_number,
              fltno AS flight_no,
              visatype,
              id_
              FROM i94 AS i
       """
temp=spark.sql(sql)


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
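A quick way to start is a per-column profile of missing values plus a duplicate count. A minimal pandas sketch; the helper `profile_quality` and the toy rows are hypothetical, and the same calls would apply to the pandas DataFrames loaded below (for example `airport_codes` with `key="ident"`).

```python
import pandas as pd


def profile_quality(df, key=None):
    """Summarise per-column missing values plus duplicate rows (and duplicate keys)."""
    summary = pd.DataFrame({
        "missing": df.isna().sum(),
        "missing_pct": (df.isna().mean() * 100).round(1),
    })
    report = {"columns": summary, "duplicate_rows": int(df.duplicated().sum())}
    if key is not None:
        report["duplicate_keys"] = int(df.duplicated(subset=key).sum())
    return report


# Toy rows: the third row repeats the second, and iata_code is always empty
toy = pd.DataFrame({
    "ident": ["A1", "A2", "A2"],
    "iata_code": [None, None, None],
    "elevation_ft": [3435.0, 450.0, 450.0],
})
report = profile_quality(toy, key="ident")
print(report["duplicate_rows"], report["duplicate_keys"])  # 1 1
print(report["columns"])
```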

#### Cleaning Steps
Document steps necessary to clean the data

In [6]:
global_land_temperature_url = 's3://srk-data-eng-capstone/GlobalLandTemperaturesByCity.csv'
airport_codes_url = 's3://srk-data-eng-capstone/airport-codes_csv.csv'
us_city_demographics_url = 's3://srk-data-eng-capstone/us-cities-demographics.csv'

In [7]:
global_land_temperature = pd.read_csv(global_land_temperature_url)
# Keep only US cities; the Country column is then constant and can be dropped
is_us = global_land_temperature['Country'] == 'United States'
global_land_temperature = global_land_temperature[is_us].drop(['Country'], axis=1)
# Keep the monthly readings for 2012 ('dt' is an ISO date string, so string comparison works)
start_date = '2012-01-01'
end_date = '2012-12-01'
period = (global_land_temperature['dt'] >= start_date) & (global_land_temperature['dt'] <= end_date)
global_land_temperature = global_land_temperature[period].reset_index()
global_land_temperature['id_'] = [str(uuid.uuid1()) for _ in range(len(global_land_temperature))]

In [8]:
airport_codes = pd.read_csv(airport_codes_url)
# Keep small, medium and large airports (drops heliports, seaplane bases, closed sites, etc.)
acceptable_airport = ['small_airport', 'medium_airport', 'large_airport']
airport_codes = airport_codes[airport_codes['type'].isin(acceptable_airport)].reset_index()
# Derive the two-letter US state code from iso_region (e.g. 'US-KS' -> 'KS'); None elsewhere
state_code = []
for each in airport_codes['iso_region']:
    if each[:2] == 'US':
        state_code.append(each[3:])
    else:
        state_code.append(None)
airport_codes['us_code'] = state_code
airport_codes['id_'] = [str(uuid.uuid1()) for _ in range(len(airport_codes))]

In [9]:
us_city_demographics = pd.read_csv(us_city_demographics_url, sep=';')  # this file is semicolon-delimited
us_city_demographics['id_'] = [str(uuid.uuid1()) for _ in range(len(us_city_demographics))]

In [10]:
us_city_demographics.head()
# Relationship to the global temperature table: City = City
# Relationship to the NoSQL (I94) table: State Code = i94addr

|  | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count | id_ |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 | a09b4d12-f203-11ea-b6e8-acde48001122 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 | a09b4dc6-f203-11ea-bca1-acde48001122 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 | a09b4e18-f203-11ea-ab80-acde48001122 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 | a09b4e48-f203-11ea-9b70-acde48001122 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 | a09b4e70-f203-11ea-a248-acde48001122 |


In [11]:
airport_codes
# Relationship to the NoSQL (I94) table: i94port = iata_code
# Relationship to the global temperature table: municipality = City
# Relationship to the US city demographics table: us_code = State Code

|  | index | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates | us_code | id_ |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 |  | US | US-KS | Leoti | 00AA |  | 00AA | -101.473911, 38.704022 | KS | a06bb5f4-f203-11ea-bd8e-acde48001122 |
| 1 | 2 | 00AK | small_airport | Lowell Field | 450.0 |  | US | US-AK | Anchor Point | 00AK |  | 00AK | -151.695999146, 59.94919968 | AK | a06bb69c-f203-11ea-a7da-acde48001122 |
| 2 | 3 | 00AL | small_airport | Epps Airpark | 820.0 |  | US | US-AL | Harvest | 00AL |  | 00AL | -86.77030181884766, 34.86479949951172 | AL | a06bb6d8-f203-11ea-8884-acde48001122 |
| 3 | 5 | 00AS | small_airport | Fulton Airport | 1100.0 |  | US | US-OK | Alex | 00AS |  | 00AS | -97.8180194, 34.9428028 | OK | a06bb70a-f203-11ea-a632-acde48001122 |
| 4 | 6 | 00AZ | small_airport | Cordes Airport | 3810.0 |  | US | US-AZ | Cordes | 00AZ |  | 00AZ | -112.16500091552734, 34.305599212646484 | AZ | a06bb734-f203-11ea-875b-acde48001122 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 39137 | 55069 | ZYYJ | medium_airport | Yanji Chaoyangchuan Airport | 624.0 | AS | CN | CN-22 | Yanji | ZYYJ | YNJ |  | 129.451004028, 42.8828010559 |  | a0861c1c-f203-11ea-b96e-acde48001122 |
| 39138 | 55070 | ZYYK | medium_airport | Yingkou Lanqi Airport | 0.0 | AS | CN | CN-21 | Yingkou | ZYYK | YKH |  | 122.3586, 40.542524 |  | a0861c3a-f203-11ea-8a18-acde48001122 |
| 39139 | 55071 | ZYYY | medium_airport | Shenyang Dongta Airport |  | AS | CN | CN-21 | Shenyang | ZYYY |  |  | 123.49600219726562, 41.784400939941406 |  | a0861c4c-f203-11ea-8ad8-acde48001122 |
| 39140 | 55073 | ZZ-0002 | small_airport | Glorioso Islands Airstrip | 11.0 | AF | TF | TF-U-A | Grande Glorieuse |  |  |  | 47.296388888900005, -11.584277777799999 |  | a0861c6c-f203-11ea-b433-acde48001122 |
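In the output above, the US rows have an empty `continent`, while the non-US rows show `AS` and `AF`. This is consistent with pandas' default `read_csv` behavior of parsing the string "NA" (the dataset's code for North America) as a missing value; the same would hit Namibia, whose ISO country code is also "NA". A minimal sketch of reading the file so that only empty fields count as missing; the toy CSV rows are made up, and the real call would pass `airport_codes_url` instead of the in-memory buffer.

```python
import io

import pandas as pd

# Toy CSV in the airport-codes layout, trimmed to three columns.
# TOY2 stands for a made-up Namibian airport (ISO country code "NA").
csv_text = "ident,continent,iso_country\nTOY1,NA,US\nTOY2,AF,NA\nTOY3,AS,CN\n"

default = pd.read_csv(io.StringIO(csv_text))
# Only empty fields are treated as missing; literal "NA" strings are kept as data
fixed = pd.read_csv(io.StringIO(csv_text), keep_default_na=False, na_values=[""])

print(default["continent"].tolist())  # [nan, 'AF', 'AS']
print(fixed["continent"].tolist())    # ['NA', 'AF', 'AS']
print(fixed["iso_country"].tolist())  # ['US', 'NA', 'CN']
```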


In [12]:
global_land_temperature.head()

|  | index | dt | AverageTemperature | AverageTemperatureUncertainty | City | Latitude | Longitude | id_ |
|---|---|---|---|---|---|---|---|---|
| 0 | 49859 | 2012-01-01 | 7.996 | 0.204 | Abilene | 32.95N | 100.53W | a000210c-f203-11ea-9678-acde48001122 |
| 1 | 49860 | 2012-02-01 | 8.434 | 0.252 | Abilene | 32.95N | 100.53W | a00021b6-f203-11ea-9468-acde48001122 |
| 2 | 49861 | 2012-03-01 | 15.628 | 0.173 | Abilene | 32.95N | 100.53W | a00021f4-f203-11ea-8358-acde48001122 |
| 3 | 49862 | 2012-04-01 | 21.069 | 0.388 | Abilene | 32.95N | 100.53W | a0002224-f203-11ea-8795-acde48001122 |
| 4 | 49863 | 2012-05-01 | 24.698 | 0.323 | Abilene | 32.95N | 100.53W | a000224c-f203-11ea-bdc0-acde48001122 |
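The relationships noted in the cells above can be combined into one enrichment step. A minimal pandas sketch, assuming the column names shown in the three previews; the function `enrich_airports` and the toy rows are hypothetical. Two details matter: the demographics file has one row per (city, race), so city-level columns are deduplicated before joining, and the temperature data has no state column, so that join is on city name only and can match same-named cities in different states.

```python
import pandas as pd


def enrich_airports(airports, demographics, temperature):
    """Attach city-level demographics and monthly temperatures to US airports."""
    # One row per (City, State Code): drop the per-race repetition first
    city_level = (demographics[["City", "State Code", "Median Age", "Total Population"]]
                  .drop_duplicates(subset=["City", "State Code"])
                  .rename(columns={"City": "municipality", "State Code": "us_code"}))
    temps = (temperature[["City", "dt", "AverageTemperature"]]
             .rename(columns={"City": "municipality"}))
    return (airports[["ident", "name", "municipality", "us_code"]]
            .merge(city_level, on=["municipality", "us_code"], how="left")
            # Temperature has no state column, so this join uses the city name only
            .merge(temps, on="municipality", how="left"))


# Hypothetical toy rows
toy_airports = pd.DataFrame({"ident": ["X001"], "name": ["Toy Field"],
                             "municipality": ["Springfield"], "us_code": ["IL"]})
toy_demographics = pd.DataFrame({
    "City": ["Springfield", "Springfield"], "State Code": ["IL", "IL"],
    "Median Age": [35.0, 35.0], "Total Population": [1000, 1000],
    "Race": ["White", "Asian"], "Count": [700, 100],
})
toy_temperature = pd.DataFrame({"City": ["Springfield", "Springfield"],
                                "dt": ["2012-01-01", "2012-02-01"],
                                "AverageTemperature": [-2.0, 0.5]})
result = enrich_airports(toy_airports, toy_demographics, toy_temperature)
print(result)  # one row per airport and month
```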


In [14]:
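# Performing cleaning tasks here
import datetime as dt
from pyspark.sql import types as T
from pyspark.sql.functions import udf

df = spark.read.parquet('16_jan.parquet.gzip')
# Convert SAS day counts (days since 1960-01-01) to ISO date strings
get_date = udf(lambda x: (dt.date(1960, 1, 1) + dt.timedelta(days=x)).isoformat() if x is not None else None,
               T.StringType())
df = df.withColumn('arrdate', get_date(df.arrdate))
df.createOrReplaceTempView('i94')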
sql="""SELECT i94yr AS year,
              i94mon AS month,
              i94cit AS citizenship,
              i94res AS resident,
              i94port AS port,
              arrdate AS arrival_date,
              i94mode AS mode,
              i94addr AS us_state,
              depdate AS depart_date,
              i94bir AS age,
              i94visa AS visa_category,
              dtadfile AS date_added,
              visapost AS visa_issued_by,
              occup AS occupation,
              entdepa AS arrival_flag,
              entdepd AS depart_flag,
              entdepu AS update_flag,
              matflag AS match_arrival_depart_flag,
              biryear AS birth_year,
              dtaddto AS allowed_date,
              gender,
              insnum AS ins_number,
              airline,
              admnum AS admission_number,
              fltno AS flight_no,
              visatype,
              id_
              FROM i94
       """
table=spark.sql(sql)
table.show()


In [110]:
table.createOrReplaceTempView('i94')
sample=spark.sql('SELECT * FROM i94')
sample.show()


Columns that still need cleaning: citizenship, resident, arrival_date, mode, visa_category.
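The coded columns listed above can be decoded by mapping each numeric code to a label, with a fallback for codes that are missing or not in the table. A minimal pandas sketch; the `MODE_LABELS` and `VISA_CATEGORY_LABELS` pairs are assumed from the I94 data dictionary, and in this notebook the mapping files loaded with `mapping_processor` could supply them instead. citizenship and resident would be decoded the same way with a dictionary built from the country mapping. The helper `decode` and the toy rows are hypothetical.

```python
import pandas as pd

# Assumed code tables from the I94 data dictionary (SAS numerics arrive as floats)
MODE_LABELS = {1.0: "Air", 2.0: "Sea", 3.0: "Land", 9.0: "Not reported"}
VISA_CATEGORY_LABELS = {1.0: "Business", 2.0: "Pleasure", 3.0: "Student"}


def decode(series, labels, unknown="Unknown"):
    """Replace codes with labels; unmapped or missing codes become `unknown`."""
    return series.map(labels).fillna(unknown)


toy_rows = pd.DataFrame({"mode": [1.0, 3.0, None, 5.0],
                         "visa_category": [2.0, 3.0, 1.0, None]})
toy_rows["mode"] = decode(toy_rows["mode"], MODE_LABELS)
toy_rows["visa_category"] = decode(toy_rows["visa_category"], VISA_CATEGORY_LABELS)
print(toy_rows)
```

Using an explicit "Unknown" label keeps unmapped codes visible in later aggregations instead of silently dropping them as nulls.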

In [38]:
# Collect only the distinct visa types (not every row) to the driver
k = table.select('visatype').distinct().collect()
k_list = [row.visatype for row in k]
k_list

['B2',
 'WT',
 'F1',
 'WB',
 'B1',
 'GMT',
 'E2',
 'CP',
 'F2',
 'E1',
 'I',
 'M1',
 'GMB',
 'I1',
 'M2',
 'CPL',
 'SBP']

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
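The checks listed above (key integrity, data types, and source/count completeness) can be expressed as one small function that runs after each load. A minimal pandas sketch, assuming the loaded table fits in a pandas DataFrame and the source row count is known; the function `quality_failures` and the sample rows are hypothetical.

```python
import pandas as pd


def quality_failures(df, key, source_count, expected_dtypes):
    """Return a list of failed-check messages; an empty list means every check passed."""
    if key not in df.columns:
        return [f"missing key column {key}"]
    failures = []
    # Integrity: the key must be present and unique
    if df[key].isna().any():
        failures.append(f"{key} has null values")
    if df[key].duplicated().any():
        failures.append(f"{key} has duplicate values")
    # Completeness: rows loaded must match rows read from the source
    if len(df) != source_count:
        failures.append(f"expected {source_count} rows, found {len(df)}")
    # Data types: compare against the expected pandas dtype names
    for col, dtype in expected_dtypes.items():
        if col not in df.columns:
            failures.append(f"missing column {col}")
        elif str(df[col].dtype) != dtype:
            failures.append(f"{col} has dtype {df[col].dtype}, expected {dtype}")
    return failures


loaded = pd.DataFrame({"id_": ["a", "b", "b"], "age": [30.0, 41.0, 41.0]})
print(quality_failures(loaded, "id_", source_count=3, expected_dtypes={"age": "float64"}))
# ['id_ has duplicate values']
```

Returning messages instead of raising on the first failure lets one run report every problem at once.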
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.
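For the 7am dashboard scenario, the export job needs a fixed daily trigger. In production a cron entry such as `0 7 * * *` or an Airflow DAG would normally provide it; the standard-library sketch below only illustrates the timing logic. The function names are hypothetical, and the hour should be set early enough that the refresh finishes before the deadline.

```python
import time
from datetime import datetime, timedelta


def next_daily_run(now, hour=7, minute=0):
    """Return the first datetime at hour:minute that is strictly later than `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def run_daily(job, hour=7, minute=0):
    """Block forever, calling `job()` once a day at hour:minute local time."""
    target = next_daily_run(datetime.now(), hour, minute)
    while True:
        time.sleep(max((target - datetime.now()).total_seconds(), 0))
        job()
        # Schedule from the previous target so a fast job never runs twice in one day
        target = next_daily_run(max(target, datetime.now()), hour, minute)


print(next_daily_run(datetime(2016, 1, 1, 6, 30)))  # 2016-01-01 07:00:00
print(next_daily_run(datetime(2016, 1, 1, 7, 0)))   # 2016-01-02 07:00:00
```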

#### Scenario: the data populates a dashboard that must be updated by 7am every day

In this scenario, the I94 data is loaded into a NoSQL database (Amazon Keyspaces), as shown below:

In [None]:
from cassandra.cluster import Cluster
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
from cassandra.auth import PlainTextAuthProvider
from cassandra import ConsistencyLevel

# Amazon Keyspaces only accepts TLS connections; verify the server certificate
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('AmazonRootCA1.pem')
ssl_context.verify_mode = CERT_REQUIRED
auth_provider = PlainTextAuthProvider(username=str(config['APACHE_CASSANDRA_CREDS']['CASSANDRA_USERNAME']), password=str(config['APACHE_CASSANDRA_CREDS']['CASSANDRA_PASSWORD']))
cluster = Cluster(['cassandra.eu-west-1.amazonaws.com'], ssl_context=ssl_context, auth_provider=auth_provider, port=9142)
print('Connecting, please be patient...')
session = cluster.connect()

create_keyspace="""CREATE KEYSPACE IF NOT EXISTS "i94"
                   WITH REPLICATION={'class':'SingleRegionStrategy'}"""
session.execute(create_keyspace)
sleep(10)  # Amazon Keyspaces creates keyspaces asynchronously

# Columns keep the raw I94 field names so they match the INSERT and SELECT statements that follow
create_table="""CREATE TABLE IF NOT EXISTS "i94".i94 (
                    id_ TEXT,
                    cicid DOUBLE,
                    i94yr DOUBLE,
                    i94mon DOUBLE,
                    i94cit DOUBLE,
                    i94res DOUBLE,
                    i94port TEXT,
                    arrdate DOUBLE,
                    i94mode DOUBLE,
                    i94addr TEXT,
                    depdate DOUBLE,
                    i94bir DOUBLE,
                    i94visa DOUBLE,
                    "count" DOUBLE,
                    dtadfile DOUBLE,
                    visapost TEXT,
                    occup TEXT,
                    entdepa TEXT,
                    entdepd TEXT,
                    entdepu TEXT,
                    matflag TEXT,
                    biryear DOUBLE,
                    dtaddto TEXT,
                    gender TEXT,
                    insnum TEXT,
                    airline TEXT,
                    admnum DOUBLE,
                    fltno TEXT,
                    visatype TEXT,
                    PRIMARY KEY(id_)
                ) """
session.execute(create_table)
sleep(10)  # table creation is asynchronous as well
print('Table is ready. You can now insert data from the dataset.')

Non-relational databases offer higher elasticity, faster reads and writes, and scale with evolving data volumes. In our case, the I94 data should be saved in a non-relational database, because in a real-world setting it would go through the ETL process almost every minute and needs fast, dynamic reads and writes for real-time monitoring.

In [None]:
# Raw I94 column names, in the order the values are supplied (id_ last)
columns = ["cicid", "i94yr", "i94mon", "i94cit", "i94res", "i94port", "arrdate", "i94mode", "i94addr", "depdate",
           "i94bir", "i94visa", "count", "dtadfile", "visapost", "occup", "entdepa", "entdepd", "entdepu", "matflag",
           "biryear", "dtaddto", "gender", "insnum", "airline", "admnum", "fltno", "visatype", "id_"]
# Columns declared DOUBLE in the table; every other column is TEXT
double_columns = {"cicid", "i94yr", "i94mon", "i94cit", "i94res", "arrdate", "i94mode", "depdate",
                  "i94bir", "i94visa", "count", "dtadfile", "biryear", "admnum"}

def to_row(values):
    """Convert raw values to bind parameters: float for DOUBLE, str for TEXT, None if empty."""
    row = []
    for col, value in zip(columns, values):
        if value is None or str(value).strip() == '':
            row.append(None)
        elif col in double_columns:
            row.append(float(value))
        else:
            row.append(str(value).strip())
    return row

# Prepare the statement once with ? placeholders and bind values at execution time,
# instead of formatting values into the CQL text
insert_sql = 'INSERT INTO "i94".i94 ({}) VALUES ({})'.format(
    ", ".join('"{}"'.format(c) for c in columns), ", ".join("?" for _ in columns))
insert_stmt = session.prepare(insert_sql)
insert_stmt.consistency_level = ConsistencyLevel.LOCAL_QUORUM  # Amazon Keyspaces requires LOCAL_QUORUM for writes

# Insert one test row
lists = [888, 1991, 10, 999, 666, 'port_test', 9527, 777, 'addr_test', 10, 10, 10, 10, 10, "visapost", 'occup', 'entdepa', 'entdepd',
         'entdepu', 'mat', 1984, 'dtaddto', 'M', 'insnumber', 'AerLingus', 29, 'filtnumber', 'H1B']
session.execute(insert_stmt, to_row(lists) + [str(uuid.uuid1())])

# This part is going to be used in the transcript.

# while True:
#     values = input("Insert data. Split values by comma. If a field is empty, just keep its comma. Enter Q to quit. ")
#     if values.strip().lower() == 'q':
#         print('Quit.')
#         break
#     lists = values.split(',')
#     if len(lists) != len(columns) - 1:
#         print('Expected {} values but got {}. Did you lose something?'.format(len(columns) - 1, len(lists)))
#         continue
#     session.execute(insert_stmt, to_row(lists) + [str(uuid.uuid1())])
#     next_one = input('Done. Do you wish to continue? Y/N ')
#     if next_one.strip().lower() != 'y':
#         print('Thanks. Quit.')
#         break

# Export the table contents to a Parquet snapshot for the dashboard
rows = session.execute('SELECT * FROM i94.i94')
df = pd.DataFrame(list(rows))  # rows are named tuples, so column names come from the table
df.to_parquet('dashboard.parquet.gzip', compression='gzip')

And we run the script above at 7AM every day to create