# Data Engineering Capstone Project

#### Project Summary
For this project I picked two main datasets to analyze: the I94 US Immigration data and the US City Demographic data. I use Python pandas and Apache Spark for data wrangling and cleansing, and the final data tables are delivered as Parquet files for the data science and analytics teams.


* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we create an analytical dataset that answers a set of questions intended to help our company increase profit and productivity.<br>

Here are the questions we need detailed answers to:<br>
What is the purpose of the visit to the USA: study, tourism, or business?<br>
What is the population of the state where visitors arrive?<br>
Which countries are visitors to the USA coming from?<br>
What is the average age of students, tourists, and business travelers?<br>
What are the top 3 states that tourists visit?<br>


#### Describe and Gather Data 
We use two data sources, plus the "I94_SAS_Labels_Descriptions.SAS" label file, for this project.

I94 Immigration Data: <br>
The data contains detailed international visitor arrival statistics by world region, which lets us answer many analytical questions once the pipeline is complete. It comes from the US National Travel and Tourism Office. This is our core dataset: we build the fact table from the immigration records and place the dimension tables around it. The dataset is available at https://www.trade.gov/national-travel-and-tourism-office

U.S. City Demographic Data: <br>
This data comes from OpenDataSoft. The dataset is available at https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

I94_SAS_Labels_Descriptions.SAS:<br>
This file works as a data dictionary: it maps the coded values in the I94 data to readable labels, and I use it as the source for several dimension tables (country, state, port, mode and visa).
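The notebook reads this file through an `extract_content` helper from `utility_functions`, which is not shown. As a rough illustration of what such a helper might do, here is a hypothetical stdlib parser, `extract_labels`. It assumes each lookup in the label file is written as a `value <name>` header line, followed by one `code = 'label'` entry per line, and closed by a lone `;`. It returns (code, definition) tuples rather than the pandas DataFrame the notebook works with.

```python
import re

# One "code = 'label'" entry; the code may be bare (582) or quoted ('ALC').
ENTRY = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def extract_labels(sas_text, format_name):
    """Return [(code, definition), ...] for the `value <format_name>` block.

    Hypothetical stand-in for the notebook's extract_content helper.
    """
    pairs, inside = [], False
    for line in sas_text.splitlines():
        stripped = line.strip()
        if not inside:
            # A block starts with a header such as "value i94cntyl" or "value $i94prtl".
            inside = stripped.lower().split()[:2] == ["value", format_name.lower()]
            continue
        if stripped.startswith(";"):
            break  # a lone ';' closes the block
        match = ENTRY.match(line)
        if match:
            # Labels may be padded with trailing spaces, so strip them.
            pairs.append((match.group(1), match.group(2).strip()))
    return pairs


# Small excerpt in the assumed layout (illustrative, not copied from the real file).
SAMPLE = (
    "value i94cntyl\n"
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n"
    "   236 =  'AFGHANISTAN'\n"
    ";\n"
    "value $i94prtl\n"
    "\t'ALC'\t=\t'ALCAN, AK             '\n"
    "\t'ANC'\t=\t'ANCHORAGE, AK         '\n"
    ";\n"
)

print(extract_labels(SAMPLE, "$i94prtl"))  # [('ALC', 'ALCAN, AK'), ('ANC', 'ANCHORAGE, AK')]
```

Splitting the header on whitespace and comparing the first two tokens avoids a prefix match such as `i94addr` accidentally selecting an `i94addrl` block.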


### Step 2: Explore and Assess the Data
#### Explore the Data 



## Importing Required Libraries

In [87]:
import pandas as pd
from utility_functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col , monotonically_increasing_id
from pyspark.sql.types import StructType,StructField, StringType, IntegerType , DateType


In [88]:
# Set up the Spark session with the SAS reader (spark-sas7bdat) and the AWS/S3A packages
spark = SparkSession.builder.\
config("spark.jars.repositories","https://repos.spark-packages.org/").\
config("spark.jars.packages","com.amazonaws:aws-java-sdk:1.7.4,saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2").\
config("spark.hadoop.fs.s3a.path.style.access", True). \
config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"). \
config("com.amazonaws.services.s3.enableV4", True). \
config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"). \
enableHiveSupport().getOrCreate()


In [89]:
# Defining path of files that we are going to use inside data exploration functions below
path_i94='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
path_demo="./other_datasets/us-cities-demographics.csv"

### Immigration Dataset Exploration

In [90]:
def i94_data_exp(fn_94_part):
    """
    Read the I94 SAS file at path fn_94_part and return it as a Spark DataFrame.
    """
    # Read a single month to see what the dataset contains.
    df_i94 = spark.read.format('com.github.saurfang.sas.spark').load(fn_94_part)
    return df_i94



In [91]:
df_i94=i94_data_exp(path_i94)
df_i94.limit(3).toPandas()
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### i94 Data Dictionary

Column | Description 
-----|-----
cicid|Unique record ID
i94yr|4 digit year
i94mon|Numeric month
i94cit|3 digit code for immigrant country of birth
i94res|3 digit code for immigrant country of residence
i94port|Port
arrdate|Arrival Date in the USA
i94mode|Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
i94addr|State of arrival
depdate|Departure Date
i94bir|Age
i94visa|Visa codes (1 = Business, 2 = Pleasure, 3 = Student)
count|Used for summary statistics
dtadfile|Character Date Field
visapost|Department of State where the visa was issued
occup|Occupation that will be performed in the U.S.
entdepa|Arrival Flag - admitted or paroled into the U.S.
entdepd|Departure Flag - Departed, lost I-94 or is deceased
entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence
matflag|Match flag - Match of arrival and departure records
biryear|4 digit year of birth
dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)
gender|Non-immigrant sex
insnum|INS number
airline|Airline used to arrive in U.S.
admnum|Admission Number
fltno|Flight number of Airline used to arrive in U.S.
visatype|Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
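`arrdate` and `depdate` load as doubles rather than dates. Assuming they follow the usual SAS convention of counting days since 1960-01-01 (the dictionary above does not state this), a small conversion helper could look like the sketch below. `sas_to_date` is a hypothetical name; in Spark the same fixed offset would be applied column-wise rather than row by row.

```python
from datetime import date, timedelta

# SAS stores dates as a count of days since 1 January 1960.
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS numeric date (e.g. an arrdate value) to a datetime.date.

    Missing values (None or NaN) are returned as None.
    """
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
```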

### Demographics Dataset Exploration

In [92]:
# Load the semicolon-delimited demographics CSV into a Spark DataFrame
fn_demo = "./other_datasets/us-cities-demographics.csv"
df_demo = spark.read.csv(fn_demo, inferSchema=True, header=True, sep=';')

In [93]:
# checking 3 rows of demo data
df_demo.limit(3).toPandas()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |


In [94]:
# printing schema for better view of dataset
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



### Demographics Data Dictionary

Column | Description 
-----|-----
City|City Name
State|US State where city is located
Median Age|Median age of the population
Male Population|Count of male population
Female Population|Count of female population
Total Population|Count of total population
Number of Veterans|Count of total Veterans
Foreign-born|Count of foreign-born residents of the city (people who were not U.S. citizens at birth)
Average Household Size|Average city household size
State Code|Code of the US state
Race|Race
Count|Number of the city's residents in the given Race category


### Extracting Country from Description File - SAS

In [95]:
def extract_country():
    """
    Takes no parameters. Extracts the country codes (i94cntyl) from the SAS label
    file and returns them as a pandas DataFrame.
    """
    df_country = extract_content("i94cntyl")
    df_country.loc[len(df_country.index)] = ['99', 'All Other Codes']
    df_country['code'] = df_country['code'].astype('int32')
    return df_country

In [96]:
df_country = extract_country()
df_country.head(5)

| | code | definition |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


### Extracting States from Description File - SAS

In [97]:
def extract_state():
    """
    Takes no parameters. Extracts the US state codes (i94addrl) from the SAS label
    file and returns them as a pandas DataFrame with a surrogate 'id' column.
     """
    df_state = extract_content("i94addrl")
    df_state.insert(0,'id',range(1,1+len(df_state)))
    return df_state

In [98]:
df_state = extract_state()
df_state.head(5)

| | id | code | definition |
|---|---|---|---|
| 0 | 1 | AL | ALABAMA |
| 1 | 2 | AK | ALASKA |
| 2 | 3 | AZ | ARIZONA |
| 3 | 4 | AR | ARKANSAS |
| 4 | 5 | CA | CALIFORNIA |


### Extracting Modes from Description File - SAS

In [99]:
def extract_mode():
    """
    Takes no parameters. Extracts the arrival modes (i94mode) from the SAS label
    file and returns them as a pandas DataFrame.
     """
    df_mode = extract_content("i94mode")
    df_mode.loc[len(df_mode.index)] = ['-99', 'No Mode Value']
    df_mode['code'] = df_mode['code'].astype('int32')

    return df_mode

In [100]:
df_mode = extract_mode()
df_mode.head(5)

| | code | definition |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |
| 4 | -99 | No Mode Value |


### Extracting Ports from Description File - SAS

In [134]:
def extract_port():
    """
    Takes no parameters. Extracts the ports of entry ($i94prtl) from the SAS label
    file and returns them as a pandas DataFrame with a surrogate 'id' column.
     """
    df_port = extract_content("$i94prtl")
    df_port.loc[len(df_port.index)] = ['-99', 'No Port Value']
    df_port.insert(0,'id',range(1,1+len(df_port)))
    return df_port

In [135]:
df_port = extract_port()
df_port.head(5)

| | id | code | definition |
|---|---|---|---|
| 0 | 1 | ALC | ALCAN, AK |
| 1 | 2 | ANC | ANCHORAGE, AK |
| 2 | 3 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 3 | 4 | DAC | DALTONS CACHE, AK |
| 4 | 5 | PIZ | DEW STATION PT LAY DEW, AK |


### Extracting Visa Types from Description File - SAS

In [103]:
def extract_visa():
    """
    Takes no parameters. Extracts the visa categories (I94VISA) from the SAS label
    file and returns them as a pandas DataFrame.
    """
    df_visa = extract_content("I94VISA")
    df_visa.loc[len(df_visa.index)] = ['-99', 'No Visa Value']

    return df_visa

In [104]:
df_visa = extract_visa()
df_visa.head(5)

| | code | definition |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |
| 3 | -99 | No Visa Value |


### Combining monthly SAS files to create the main dataset

In [105]:
def combining_parquet_files_i94():
    # Only one month is loaded here; add more month abbreviations to `months`
    # and the loop reads each monthly SAS file and unions them together.
    months = ["jan"]
    fields = ['cicid','i94yr', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 
                  'depdate', 'i94bir', 'entdepa', 'entdepd', 'entdepu', 
                  'biryear', 'dtaddto', 'gender', 'airline', 'visatype','i94visa']

    df_i94_final = None
    for month in months:
        fn = f'../../data/18-83510-I94-Data-2016/i94_{month}16_sub.sas7bdat'
        # Keep only the columns used by the model (all visa types are kept)
        df_temp = spark.read.format('com.github.saurfang.sas.spark').load(fn)
        df_temp = df_temp.select(fields)

        # union() matches columns by position; select(fields) keeps the order identical
        if df_i94_final is None:
            df_i94_final = df_temp
        else:
            df_i94_final = df_i94_final.union(df_temp)

    return df_i94_final
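To extend the loop to the whole year, the file list and the union can be generated instead of hard-coded. This is a sketch assuming the other monthly files exist in the same directory with the same `i94_<mon>16_sub.sas7bdat` naming (only the January and April files appear in this notebook); `monthly_paths` and `combine` are hypothetical helpers.

```python
from functools import reduce

# Assumed naming pattern, taken from the paths used above (e.g. i94_jan16_sub.sas7bdat).
BASE_DIR = "../../data/18-83510-I94-Data-2016"
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_paths(year_suffix="16", base_dir=BASE_DIR):
    """Return one SAS file path per month for the given two-digit year."""
    return [f"{base_dir}/i94_{mon}{year_suffix}_sub.sas7bdat" for mon in MONTHS]


def combine(frames, union=lambda left, right: left.union(right)):
    """Fold a non-empty sequence of frames into one using a pairwise union.

    For Spark DataFrames the default calls DataFrame.union, which matches
    columns by position, so each frame should select the same column order.
    """
    return reduce(union, frames)


# With Spark (not run here), assuming `spark` and `fields` from the cells above:
# frames = [spark.read.format('com.github.saurfang.sas.spark').load(p).select(fields)
#           for p in monthly_paths()]
# df_i94_final = combine(frames)
print(monthly_paths()[0])  # ../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
```

Passing `union` as a parameter keeps the fold testable without Spark; with plain lists, `operator.add` plays the role of `union`.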


In [106]:
df_i94_final = combining_parquet_files_i94()
print('Total Count of i94 dataset : ',df_i94_final.count())

Total Count of i94 dataset :  2847924


### Data Checking & Cleaning Operations


##### Datasets are loaded and ready for data checking and cleaning operations


### i94 immigration data check and cleaning operations

In [107]:
def i94_data_check_cleanse(df_i94):
    print('starting data cleanse opearations of i94 dataset')
    df_i94_final=df_i94
    # Checking duplicated id's inside of df_i94_final
    print(f"Duplicated Cicid Values : {df_i94_final.groupby('cicid').count().filter(col('count')>1).count()}")
    
    df_i94_final = df_i94_final.dropDuplicates(['cicid'])
    print(f"Duplicated Cicid Values : {df_i94_final.groupby('cicid').count().filter(col('count')>1).count()}")
    
    # checking missing values of all columns of cicid cleaned dataset
    print('checking missing values of all columns')
    df_i94_final.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_i94_final.columns]).show()
    
    # Replace nulls with per-column defaults; -99 / '-99' marks a missing code
    df_i94_final = df_i94_final.fillna({'i94cit':-99, 'i94mode':-99, 'i94addr':'-99', 'depdate':0, 'i94bir':0, 'entdepa':'', 'entdepd':'',
                              'entdepu':'', 'biryear':0, 'dtaddto':'', 'gender':'', 'airline': '-99', 'i94visa':-99})
    
    df_i94_final.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_i94_final.columns]).show()
    
    # Rename columns and drop the ones the data model does not use
    df_i94_final = df_i94_final.withColumnRenamed('cicid', 'id') \
        .withColumnRenamed('i94yr', 'year') \
        .withColumnRenamed('i94res', 'country') \
        .withColumnRenamed('i94port', 'port') \
        .withColumnRenamed('i94mode', 'mode') \
        .withColumnRenamed('i94addr', 'state') \
        .withColumnRenamed('i94bir', 'age') \
        .withColumnRenamed('biryear', 'birth_year') \
        .withColumnRenamed('airline', 'air_line') \
        .withColumnRenamed('visatype', 'visa_type') \
        .withColumnRenamed('i94visa', 'visa') \
        .drop("i94cit") \
        .drop("depdate") \
        .drop("dtaddto") \
        .drop("entdepa") \
        .drop("entdepd") \
        .drop("entdepu") \
        .drop("arrdate")
    
    df_i94_final.show(1)
    
    print('data check and cleanse operations completed for i94_datset')
    return df_i94_final

In [108]:
# Run the i94 data checks, cleansing and column renames
df_i94_final = i94_data_check_cleanse(df_i94_final)

starting data cleanse opearations of i94 dataset
Duplicated Cicid Values : 0
Duplicated Cicid Values : 0
checking missing values of all columns
+-----+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+-------+--------+-------+
|cicid|i94yr|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|entdepa|entdepd|entdepu|biryear|dtaddto|gender|airline|visatype|i94visa|
+-----+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+-------+--------+-------+
|    0|    0|     0|     0|      0|      0|     60| 177129| 522612|  1190|     61| 521813|2847880|   1190|    707|216929|  61279|       0|      0|
+-----+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------+-------+--------+-------+

+-----+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+------

### Demographics data check and cleaning operations

In [109]:
def demographics_data_check_cleanse(df_demo):
    subset_cols = [
    'Male Population',
    'Female Population',
    'Number of Veterans',
    'Foreign-born',
    'Average Household Size'
    ]
    new_df_demo = df_demo.dropna(subset=subset_cols)

    dropped = df_demo.count() - new_df_demo.count()
    print("Dropped rows with missing values: {}".format(dropped))

    # Drop duplicate rows (one row per City, State, State Code and Race)
    df_demo_final = new_df_demo.dropDuplicates(subset=['City', 'State', 'State Code', 'Race'])

    result  = new_df_demo.count() - df_demo_final.count()
    print(f"After dropping duplicates: {result}")
    
    # Spot-check the cleaned rows for one state (Maryland)
    df_demo_final.createOrReplaceTempView("dm")
    sdf = spark.sql("SELECT dm.* FROM dm where dm.state ='Maryland'")
    sdf.show(3)
    
    # Rename and drop operations for demographic dataframe
    print('Rename and drop columns')
    df_demo_final = df_demo_final.drop('Median Age') \
        .withColumnRenamed('Male Population', 'male_population') \
        .withColumnRenamed('Female Population', 'female_population') \
        .withColumnRenamed('Total Population', 'total_population') \
        .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
        .withColumnRenamed('Foreign-born', 'foreign_born') \
        .withColumnRenamed('Average Household Size', 'average_household_size') \
        .withColumnRenamed('State Code', 'state_code') \
        .drop("Race") \
        .drop("City")
    df_demo_final.show(1)
    
    # Aggregate by state_code: sum the counts and average the household size, giving one row per state
    print('grouping data with using state_code')
    df_demo_final = df_demo_final.groupby('state_code').agg({ 'male_population':'sum', 
                                                'female_population':'sum', 
                                                'total_population':'sum', 
                                                'number_of_veterans':'sum',
                                                'foreign_born':'sum',
                                                'average_household_size':'mean',
                                               })
    # Spot-check the aggregated row for Maryland
    df_demo_final.createOrReplaceTempView("dm")
    sdf = spark.sql("SELECT dm.* FROM dm where dm.state_code ='MD'")
    sdf.show(3)
    
    print('After aggregation the columns are named sum(...) / avg(...); renaming them back')
    df_demo_final = df_demo_final \
        .withColumnRenamed('sum(male_population)', 'male_population') \
        .withColumnRenamed('sum(female_population)', 'female_population') \
        .withColumnRenamed('sum(total_population)', 'total_population') \
        .withColumnRenamed('sum(number_of_veterans)', 'number_of_veterans') \
        .withColumnRenamed('sum(foreign_born)', 'foreign_born') \
        .withColumnRenamed('avg(average_household_size)', 'average_household_size')

    df_demo_final.show(1)
    
    print('Demographic data cleansing and data operations are completed')                                           
    
    
    return df_demo_final
    
    

In [110]:
# calling demo cleaning and data operation function
df_demo_final = demographics_data_check_cleanse(df_demo)

Dropped rows with missing values: 16
After dropping duplicates: 0
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|         City|   State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|       German|Maryland|      34.9|          41115|            43007|           84122|              2443|       27877|                  2.95|        MD|Black or African-...|22273|
|Silver Spring|Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|               Asian| 8841|
|     Columbia|Maryland|      37.9
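One caveat worth checking here: the demographics file has one row per (City, State, Race), and the city-level columns repeat on each of those rows. For example, Silver Spring shows a Total Population of 82463 on both its "Hispanic or Latino" and "Asian" rows above. Because the function drops `Race` and `City` and then sums by state, each city's population is added once per race row, which likely inflates the per-state totals (AZ, for instance, shows a `total_population` of 22,497,710 in Step 4). One possible fix would be to deduplicate on `City` and `State` before dropping those columns. The stdlib sketch below, on made-up rows and with a hypothetical `state_population` helper, shows the effect.

```python
from collections import defaultdict

# Toy rows shaped like us-cities-demographics.csv: city-level columns are
# repeated on every (City, State, Race) row. Values are illustrative only.
rows = [
    {"City": "A", "State Code": "MD", "Total Population": 100, "Race": "White", "Count": 60},
    {"City": "A", "State Code": "MD", "Total Population": 100, "Race": "Asian", "Count": 40},
    {"City": "B", "State Code": "MD", "Total Population": 50, "Race": "White", "Count": 50},
]


def state_population(rows, dedupe_cities=True):
    """Sum Total Population per state, optionally counting each city only once."""
    totals, seen = defaultdict(int), set()
    for row in rows:
        key = (row["City"], row["State Code"])
        if dedupe_cities and key in seen:
            continue  # this city's population was already added
        seen.add(key)
        totals[row["State Code"]] += row["Total Population"]
    return dict(totals)


print(state_population(rows, dedupe_cities=False))  # {'MD': 250}  (city A counted twice)
print(state_population(rows))                       # {'MD': 150}
```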

### Country data check and cleaning operations

In [111]:
def country_data_check_cleanse(country):
    df_country = country
    # Some country labels are placeholders (invalid, missing or collapsed codes); group them under "Other"
    print('cleaning country dataset')
    # keep='first' keeps one copy of each duplicate; keep=False would drop every copy
    df_country.drop_duplicates(subset=["code","definition"], keep='first', inplace=True)
    for prefix in ("INVALID", "No Country", "Collapsed"):
        df_country.loc[df_country["definition"].str.contains(prefix, regex=False), "definition"] = "Other"
    print('cleanse completed')
    return df_country

In [112]:
country_data_check_cleanse(df_country)

cleaning country dataset
cleanse completed


| | code | definition |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |
| 5 | 324 | ANGOLA |
| 6 | 529 | ANGUILLA |
| 7 | 518 | ANTIGUA-BARBUDA |
| 8 | 687 | ARGENTINA |
| 9 | 151 | ARMENIA |
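A note on the `keep` argument used by the cleansing functions: in pandas, `drop_duplicates(keep=False)` removes every row of a duplicated group, including the first occurrence, whereas `keep='first'` keeps one copy. For a lookup table this matters, because a code that appeared twice would vanish from the dimension entirely and leave fact rows with no match. The stdlib sketch below mirrors those two behaviours on tuples; `drop_duplicates` here is a hypothetical helper, not the pandas method.

```python
def drop_duplicates(records, keep="first"):
    """Mimic pandas drop_duplicates for keep="first" and keep=False.

    keep="first" keeps one copy of each record; keep=False removes every
    record that appears more than once, including its first occurrence.
    """
    counts = {}
    for rec in records:
        counts[rec] = counts.get(rec, 0) + 1
    if keep is False:
        return [rec for rec in records if counts[rec] == 1]
    seen, result = set(), []
    for rec in records:
        if rec not in seen:
            seen.add(rec)
            result.append(rec)
    return result


codes = [("101", "ALBANIA"), ("236", "AFGHANISTAN"), ("101", "ALBANIA")]
print(drop_duplicates(codes))              # [('101', 'ALBANIA'), ('236', 'AFGHANISTAN')]
print(drop_duplicates(codes, keep=False))  # [('236', 'AFGHANISTAN')]
```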


### Port data check and cleaning operations

In [113]:
def port_data_check_cleanse(port):
    df_port = port
    # keep='first' keeps one copy of each duplicate; keep=False would drop every copy
    df_port.drop_duplicates(subset=["code","definition"], keep='first', inplace=True)
    # Group placeholder labels (invalid, missing or collapsed codes) under "Other"
    for prefix in ("INVALID", "No Port", "Collapsed"):
        df_port.loc[df_port["definition"].str.contains(prefix, regex=False), "definition"] = "Other"
    return df_port
    
    

In [114]:
port_data_check_cleanse(df_port)



### Visa data check and cleaning operations

In [115]:
def visa_data_check_cleanse(visa):
    df_visa=visa
    # keep='first' keeps one copy of each duplicate; keep=False would drop every copy
    df_visa.drop_duplicates(subset=["code","definition"], keep='first', inplace=True)
    print('dropping duplicates of visa dataset')
    return df_visa
    

In [116]:
visa_data_check_cleanse(df_visa)

dropping duplicates of visa dataset


| | code | definition |
|---|---|---|
| 0 | 1 | Business |
| 1 | 2 | Pleasure |
| 2 | 3 | Student |
| 3 | -99 | No Visa Value |


### Mode data check and cleaning operations

In [117]:
def mode_data_check_cleanse(mode):
    df_mode = mode
    # keep='first' keeps one copy of each duplicate; keep=False would drop every copy
    df_mode.drop_duplicates(subset=["code","definition"], keep='first', inplace=True)
    print('dropping duplicates of mode dataset - mode')
    return df_mode

In [118]:
mode_data_check_cleanse(df_mode)

dropping duplicates of mode dataset - mode


| | code | definition |
|---|---|---|
| 0 | 1 | Air |
| 1 | 2 | Sea |
| 2 | 3 | Land |
| 3 | 9 | Not reported |
| 4 | -99 | No Mode Value |


### State data check and cleaning operations

In [119]:
def state_data_check_cleanse(state):
    df_state=state
    # keep='first' keeps one copy of each duplicate; keep=False would drop every copy
    df_state.drop_duplicates(subset=["code","definition"], keep='first', inplace=True)
    # Group placeholder labels (invalid or collapsed codes) under "Other"
    for prefix in ("INVALID", "No Port", "Collapsed"):
        df_state.loc[df_state["definition"].str.contains(prefix, regex=False), "definition"] = "Other"
    print('dropped duplicates and cleanse columns - state')
    return df_state

In [120]:
state_data_check_cleanse(df_state)

dropped duplicates and cleanse columns - state


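| | id | code | definition |
|---|---|---|---|
| 0 | 1 | AL | ALABAMA |
| 1 | 2 | AK | ALASKA |
| 2 | 3 | AZ | ARIZONA |
| 3 | 4 | AR | ARKANSAS |
| 4 | 5 | CA | CALIFORNIA |
| 5 | 6 | CO | COLORADO |
| 6 | 7 | CT | CONNECTICUT |
| 7 | 8 | DE | DELAWARE |
| 8 | 9 | DC | DIST. OF COLUMBIA |
| 9 | 10 | FL | FLORIDA |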


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model I designed uses the I94 US immigration data as the fact table, with direct relationships to the demographics, port, visa, state, country and mode dimension tables.<br>

The sources are plain files, but I modeled them as a star schema because it fits the project's needs: the analytical questions all center on the immigration data, and a star schema lets each one be answered with simple fact-to-dimension joins. This layout should keep query performance reliable even as the data volume grows.

![title](images/Final_Data_Model.png)
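To make the star-schema idea concrete, here is a miniature version in SQLite (Python's built-in `sqlite3`) that answers two of the Step 1 questions with fact-to-dimension joins. The table and column names are simplified assumptions loosely based on the cleaned outputs (`state`, `visa` and `age` in the fact table; `code`/`definition` in the dimensions), and the rows are invented. The real tables are Parquet files and would be queried with Spark SQL or a similar engine.

```python
import sqlite3

# Hypothetical miniature of the star schema: one fact table plus two dimensions.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE visa_dim  (code INTEGER PRIMARY KEY, definition TEXT);
CREATE TABLE state_dim (code TEXT PRIMARY KEY, definition TEXT);
CREATE TABLE i94_fact  (id INTEGER PRIMARY KEY, state TEXT, visa INTEGER, age INTEGER);

INSERT INTO visa_dim VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
INSERT INTO state_dim VALUES ('FL', 'FLORIDA'), ('CA', 'CALIFORNIA'),
                             ('NY', 'NEW YORK'), ('RI', 'RHODE ISLAND');
INSERT INTO i94_fact VALUES
  (1, 'FL', 2, 30), (2, 'FL', 2, 40), (3, 'CA', 2, 35), (4, 'NY', 2, 28),
  (5, 'NY', 2, 50), (6, 'NY', 2, 45), (7, 'RI', 3, 25), (8, 'CA', 1, 44);
""")

# Question: what are the top 3 states that tourists (visa = Pleasure) visit?
top_states = conn.execute("""
    SELECT s.definition, COUNT(*) AS arrivals
    FROM i94_fact f
    JOIN visa_dim  v ON f.visa  = v.code
    JOIN state_dim s ON f.state = s.code
    WHERE v.definition = 'Pleasure'
    GROUP BY s.definition
    ORDER BY arrivals DESC, s.definition
    LIMIT 3
""").fetchall()

# Question: what is the average age per purpose of visit?
avg_age = conn.execute("""
    SELECT v.definition, AVG(f.age)
    FROM i94_fact f
    JOIN visa_dim v ON f.visa = v.code
    GROUP BY v.definition
    ORDER BY v.definition
""").fetchall()

print(top_states)  # [('NEW YORK', 3), ('FLORIDA', 2), ('CALIFORNIA', 1)]
print(avg_age)     # [('Business', 44.0), ('Pleasure', 38.0), ('Student', 25.0)]
```

Each question touches the fact table once and joins only the dimensions it needs, which is the access pattern the star schema is meant to serve.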

#### 3.2 Mapping Out Data Pipelines

* Loading the datasets
* Extracting the visa, port, mode, state and country datasets from the SAS label file
* Cleaning all datasets
* Creating the dimension and fact tables
* Writing the final tables as Parquet files


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

Build the data pipelines to create the data model.

In [144]:
def converting_state_to_spark(state):
    df_state=state
    # Creating Schema and converting columns    -  STATE
    state_schema = StructType([StructField("id",IntegerType(),True)
                              ,StructField("code",StringType(),True) \
                              ,StructField("definition",StringType(),True)])

    state_spark_df = spark.createDataFrame(df_state,schema=state_schema)
    state_spark_df.printSchema()
    state_spark_df.show()
    return state_spark_df

In [145]:
state_spark_df=converting_state_to_spark(df_state)

root
 |-- id: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- definition: string (nullable = true)

+---+----+-----------------+
| id|code|       definition|
+---+----+-----------------+
|  1|  AL|          ALABAMA|
|  2|  AK|           ALASKA|
|  3|  AZ|          ARIZONA|
|  4|  AR|         ARKANSAS|
|  5|  CA|       CALIFORNIA|
|  6|  CO|         COLORADO|
|  7|  CT|      CONNECTICUT|
|  8|  DE|         DELAWARE|
|  9|  DC|DIST. OF COLUMBIA|
| 10|  FL|          FLORIDA|
| 11|  GA|          GEORGIA|
| 12|  GU|             GUAM|
| 13|  HI|           HAWAII|
| 14|  ID|            IDAHO|
| 15|  IL|         ILLINOIS|
| 16|  IN|          INDIANA|
| 17|  IA|             IOWA|
| 18|  KS|           KANSAS|
| 19|  KY|         KENTUCKY|
| 20|  LA|        LOUISIANA|
+---+----+-----------------+
only showing top 20 rows



In [146]:
def converting_country_to_spark(country):
    df_country = country
    # Creating Schema and converting columns    -  Country
    country_schema = StructType([StructField("code",StringType(),True) \
                              ,StructField("definition",StringType(),True)])

    country_spark_df = spark.createDataFrame(df_country,schema=country_schema)
    country_spark_df.printSchema()
    country_spark_df.show()
    return country_spark_df

In [147]:
country_spark_df=converting_country_to_spark(df_country)

root
 |-- code: string (nullable = true)
 |-- definition: string (nullable = true)

+----+--------------------+
|code|          definition|
+----+--------------------+
| 582|MEXICO Air Sea, a...|
| 236|         AFGHANISTAN|
| 101|             ALBANIA|
| 316|             ALGERIA|
| 102|             ANDORRA|
| 324|              ANGOLA|
| 529|            ANGUILLA|
| 518|     ANTIGUA-BARBUDA|
| 687|           ARGENTINA|
| 151|             ARMENIA|
| 532|               ARUBA|
| 438|           AUSTRALIA|
| 103|             AUSTRIA|
| 152|          AZERBAIJAN|
| 512|             BAHAMAS|
| 298|             BAHRAIN|
| 274|          BANGLADESH|
| 513|            BARBADOS|
| 104|             BELGIUM|
| 581|              BELIZE|
+----+--------------------+
only showing top 20 rows



In [148]:
def converting_mode_to_spark(mode):
    df_mode=mode
    # Creating Schema and converting columns    -  Mode
    mode_schema = StructType([StructField("code",StringType(),True) \
                              ,StructField("definition",StringType(),True)])

    mode_spark_df = spark.createDataFrame(df_mode,schema=mode_schema)
    mode_spark_df.printSchema()
    mode_spark_df.show()
    return mode_spark_df

In [149]:
mode_spark_df=converting_mode_to_spark(df_mode)

root
 |-- code: string (nullable = true)
 |-- definition: string (nullable = true)

+----+-------------+
|code|   definition|
+----+-------------+
|   1|          Air|
|   2|          Sea|
|   3|         Land|
|   9| Not reported|
| -99|No Mode Value|
+----+-------------+



In [150]:
def converting_visa_to_spark(visa):
    df_visa=visa
    # Creating Schema and converting columns    -  Visa
    visa_schema = StructType([StructField("code",StringType(),True) \
                              ,StructField("definition",StringType(),True)])

    visa_spark_df = spark.createDataFrame(df_visa,schema=visa_schema)
    visa_spark_df.printSchema()
    visa_spark_df.show()
    return visa_spark_df
    

In [151]:
visa_spark_df=converting_visa_to_spark(df_visa)

root
 |-- code: string (nullable = true)
 |-- definition: string (nullable = true)

+----+-------------+
|code|   definition|
+----+-------------+
|   1|     Business|
|   2|     Pleasure|
|   3|      Student|
| -99|No Visa Value|
+----+-------------+



In [152]:
def converting_port_to_spark(port):
    df_port=port
    # Creating Schema and converting columns    -  Port
    port_schema = StructType([StructField("id",IntegerType(),True)\
                              ,StructField("code",StringType(),True) \
                              ,StructField("definition",StringType(),True)])

    port_spark_df = spark.createDataFrame(df_port,schema=port_schema)
    port_spark_df.printSchema()
    port_spark_df.show()
    return port_spark_df

In [153]:
port_spark_df=converting_port_to_spark(df_port)

root
 |-- id: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- definition: string (nullable = true)

+---+----+--------------------+
| id|code|          definition|
+---+----+--------------------+
|  1| ALC|           ALCAN, AK|
|  2| ANC|       ANCHORAGE, AK|
|  3| BAR|BAKER AAF - BAKER...|
|  4| DAC|   DALTONS CACHE, AK|
|  5| PIZ|DEW STATION PT LA...|
|  6| DTH|    DUTCH HARBOR, AK|
|  7| EGL|           EAGLE, AK|
|  8| FRB|       FAIRBANKS, AK|
|  9| HOM|           HOMER, AK|
| 10| HYD|           HYDER, AK|
| 11| JUN|          JUNEAU, AK|
| 12| 5KE|       KETCHIKAN, AK|
| 13| KET|       KETCHIKAN, AK|
| 14| MOS|MOSES POINT INTER...|
| 15| NIK|         NIKISKI, AK|
| 16| NOM|             NOM, AK|
| 17| PKC|     POKER CREEK, AK|
| 18| ORI|  PORT LIONS SPB, AK|
| 19| SKA|         SKAGWAY, AK|
| 20| SNP| ST. PAUL ISLAND, AK|
+---+----+--------------------+
only showing top 20 rows



In [154]:
# Inspect the final i94 fact table: schema and sample rows

df_i94_final.printSchema()
df_i94_final.show(10)

root
 |-- id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- country: double (nullable = true)
 |-- port: string (nullable = true)
 |-- mode: double (nullable = false)
 |-- state: string (nullable = false)
 |-- age: double (nullable = false)
 |-- birth_year: double (nullable = false)
 |-- gender: string (nullable = false)
 |-- air_line: string (nullable = false)
 |-- visa_type: string (nullable = true)
 |-- visa: double (nullable = true)

+------+------+-------+----+----+-----+----+----------+------+--------+---------+----+
|    id|  year|country|port|mode|state| age|birth_year|gender|air_line|visa_type|visa|
+------+------+-------+----+----+-----+----+----------+------+--------+---------+----+
| 299.0|2016.0|  103.0| BOS| 1.0|   RI|25.0|    1991.0|     M|      LX|       F1| 3.0|
| 305.0|2016.0|  103.0| FTL| 1.0|   FL|24.0|    1992.0|     M|      BW|       F1| 3.0|
| 558.0|2016.0|  104.0| PHI| 1.0|   CA|22.0|    1994.0|     F|      BA|       WT| 2.0|
| 596.0|2016.0| 

In [155]:
df_demo_final.printSchema()
df_demo_final.show(10)

root
 |-- state_code: string (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- total_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- number_of_veterans: long (nullable = true)

+----------+------------+---------------+----------------------+----------------+-----------------+------------------+
|state_code|foreign_born|male_population|average_household_size|total_population|female_population|number_of_veterans|
+----------+------------+---------------+----------------------+----------------+-----------------+------------------+
|        AZ|     3411565|       11137275|    2.7743749999999996|        22497710|         11360435|           1322525|
|        SC|      134019|        1265291|     2.469583333333333|         2586976|          1321685|            163334|
|        LA|      417095|        3134990|                 2.465|         65

In [156]:
#  exporting files into parquet

In [157]:
def exporting_final_parquet_files():
    #Export the demographics table.
    print('Writing Demographic parquet files to /data_output/demographics_data')
    df_demo_final.write.mode("overwrite").parquet("./data_output/demographics_data")
    print('Demographic data writing completed')
    
    print('Writing i94 parquet files to /data_output/i94_data')
    df_i94_final.write.mode("overwrite").parquet("./data_output/i94_data")
    print('i94 data writing completed')
    
    print('Writing Port parquet files to /data_output/port_data')
    port_spark_df.write.mode("overwrite").parquet("./data_output/port_data")
    print('Port data writing completed')
    
    print('Writing Visa parquet files to /data_output/visa_data')
    visa_spark_df.write.mode("overwrite").parquet("./data_output/visa_data")
    print('Visa data writing completed')
    
    print('Writing State parquet files to /data_output/state_data')
    state_spark_df.write.mode("overwrite").parquet("./data_output/state_data")
    print('State data writing completed')
    
    print('Writing Country parquet files to /data_output/country_data')
    country_spark_df.write.mode("overwrite").parquet("./data_output/country_data")
    print('Country data writing completed')
    
    print('Writing Mode parquet files to /data_output/mode_data')
    mode_spark_df.write.mode("overwrite").parquet("./data_output/mode_data")
    print('Mode data writing completed')

In [158]:
# Call the function that exports all datasets as parquet files into subfolders of ./data_output/
exporting_final_parquet_files()

Writing Demographic parquet files to /data_output/demographics_data
Demographic data writing completed
Writing i94 parquet files to /data_output/i94_data
i94 data writing completed
Writing Port parquet files to /data_output/port_data
Port data writing completed
Writing Visa parquet files to /data_output/visa_data
Visa data writing completed
Writing State parquet files to /data_output/state_data
State data writing completed
Writing Country parquet files to /data_output/country_data
Country data writing completed
Writing Mode parquet files to /data_output/mode_data
Mode data writing completed
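The export function above repeats the same three lines for each of the seven tables. Below is a minimal sketch of a table-driven alternative, assuming the same DataFrame names and the same `./data_output/<name>_data` folder layout; `export_parquet_tables` and `OUTPUT_DIR` are hypothetical names, not part of the original notebook. It also prints the relative path that is actually written, whereas the original messages say `/data_output/...` while writing to `./data_output/...`.

```python
# Hypothetical, table-driven version of exporting_final_parquet_files().
# It assumes each value exposes PySpark's DataFrame.write API.
OUTPUT_DIR = "./data_output"


def export_parquet_tables(tables, output_dir=OUTPUT_DIR):
    """Write each DataFrame in `tables` to <output_dir>/<name>_data.

    `tables` maps a short table name (e.g. "i94") to a Spark DataFrame.
    Returns the written paths in the order they were written.
    """
    written = []
    for name, df in tables.items():
        path = f"{output_dir}/{name}_data"
        print(f"Writing {name} parquet files to {path}")
        # Overwrite any output left by a previous run, as the original does.
        df.write.mode("overwrite").parquet(path)
        print(f"{name} data writing completed")
        written.append(path)
    return written


# Usage in the notebook, with the DataFrames defined earlier:
# export_parquet_tables({
#     "demographics": df_demo_final, "i94": df_i94_final,
#     "port": port_spark_df, "visa": visa_spark_df, "state": state_spark_df,
#     "country": country_spark_df, "mode": mode_spark_df,
# })
```

With this layout, adding a new table means adding one dictionary entry rather than another copy of the write block.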


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks
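The checks listed above can be expressed as small functions that raise an error when a table fails. Below is a minimal pandas sketch run on the visa table shown further below; `check_not_empty` and `check_unique_key` are hypothetical helpers, not the project's own code, and treating the visa codes as integers is an assumption. On the Spark tables the same logic would map to `df.count()` and a `groupBy(key).count()` on the key column.

```python
import pandas as pd


def check_not_empty(df: pd.DataFrame, name: str) -> None:
    """Source/count check: fail if a table came out empty."""
    if len(df) == 0:
        raise ValueError(f"{name}: table is empty")


def check_unique_key(df: pd.DataFrame, key: str, name: str) -> None:
    """Integrity check: the key column must be non-null and unique."""
    if df[key].isna().any():
        raise ValueError(f"{name}: null values in key column {key!r}")
    if df[key].duplicated().any():
        raise ValueError(f"{name}: duplicate values in key column {key!r}")


# Rows taken from the visa table output.
visa = pd.DataFrame({
    "code": [1, 2, 3, -99],
    "definition": ["Business", "Pleasure", "Student", "No Visa Value"],
})

check_not_empty(visa, "visa")
check_unique_key(visa, "code", "visa")
print("visa: all checks passed")
```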

In [109]:
# Final cleaned tables: show a sample of each and count its rows
df_i94_final.show(10,truncate=False)
df_i94_final.count()

+------+------+-------+----+----+-----+----+----------+------+--------+---------+----+
|id    |year  |country|port|mode|state|age |birth_year|gender|air_line|visa_type|visa|
+------+------+-------+----+----+-----+----+----------+------+--------+---------+----+
|299.0 |2016.0|103.0  |BOS |1.0 |RI   |25.0|1991.0    |M     |LX      |F1       |3.0 |
|305.0 |2016.0|103.0  |FTL |1.0 |FL   |24.0|1992.0    |M     |BW      |F1       |3.0 |
|558.0 |2016.0|104.0  |PHI |1.0 |CA   |22.0|1994.0    |F     |BA      |WT       |2.0 |
|596.0 |2016.0|104.0  |SEA |1.0 |OR   |49.0|1967.0    |M     |DL      |WB       |1.0 |
|692.0 |2016.0|104.0  |NEW |1.0 |NY   |19.0|1997.0    |M     |UA      |F1       |3.0 |
|1051.0|2016.0|107.0  |ATL |1.0 |FL   |58.0|1958.0    |M     |KL      |B1       |1.0 |
|1761.0|2016.0|107.0  |SPM |1.0 |MN   |21.0|1995.0    |F     |DL      |F1       |3.0 |
|3901.0|2016.0|111.0  |NYC |1.0 |NY   |39.0|1977.0    |F     |AF      |WT       |2.0 |
|3980.0|2016.0|111.0  |PHI |1.0 |FL   |22.0

2847924

In [110]:
df_demo_final.show(10,truncate=False)
df_demo_final.count()

+----------+------------+---------------+----------------------+----------------+-----------------+------------------+
|state_code|foreign_born|male_population|average_household_size|total_population|female_population|number_of_veterans|
+----------+------------+---------------+----------------------+----------------+-----------------+------------------+
|AZ        |3411565     |11137275       |2.7743749999999996    |22497710        |11360435         |1322525           |
|SC        |134019      |1265291        |2.469583333333333     |2586976         |1321685          |163334            |
|LA        |417095      |3134990        |2.465                 |6502975         |3367985          |348855            |
|MN        |1069888     |3478803        |2.496851851851851     |7044165         |3565362          |321738            |
|NJ        |2327750     |3423033        |2.9608771929824558    |6931024         |3507991          |146632            |
|DC        |475585      |1598525        |2.24   

48

In [112]:
port_spark_df.show(5,truncate=False)
port_spark_df.count()

+---+----+----------------------------+
|id |code|definition                  |
+---+----+----------------------------+
|1  |ALC |ALCAN, AK                   |
|2  |ANC |ANCHORAGE, AK               |
|3  |BAR |BAKER AAF - BAKER ISLAND, AK|
|4  |DAC |DALTONS CACHE, AK           |
|5  |PIZ |DEW STATION PT LAY DEW, AK  |
+---+----+----------------------------+
only showing top 5 rows



661

In [114]:
visa_spark_df.show(5,truncate=False)
visa_spark_df.count()

+----+-------------+
|code|definition   |
+----+-------------+
|1   |Business     |
|2   |Pleasure     |
|3   |Student      |
|-99 |No Visa Value|
+----+-------------+



4

In [115]:
state_spark_df.show(5,truncate=False)
state_spark_df.count()

+---+----+----------+
|id |code|definition|
+---+----+----------+
|1  |AL  |ALABAMA   |
|2  |AK  |ALASKA    |
|3  |AZ  |ARIZONA   |
|4  |AR  |ARKANSAS  |
|5  |CA  |CALIFORNIA|
+---+----+----------+
only showing top 5 rows



55

In [116]:
country_spark_df.show(5,truncate=False)
country_spark_df.count()

+----+---------------------------------------------------------+
|code|definition                                               |
+----+---------------------------------------------------------+
|582 |MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|236 |AFGHANISTAN                                              |
|101 |ALBANIA                                                  |
|316 |ALGERIA                                                  |
|102 |ANDORRA                                                  |
+----+---------------------------------------------------------+
only showing top 5 rows



290

In [117]:
mode_spark_df.show(5,truncate=False)
mode_spark_df.count()

+----+-------------+
|code|definition   |
+----+-------------+
|1   |Air          |
|2   |Sea          |
|3   |Land         |
|9   |Not reported |
|-99 |No Mode Value|
+----+-------------+



5

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

# df_i94_final

Column | Description 
-----|-----
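id|record ID for each row
year|four-digit year
country|code of the immigrant's country of residence (see country_spark_df)
port|port of entry code (see port_spark_df)
mode|code for how the immigrant arrived in the USA: air, sea, land, or not reported (see mode_spark_df)
state|two-letter code of the US state the immigrant travels to (see state_spark_df)
age|age of the immigrant in years
birth_year|year of birth
gender|gender of the immigrant
air_line|airline the immigrant flew with to the USA
visa_type|visa type, e.g. F1, B1, WT
visa|visa category code: 1 = Business, 2 = Pleasure, 3 = Student (see visa_spark_df)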
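Two columns of this table are related: in every sample row of df_i94_final shown above, birth_year equals year minus age (for example 2016 - 25 = 1991). Assuming that relationship is intended, it can serve as a consistency check. Below is a minimal pandas sketch on a few of those sample rows; `birth_year_mismatches` is a hypothetical helper.

```python
import pandas as pd

# A few rows copied from the df_i94_final sample output above.
i94_sample = pd.DataFrame({
    "id": [299.0, 305.0, 558.0, 596.0, 692.0],
    "year": [2016.0, 2016.0, 2016.0, 2016.0, 2016.0],
    "age": [25.0, 24.0, 22.0, 49.0, 19.0],
    "birth_year": [1991.0, 1992.0, 1994.0, 1967.0, 1997.0],
})


def birth_year_mismatches(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows whose birth_year differs from year - age (assumed rule)."""
    return df[df["year"] - df["age"] != df["birth_year"]]


print(len(birth_year_mismatches(i94_sample)))  # 0 for these rows
```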


# df_demo_final

All counts are aggregated per state from the city-level U.S. City Demographic Data.

Column | Description 
-----|-----
state_code|two-letter state code
foreign_born|total number of foreign-born residents in that state
male_population|total male population in that state
average_household_size|average household size in that state
total_population|total population in that state
female_population|total female population in that state
number_of_veterans|total number of veterans in that state
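In every sample row of df_demo_final shown above, male_population plus female_population equals total_population (for AZ, 11137275 + 11360435 = 22497710), which also shows that male_population counts all males, not only foreign-born ones. Below is a minimal pandas sketch that turns this into a data quality check; `population_sum_mismatches` is a hypothetical helper.

```python
import pandas as pd

# Rows copied from the df_demo_final sample output above.
demo_sample = pd.DataFrame({
    "state_code": ["AZ", "SC", "LA", "MN", "NJ"],
    "male_population": [11137275, 1265291, 3134990, 3478803, 3423033],
    "female_population": [11360435, 1321685, 3367985, 3565362, 3507991],
    "total_population": [22497710, 2586976, 6502975, 7044165, 6931024],
})


def population_sum_mismatches(df: pd.DataFrame) -> pd.DataFrame:
    """Return states where male + female population != total population."""
    total = df["male_population"] + df["female_population"]
    return df[total != df["total_population"]]


print(population_sum_mismatches(demo_sample).empty)  # True
```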

# mode_spark_df
Column | Description 
-----|-----
code|mode code 
definition|mode name, i.e. how the immigrant arrived in the USA: Air, Sea, Land, or Not reported (No Mode Value when missing)

# country_spark_df
Column | Description 
-----|-----
code| country code
definition| country name

# state_spark_df
Column | Description 
-----|-----
id|record id 
code|code of state
definition| name of state

# visa_spark_df

Column | Description 
-----|-----
code|visa category code (1, 2, 3, or -99 when no value is present)
definition|visa category: Business, Pleasure, Student, or No Visa Value

# port_spark_df
Column | Description 
-----|-----
id|record id 
code|code of port
definition| name of port
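The dimension tables decode the codes stored in df_i94_final. Below is a minimal pandas sketch of that star-schema lookup, using the eight complete sample rows from the quality-check output above and the visa table: it counts arrivals per visa category (the reason for travel) and lists the top states for Pleasure visas. Treating Pleasure visas as tourists and the visa codes as integers are assumptions; on Spark the same lookup would be a `join` of df_i94_final with visa_spark_df.

```python
import pandas as pd

# Fact rows copied from the df_i94_final output (only the columns used here).
i94 = pd.DataFrame({
    "id": [299.0, 305.0, 558.0, 596.0, 692.0, 1051.0, 1761.0, 3901.0],
    "state": ["RI", "FL", "CA", "OR", "NY", "FL", "MN", "NY"],
    "visa": [3.0, 3.0, 2.0, 1.0, 3.0, 1.0, 3.0, 2.0],
})

# Visa dimension, as listed in the visa_spark_df output.
visa_dim = pd.DataFrame({
    "code": [1, 2, 3, -99],
    "definition": ["Business", "Pleasure", "Student", "No Visa Value"],
})

# The fact table stores codes as doubles (3.0), so cast to int before joining.
joined = i94.assign(visa=i94["visa"].astype(int)).merge(
    visa_dim, left_on="visa", right_on="code", how="left"
)

# Reason for travel: number of arrivals per visa category.
by_purpose = joined["definition"].value_counts()
print(by_purpose.to_dict())

# Top 3 states among Pleasure (assumed tourist) arrivals.
top_tourist_states = (
    joined.loc[joined["definition"] == "Pleasure", "state"].value_counts().head(3)
)
print(list(top_tourist_states.index))
```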

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Clearly state the rationale for the choice of tools and technologies for the project.
In this capstone project I mostly used Apache Spark and Python pandas, choosing between them based on the size of the data. I like Apache Spark for its fast reads and transformations: it makes exploring, cleaning, and transforming data easy, and Spark SQL saves time and adds flexibility. Once a DataFrame is registered as a temporary view, working with it feels like ordinary SQL, but fast and on large volumes of data.

#### Propose how often the data should be updated and why.
The I94 immigration data should be updated every 2 months and the demographic data every 6 months, since demographic figures change much more slowly than arrival records. We can monitor how much the data grows with each update and adjust the schedule to update more or less often.


#### The data was increased by 100x.
A 100x increase in data size means we need more processing power, more storage, and a more scalable setup. I would use an Amazon Redshift cluster, sizing the number of nodes to the workload, and keep Spark for processing but run it on Amazon EMR. To store data at this scale I would also use HDFS.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
My first preference would definitely be Apache Airflow, although other orchestration tools exist. I would implement the ETL steps as DAGs, as in the earlier Airflow project, with a daily schedule that starts early enough for the run to finish before 7 am. Failure notifications can also be added by configuring the DAG.

#### The database needed to be accessed by 100+ people
This data is valuable to many people, so we should serve it through a data warehouse and business intelligence tools. I would use Amazon Redshift, which is PostgreSQL-based, as the data warehouse, and Oracle Business Intelligence or Power BI as the analytic reporting tools.