# Project

Author: Weixin Zhang

## Summary
Given US immigration data, process it and derive some insights from the dataset.


### Step 1: Scope the Project and Gather Data

In [14]:
try:
    import findspark
    findspark.init()
except ImportError:
    print("findspark module not found, check if you need it")

In [1]:
import pandas as pd
import boto3
import configparser
import os
from pyspark.sql import SparkSession

In [2]:
config = configparser.ConfigParser()
config.read('configurations/config.cfg')
KEY                    = config.get('AWS','KEY')
SECRETE                 = config.get('AWS','SECRETE')
REGION                 = config.get('AWS', 'REGION')

DB_NAME        = config.get('CLUSTER', 'DB_NAME')
DB_USER        = config.get('CLUSTER', 'DB_USER')
DB_PASSWORD            = config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT                = config.get('CLUSTER', 'DB_PORT')
DWH_ENDPOINT           = config.get('CLUSTER', 'HOST')

IAM_ROLE      = config.get("IAM_ROLE", "ARN")

DEMOGRAPHICS_DATA = config.get('S3', 'DEMOGRAPHICS_DATA')
IMMIGRATION_DATA = config.get('S3', 'IMMIGRATION_DATA')
AIRPORT_DATA = config.get('S3', 'AIRPORT_DATA')

DEMOGRAPHICS_TABLE="demographics"
IMMIGRATION_TABLE="immigration"
AIRPORT_TABLE="airport"

BUCKET='s3://'+config.get('S3', 'BUCKET')+'/'
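
The cell above expects `configurations/config.cfg` to provide four sections. The real file is not shown here, so the following is only a sketch of its layout: the section and key names are taken from the `config.get` calls, and every value is a placeholder.

```python
import configparser

# Placeholder values only; section and key names mirror the config.get calls above.
EXAMPLE_CFG = """
[AWS]
KEY = <aws-access-key-id>
SECRETE = <aws-secret-access-key>
REGION = <aws-region>

[CLUSTER]
HOST = <redshift-endpoint>
DB_NAME = <database-name>
DB_USER = <database-user>
DB_PASSWORD = <database-password>
DB_PORT = <port>

[IAM_ROLE]
ARN = <iam-role-arn>

[S3]
BUCKET = <bucket-name>
DEMOGRAPHICS_DATA = <object-key>
IMMIGRATION_DATA = <object-key>
AIRPORT_DATA = <object-key>
"""

example = configparser.ConfigParser()
example.read_string(EXAMPLE_CFG)

# Same construction as BUCKET in the cell above
example_bucket_url = 's3://' + example.get('S3', 'BUCKET') + '/'
print(example.sections())
print(example_bucket_url)
```

Note that `ConfigParser.read` silently skips files it cannot find and returns the list of files it actually parsed, so a wrong path only surfaces later as a missing-section error on the first `config.get`.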

In [3]:
session = boto3.Session(aws_access_key_id=KEY, aws_secret_access_key=SECRETE)

In [4]:
s3_client=session.client('s3', region_name=REGION)

In [5]:
demographic_data = s3_client.get_object(Bucket=config.get('S3', 'BUCKET'), 
                                       Key=config.get('S3', 'DEMOGRAPHICS_DATA'))
# Read in the data here
demographic_df = pd.read_csv(demographic_data['Body'], delimiter=';')

In [6]:
demographic_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [7]:
immigration_data = s3_client.get_object(Bucket=config.get('S3', 'BUCKET'), 
                                       Key=config.get('S3', 'IMMIGRATION_DATA'))
# Read in the data here
immigration_df = pd.read_csv(immigration_data['Body'], delimiter=',')
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [8]:
immigration_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [9]:
airport_data = s3_client.get_object(Bucket=config.get('S3', 'BUCKET'), 
                                       Key=config.get('S3', 'AIRPORT_DATA'))
# Read in the data here
airport_df = pd.read_csv(airport_data['Body'], delimiter=',')
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [10]:
airport_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [11]:
def create_spark_session():
    # Build (or reuse) a session with the spark-sas7bdat reader package available
    spark = (SparkSession.builder
             .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
             .enableHiveSupport()
             .getOrCreate())
    return spark
spark = create_spark_session()

In [12]:
def process_parquet_data(spark, input_data, output_data, input_format='com.github.saurfang.sas.spark', write=True):
    df_spark =spark.read.format(input_format).load(input_data)
    if write:
        df_spark.write.parquet(output_data, mode='overwrite')
    return df_spark

In [13]:
i94_data=[]
for root, dirs, files in os.walk('/data/18-83510-I94-Data-2016/'):
    for file in files:
        print("Read: ", os.path.join(root, file))
        i94=spark.read.format('com.github.saurfang.sas.spark').load(os.path.join(root, file))
        i94_data.append(i94)
# i94_df=spark.read.format('com.github.saurfang.sas.spark').load('/data/18-83510-I94-Data-2016/*.sas7bdat')


Read:  /data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat
Read:  /data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat


In [14]:
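if len(i94_data):
    while len(i94_data) > 1:
        # union() returns a new DataFrame, so store the result back in slot 0
        # and remove the second frame that was just merged in
        i94_data[0] = i94_data[0].union(i94_data.pop(1))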
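
Because `DataFrame.union` returns a new DataFrame rather than modifying its receiver, combining a list of monthly frames amounts to a left fold. A minimal sketch using `functools.reduce` follows. `union_all` is a hypothetical helper name, and the demo uses Python sets (which also expose `.union`) only so that it runs without a Spark session.

```python
from functools import reduce

def union_all(frames):
    """Fold a non-empty list of objects into one by chaining .union()."""
    if not frames:
        raise ValueError("union_all needs at least one frame")
    # reduce computes ((f0.union(f1)).union(f2))... without mutating the list
    return reduce(lambda left, right: left.union(right), frames)

# Stand-in demo with sets; with Spark this would be union_all(i94_data)
months = [{"jan"}, {"feb"}, {"mar"}]
combined = union_all(months)
print(sorted(combined))
```

Spark's `union` matches columns by position, so this approach assumes every monthly file has the same columns in the same order.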


In [15]:
i94_df=i94_data[0]
i94_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
The per-column counts (from `df.count()` or the `info()` output above) differ between columns, most likely because some rows contain null values or are duplicated.

#### Cleaning Steps
1. Remove rows containing NA values
2. Remove duplicated rows

In [16]:
def cleanup_data(df, duplicate=True, dropna=True):
    # Clean the frame in place; each step can be switched off via its flag
    if duplicate:
        df.drop_duplicates(inplace=True)
    if dropna:
        df.dropna(inplace=True)
    return df

In [18]:
demographic_df=cleanup_data(demographic_df)
demographic_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2875 entries, 0 to 2890
Data columns (total 12 columns):
City                      2875 non-null object
State                     2875 non-null object
Median Age                2875 non-null float64
Male Population           2875 non-null float64
Female Population         2875 non-null float64
Total Population          2875 non-null int64
Number of Veterans        2875 non-null float64
Foreign-born              2875 non-null float64
Average Household Size    2875 non-null float64
State Code                2875 non-null object
Race                      2875 non-null object
Count                     2875 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 292.0+ KB


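Cleaning removed only a small number of rows: 2,875 remain, and the index labels still run up to 2890, which suggests that about 16 rows were dropped. Losing so little data is a good sign.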
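
To make the effect of cleaning explicit rather than reading it off the index, the row counts can be compared before and after. A minimal sketch on a toy frame follows; `cleanup_report` is a hypothetical helper and not part of this notebook.

```python
import pandas as pd

def cleanup_report(df):
    """Return a cleaned copy plus the row counts before and after cleaning."""
    before = len(df)
    # Both calls return new frames, so the caller's frame is left untouched
    cleaned = df.drop_duplicates().dropna()
    return cleaned, before, len(cleaned)

# Toy data: one exact duplicate row and one row with a missing value
toy = pd.DataFrame({
    "City": ["A", "A", "B", "C"],
    "Median Age": [30.0, 30.0, None, 41.5],
})
cleaned, before, after = cleanup_report(toy)
print(f"removed {before - after} of {before} rows")
```

Applied to `demographic_df` before it is cleaned in place, the same helper would report the exact number of dropped rows.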

### Step 3: Define the Data Model
##### Connect to database first

#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

In [19]:
import create_tables
create_tables.main()


        Drop Table If Exists demographics;
    

        Drop Table If Exists immigration;
    

        Drop Table If Exists airport;
    
Create tables successfully


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

##### 3.2.1 Staging data to Redshift

In [20]:
import etl
etl.main()


copy demographics
from 's3://us-immigration-data/us-cities-demographics.csv'
iam_role 'arn:aws:iam::332608265013:role/UdacityRedshift'
region 'us-west-2'
format as csv
delimiter as ';'
ignoreheader 1;


copy immigration
from 's3://us-immigration-data/immigration_data_sample.csv'
iam_role 'arn:aws:iam::332608265013:role/UdacityRedshift'
region 'us-west-2'
format as csv
delimiter as ','
ignoreheader 1;


copy airport
from 's3://us-immigration-data/airport-codes_csv.csv'
iam_role 'arn:aws:iam::332608265013:role/UdacityRedshift'
region 'us-west-2'
format as csv
delimiter as ','
ignoreheader 1;
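
The three statements printed above share one shape and differ only in table name, S3 path and delimiter. The `etl` module itself is not shown here, so the following is only a sketch of how such a statement could be assembled. `build_copy_sql` is a hypothetical helper, and the role ARN in the demo is a placeholder.

```python
COPY_TEMPLATE = """copy {table}
from '{source}'
iam_role '{iam_role}'
region '{region}'
format as csv
delimiter as '{delimiter}'
ignoreheader 1;"""

def build_copy_sql(table, source, iam_role, region, delimiter=','):
    """Fill the COPY template for one staging table."""
    return COPY_TEMPLATE.format(table=table, source=source, iam_role=iam_role,
                                region=region, delimiter=delimiter)

copy_sql = build_copy_sql(
    table='demographics',
    source='s3://us-immigration-data/us-cities-demographics.csv',
    iam_role='<iam-role-arn>',  # placeholder, read from config in practice
    region='us-west-2',
    delimiter=';',
)
print(copy_sql)
```

Keeping the template in one place means a change such as a different `ignoreheader` value only has to be made once for all three tables.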



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
The details are implemented in Airflow.

#### 4.2 Data Quality Checks
Quality checks are run in the Airflow code:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
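
The Airflow code is not included here. As an illustration only, the first kind of check (integrity constraints) could look like the sketch below. It assumes a DB-API connection; an in-memory SQLite database stands in for Redshift, and `check_not_empty` and `check_unique` are hypothetical helper names.

```python
import sqlite3

def check_not_empty(conn, table):
    """Raise if the table has no rows; otherwise return the row count."""
    cur = conn.cursor()
    # Table names are interpolated directly, so pass trusted identifiers only
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{table} is empty")
    return count

def check_unique(conn, table, column):
    """Raise if any value of `column` occurs more than once."""
    cur = conn.cursor()
    cur.execute(
        f"SELECT COUNT(*) FROM (SELECT {column} FROM {table} "
        f"GROUP BY {column} HAVING COUNT(*) > 1) AS d"
    )
    dupes = cur.fetchone()[0]
    if dupes:
        raise ValueError(f"{table}.{column} has {dupes} duplicated value(s)")
    return True

# Stand-in data: SQLite in memory instead of Redshift
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airport (ident TEXT, name TEXT)")
conn.executemany("INSERT INTO airport VALUES (?, ?)",
                 [("00A", "Total Rf Heliport"), ("00AA", "Aero B Ranch Airport")])
rows = check_not_empty(conn, "airport")
unique_ok = check_unique(conn, "airport", "ident")
print(rows, unique_ok)
```

An explicit query like this matters on Redshift in particular, because primary-key and unique constraints there are informational and are not enforced when data is loaded.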

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.