# Analysis of Immigration and Temperature data by State
### Data Engineering Capstone Project

#### Project Summary
The main purpose of this project is to analyse immigration, demographics, temperature and airport data by state after performing the required ETL operations.

To support this, I designed and built a star schema comprising 4 dimension tables and a fact table.

Dimension Table Details
1. DIM_IMMIGRATION
2. DIM_TEMPERATURE
3. DIM_DEMOGRAPHICS
4. DIM_AIRPORT

Fact Table Details
1. FACT_TABLE

The ETL process has the following stages

##### Extract
All data is obtained from open sources.

I94 Immigration Data: [Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)

World Temperature Data: [Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data/data?select=GlobalLandTemperaturesByState.csv)

U.S. City Demographic Data: This data comes from OpenDataSoft. [Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

Airport Code Table: This is a simple table of airport codes and corresponding cities. [Airport Data](https://datahub.io/core/airport-codes#data)

##### Transform

Each source is cleaned (null checks, filtering to US records, type casting) and then transformed into a dimension table keyed by state.

##### Load

After cleaning, the data is loaded into the dimension tables and the fact table. These DataFrames are written as parquet files.
The files are partitioned by state (the fact table by state and month), so that queries filtering on those columns return results faster.
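
To make the effect of partitioning concrete, here is a minimal plain-Python sketch of the directory layout that a partitioned write produces. Spark's `partitionBy` writes one directory level per partition column, named `column=value`, and the write calls later in this notebook partition on `state` (and `i94mon` for the fact table). The helper `partition_paths` and the sample rows are hypothetical and only illustrate the layout; they are not part of the pipeline.

```python
from collections import defaultdict


def partition_paths(rows, base, keys):
    """Group rows by their partition-key values and return {directory: rows}."""
    layout = defaultdict(list)
    for row in rows:
        # One directory level per partition column, named column=value.
        subdir = "/".join("{}={}".format(k, row[k]) for k in keys)
        # The partition columns are encoded in the path, not stored in the files.
        payload = {k: v for k, v in row.items() if k not in keys}
        layout["{}/{}".format(base, subdir)].append(payload)
    return dict(layout)


# Hypothetical sample rows, not taken from the real dataset.
sample_rows = [
    {"state": "CA", "i94mon": 6, "city_port_name": "LOS"},
    {"state": "CA", "i94mon": 6, "city_port_name": "SFR"},
    {"state": "MN", "i94mon": 6, "city_port_name": "LOS"},
]

layout = partition_paths(sample_rows, "Final_output/FACT_TABLE", ["state", "i94mon"])
for directory in sorted(layout):
    print(directory, len(layout[directory]))
```

A query that filters on `state = 'CA'` only needs to read the files under the `state=CA` directory, which is why the partition column should match the most common filter.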

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up




In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.functions import udf

In [2]:
# Create a Spark session so PySpark can be used as the processing engine
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 

Analyse all four data sources and combine them into a fact table. The fact table currently joins the immigration and demographics dimensions on state.

All processing is done with PySpark, mostly through its DataFrame API and Spark SQL.

#### Describe and Gather Data 
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [Here](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. World Temperature Data: This dataset comes from Kaggle. [Here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data/data?select=GlobalLandTemperaturesByState.csv)
3. U.S. City Demographic Data: This data comes from OpenDataSoft. [Here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
4. Airport Code Table: This is a simple table of airport codes and corresponding cities. [Here](https://datahub.io/core/airport-codes#data)

In [3]:
# Read in the data here
demographics_data=spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")
airport_data=spark.read.format("csv").option("header", "true").load("airport-codes_csv.csv")
temperature_data=spark.read.format("csv").option("header", "true").load("GlobalLandTemperaturesByState.csv")
immigration_data=spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

In [4]:
# Preview one row of each DataFrame (show() prints the table itself and returns None)
demographics_data.show(1)
airport_data.show(1)
temperature_data.show(1)
immigration_data.show(1)

+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|   State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
+-------------+--------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
only showing top 1 row

+-----+--------+-----------------+------------+---------+-----------+----------+-

In [5]:
# See the data frame schema
print('Schema of demographics_data are as follows')
print(demographics_data.schema)
print()
print('Schema of airport_data are as follows')
print(airport_data.schema)
print()
print('Schema of temperature_data are as follows')
print(temperature_data.schema)
print()
print('Schema of immigration_data are as follows')
print(immigration_data.schema)

Schema of demographics_data are as follows
StructType(List(StructField(City,StringType,true),StructField(State,StringType,true),StructField(Median Age,StringType,true),StructField(Male Population,StringType,true),StructField(Female Population,StringType,true),StructField(Total Population,StringType,true),StructField(Number of Veterans,StringType,true),StructField(Foreign-born,StringType,true),StructField(Average Household Size,StringType,true),StructField(State Code,StringType,true),StructField(Race,StringType,true),StructField(Count,StringType,true)))

Schema of airport_data are as follows
StructType(List(StructField(ident,StringType,true),StructField(type,StringType,true),StructField(name,StringType,true),StructField(elevation_ft,StringType,true),StructField(continent,StringType,true),StructField(iso_country,StringType,true),StructField(iso_region,StringType,true),StructField(municipality,StringType,true),StructField(gps_code,StringType,true),StructField(iata_code,StringType,true),St

In [6]:
# See the data frame Columns
print('columns of demographics_data are as follows')
print(demographics_data.columns)
print()
print('columns of airport_data are as follows')
print(airport_data.columns)
print()
print('columns of temperature_data are as follows')
print(temperature_data.columns)
print()
print('columns of immigration_data are as follows')
print(immigration_data.columns)

columns of demographics_data are as follows
['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count']

columns of airport_data are as follows
['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country', 'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code', 'coordinates']

columns of temperature_data are as follows
['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'State', 'Country']

columns of immigration_data are as follows
['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'validres', 'delete_days', 'delete_mexl', 'delete_dup', 'delete_visa', 'delete_recdup', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype']


### Step 2: Explore and Assess the Data
#### Explore the Data 
Each dimension is created with Spark SQL after applying not-null checks and selecting distinct values.
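
One point about distinct values is worth illustrating. In SQL, `DISTINCT` applies to the whole select list, so when a unique surrogate key is generated in the same `SELECT`, every row is already unique and no duplicates are removed. The plain-Python sketch below shows the alternative order (deduplicate first, then assign keys). The function `dedupe_then_key` and the sample rows are hypothetical illustrations, not the notebook's code.

```python
from itertools import count


def dedupe_then_key(rows, key_name):
    """Drop exact duplicate rows, then attach an increasing surrogate key."""
    seen = set()
    ids = count()
    result = []
    for row in rows:
        # A hashable fingerprint of the row's business columns.
        fingerprint = tuple(sorted(row.items()))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        keyed = {key_name: next(ids)}
        keyed.update(row)
        result.append(keyed)
    return result


# Illustrative rows; the second one is an exact duplicate of the first.
rows = [
    {"name": "Otp Heliport", "state": "AK"},
    {"name": "Otp Heliport", "state": "AK"},
    {"name": "Big Salmon Heliport", "state": "AK"},
]

deduped = dedupe_then_key(rows, "airport_surr_key")
print(len(deduped))
```

In Spark SQL the same ordering can be expressed by selecting the distinct business columns in a subquery and generating the key in the outer query.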

#### Cleaning Steps
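1. DIM_AIRPORT

    1. upper(iso_country) = "US"
    2. iso_region, name, elevation_ft and coordinates are not null
    3. type = 'heliport'
    4. State code taken from characters 4-5 of iso_region and matched against the I94 port state codes
    5. Performed casting

2. DIM_TEMPERATURE

    1. trim(upper(Country)) = 'UNITED STATES'
    2. AverageTemperature is not null
    3. AverageTemperatureUncertainty is not null
    4. State name mapped to its two-letter abbreviation
    5. Performed casting

3. DIM_DEMOGRAPHICS

    1. Median Age, Male Population, Female Population and Total Population are not null
    2. State name mapped to its two-letter abbreviation
    3. Performed casting

4. DIM_IMMIGRATION

    1. i94addr, i94port, i94yr and occup are not null
    2. i94port not in ('XXX')
    3. i94addr must be a known US state abbreviation
    4. Performed casting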
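
As an illustration of the airport rules above, here is a minimal plain-Python sketch of the same filter and cast applied to a single record. It assumes `iso_region` values look like `US-AK`, which is what the `substring(iso_region, 4, 2)` expression in the query implies, and it treats empty strings as missing. The function names and the sample record are hypothetical, and the join against the port state codes is left out.

```python
REQUIRED_FIELDS = ("name", "iso_region", "elevation_ft", "coordinates")


def state_from_iso_region(iso_region):
    """Return the two-letter state code from a value such as 'US-AK', or None."""
    if not iso_region:
        return None
    # SQL substring(x, 4, 2) is 1-indexed, which corresponds to the slice [3:5].
    code = iso_region[3:5].strip().upper()
    return code or None


def keep_airport_row(row):
    """Apply the not-null, country and type filters to one raw record."""
    has_required = all(row.get(f) not in (None, "") for f in REQUIRED_FIELDS)
    return (
        has_required
        and (row.get("iso_country") or "").upper() == "US"
        and row.get("type") == "heliport"
    )


def clean_airport_row(row):
    """Return the cleaned record, or None when the row is filtered out."""
    if not keep_airport_row(row):
        return None
    return {
        "name": row["name"].strip(),
        "state": state_from_iso_region(row["iso_region"]),
        "elevation_ft": float(row["elevation_ft"]),  # cast from the CSV string
        "coordinates": row["coordinates"],
    }


# Hypothetical record, not taken from the real file.
sample = {
    "name": " Example Heliport ",
    "type": "heliport",
    "iso_country": "us",
    "iso_region": "US-AK",
    "elevation_ft": "38",
    "coordinates": "-150.0, 70.0",
}
print(clean_airport_row(sample))
```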

In [7]:
# Read i94port.txt (split on '=') into a pandas DataFrame, then register it as the Spark temp view StateCode
i94_df = pd.read_csv('i94port.txt', sep='=', names=['3COdeAbrev', 'values'], header=None)
# Strip tabs, surrounding whitespace and single quotes from both columns
i94_df['3COdeAbrev'] = i94_df['3COdeAbrev'].str.replace('\t','').str.strip().str.replace("'",'')
i94_df['values'] = i94_df['values'].str.replace('\t','').str.strip().str.replace("'",'')
# 'State' holds the text before the first comma; '2CodeAbbrev' holds the text after the last comma
i94_df['State'] = i94_df['values'].str.split(',').str[0]
i94_df['2CodeAbbrev'] = i94_df['values'].str.split(',').str[-1].str.strip()
i94_df.head()
i94_spark_df = spark.createDataFrame(i94_df)
i94_spark_df.createOrReplaceTempView('StateCode')
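
The parsing done in the cell above can be sketched for a single line in plain Python. This assumes each line of `i94port.txt` has the shape `'CODE' = 'CITY, ST'` with optional tabs and padding, which is what the cleaning calls suggest; the sample line, the function `parse_port_line` and its output field names are hypothetical.

```python
def _clean(text):
    """Remove tabs and single quotes, then trim surrounding whitespace."""
    return text.replace("\t", "").replace("'", "").strip()


def parse_port_line(line):
    """Split one "'CODE' = 'CITY, ST'" line into its parts, or return None."""
    code_part, sep, value_part = line.partition("=")
    if not sep:
        return None  # no '=' separator, so this is not a port definition
    pieces = [piece.strip() for piece in _clean(value_part).split(",")]
    return {
        "port_code": _clean(code_part),
        "city": pieces[0],
        # Only report a state code when the value actually contains a comma.
        "state_code": pieces[-1] if len(pieces) > 1 else None,
    }


# Hypothetical sample line in the assumed format.
sample_line = "   'ALC'\t=\t'ALCAN, AK             '"
parsed = parse_port_line(sample_line)
print(parsed)
```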

In [8]:
# Map full state names to two-letter abbreviations; later queries use this to derive the state column
us_state_abbrev = {
    'Alabama': 'AL',
    'Alaska': 'AK',
    'Arizona': 'AZ',
    'Arkansas': 'AR',
    'California': 'CA',
    'Colorado': 'CO',
    'Connecticut': 'CT',
    'Delaware': 'DE',
    'Florida': 'FL',
    'Georgia': 'GA',
    'Hawaii': 'HI',
    'Idaho': 'ID',
    'Illinois': 'IL',
    'Indiana': 'IN',
    'Iowa': 'IA',
    'Kansas': 'KS',
    'Kentucky': 'KY',
    'Louisiana': 'LA',
    'Maine': 'ME',
    'Maryland': 'MD',
    'Massachusetts': 'MA',
    'Michigan': 'MI',
    'Minnesota': 'MN',
    'Mississippi': 'MS',
    'Missouri': 'MO',
    'Montana': 'MT',
    'Nebraska': 'NE',
    'Nevada': 'NV',
    'New Hampshire': 'NH',
    'New Jersey': 'NJ',
    'New Mexico': 'NM',
    'New York': 'NY',
    'North Carolina': 'NC',
    'North Dakota': 'ND',
    'Ohio': 'OH',
    'Oklahoma': 'OK',
    'Oregon': 'OR',
    'Pennsylvania': 'PA',
    'Rhode Island': 'RI',
    'South Carolina': 'SC',
    'South Dakota': 'SD',
    'Tennessee': 'TN',
    'Texas': 'TX',
    'Utah': 'UT',
    'Vermont': 'VT',
    'Virginia': 'VA',
    'Washington': 'WA',
    'West Virginia': 'WV',
    'Wisconsin': 'WI',
    'Wyoming': 'WY',
    'District Of Columbia': 'DC',
    'Georgia (State)':'GA'
}
us_state_abbrev_df = pd.Series(us_state_abbrev).to_frame().reset_index()
us_state_abbrev_df.columns = ['State','Abbrev']
us_state_abbrev_df.head()
us_state_abbrev_df_spark = spark.createDataFrame(us_state_abbrev_df)
us_state_abbrev_df_spark.createOrReplaceTempView('us_State_abbrev')

In [9]:
# airport_data
# Register the raw DataFrame as a Spark temp view
# Derive the dimension table from that view with a select query
# Register the result as the DIM_AIRPORT view
airport_data.createOrReplaceTempView('airport_data')

DIM_AIRPORT = spark.sql(''' 
select distinct  monotonically_increasing_id() as airport_surr_key, cast(trim(name) as string) as name, 
trim(sc.2CodeAbbrev) as state,
cast(elevation_ft as float) as elevation_ft,
cast(coordinates as string) as coordinates
from airport_data ad
join StateCode sc
on trim(upper(cast(substring(ad.iso_region,4,2) as string))) = trim(sc.2CodeAbbrev)
where upper(iso_country) = "US" and iso_region is not null and type = 'heliport' and elevation_ft is not null 
and name is not null and coordinates is not null
''')
DIM_AIRPORT.createOrReplaceTempView('DIM_AIRPORT')
DIM_AIRPORT.show()

+----------------+--------------------+-----+------------+--------------------+
|airport_surr_key|                name|state|elevation_ft|         coordinates|
+----------------+--------------------+-----+------------+--------------------+
|             111|Valdez Hospital H...|   AK|        96.0|-146.345001220703...|
|             266|        Otp Heliport|   AK|        38.0|-150.018563, 70.4...|
|             316|Campbell BLM Heli...|   AK|       235.0|-149.792007446, 6...|
|             354|North Douglas Hel...|   AK|        25.0|-134.496994018554...|
|             596| Big Salmon Heliport|   AK|       292.0|-136.013611, 59.4...|
|            1119|Era Chulitna Rive...|   AK|       960.0|-150.235992431640...|
|            1172|Aviator Hotel Anc...|   AK|       123.0|-149.886482, 61.2...|
|            1190|     Pad-66 Heliport|   AK|        60.0|-149.589004516601...|
|            1402|Marshall Medical ...|   AL|      1024.0|-86.422975, 34.36...|
|            1906|S R P Tolleson Ce...| 

In [10]:
# temperature_data
# Register the raw DataFrame as a Spark temp view
# Derive the dimension table from that view with a select query
# Register the result as the DIM_TEMPERATURE view
temperature_data.createOrReplaceTempView('temperature_data')

DIM_TEMPERATURE = spark.sql(''' 
select distinct  monotonically_increasing_id() as temperature_surr_key,
dt, cast(AverageTemperature as float) as AverageTemperature, cast(AverageTemperatureUncertainty as float) as AverageTemperatureUncertainty
, sc.Abbrev as state, Country
from temperature_data td
join us_State_abbrev sc
on trim(upper(cast(td.State as string))) = upper(trim(sc.State))
where trim(upper(Country)) = 'UNITED STATES' AND AverageTemperature IS NOT NULL and AverageTemperatureUncertainty is not null
 ''')
DIM_TEMPERATURE.createOrReplaceTempView('DIM_TEMPERATURE')
DIM_TEMPERATURE.show()

+--------------------+----------+------------------+-----------------------------+-----+-------------+
|temperature_surr_key|        dt|AverageTemperature|AverageTemperatureUncertainty|state|      Country|
+--------------------+----------+------------------+-----------------------------+-----+-------------+
|         42949673032|1756-07-01|            23.134|                        1.811|   NJ|United States|
|         42949673057|1758-09-01|             16.37|                        2.459|   NJ|United States|
|         42949673177|1769-07-01|            23.046|                        5.051|   NJ|United States|
|         42949673333|1784-12-01|            -0.847|                        2.341|   NJ|United States|
|         42949673969|1837-12-01|             0.588|                        4.618|   NJ|United States|
|         42949674247|1861-02-01|              1.72|                        1.185|   NJ|United States|
|         42949674256|1861-11-01|             5.148|                     

In [11]:
# demographics_data
# Register the raw DataFrame as a Spark temp view
# Derive the dimension table from that view with a select query
# Register the result as the DIM_DEMOGRAPHICS view
demographics_data.createOrReplaceTempView('demographics_data')

DIM_DEMOGRAPHICS = spark.sql(''' 
select distinct  monotonically_increasing_id() as demographics_surr_key,
cast(`Median Age` as int) as Median_Age, cast(`Male Population` as float) as Male_population, 
cast(`Female Population` as float) as Female_Population,
cast(`Total Population` as float) as Total_Population, 
cast(`Number of Veterans` as int) as Num_Of_Verterans,
cast(`Foreign-born` as int) as Foreign_Born,
cast(Abbrev as string ) as state
from demographics_data dd
join us_State_abbrev sc
on trim(upper(cast(dd.State as string))) = upper(trim(sc.State))
where `Median Age` is not null and `Total Population` is not null and `Female Population` is not null 
and cast(`Male Population` as float) is not null
 ''')
DIM_DEMOGRAPHICS.createOrReplaceTempView('DIM_DEMOGRAPHICS')
DIM_DEMOGRAPHICS.show()

+---------------------+----------+---------------+-----------------+----------------+----------------+------------+-----+
|demographics_surr_key|Median_Age|Male_population|Female_Population|Total_Population|Num_Of_Verterans|Foreign_Born|state|
+---------------------+----------+---------------+-----------------+----------------+----------------+------------+-----+
|                   31|        38|        91764.0|          97350.0|        189114.0|           16637|       12691|   AL|
|                   43|        46|       115712.0|         121132.0|        236844.0|           16798|       27207|   AZ|
|                   96|        23|        33224.0|          37093.0|         70317.0|            3667|        4262|   AZ|
|                  107|        39|        62875.0|          65567.0|        128442.0|           11109|        9929|   AZ|
|                  197|        39|        53817.0|          52757.0|        106574.0|            3782|       56640|   CA|
|                  280| 

In [12]:
# immigration_data
# Register the raw DataFrame as a Spark temp view
# Derive the dimension table from that view with a select query
# Register the result as the DIM_IMMIGRATION view
immigration_data.createOrReplaceTempView('immigration_data')

DIM_IMMIGRATION = spark.sql(''' 
select distinct  monotonically_increasing_id() as immigration_surr_key,
i94addr as state,i94yr,i94mon,i94port as city_port_name,cicid,i94visa,i94mode,occup
from immigration_data id 
join us_State_abbrev sc
on trim(upper(cast(id.i94addr as string))) = upper(trim(sc.abbrev))
where i94addr is not null and i94port not in ('XXX') and occup is not null and i94port is not null and i94addr is not null
and i94yr is not null
 ''')
DIM_IMMIGRATION.createOrReplaceTempView('DIM_IMMIGRATION')
DIM_IMMIGRATION.show()

+--------------------+-----+------+------+--------------+---------+-------+-------+-----+
|immigration_surr_key|state| i94yr|i94mon|city_port_name|    cicid|i94visa|i94mode|occup|
+--------------------+-----+------+------+--------------+---------+-------+-------+-----+
|         85899345966|   MN|2016.0|   6.0|           LOS|3549289.0|    3.0|    1.0|  STU|
|        360777252901|   VA|2016.0|   6.0|           BOS|2355046.0|    3.0|    1.0|  STU|
|        420906795016|   KY|2016.0|   6.0|           DAL|1309334.0|    3.0|    1.0|  STU|
|        558345748999|   CA|2016.0|   6.0|           LOS|2131399.0|    3.0|    1.0|  STU|
|        558345749509|   CA|2016.0|   6.0|           LOS|3780229.0|    3.0|    1.0|  STU|
|        558345749632|   CA|2016.0|   6.0|           SFR|4004493.0|    3.0|    1.0|  STU|
|        558345749663|   CA|2016.0|   6.0|           LOS|4188095.0|    3.0|    1.0|  STU|
|        558345749835|   CA|2016.0|   6.0|           SFR|4809535.0|    3.0|    1.0|  GLS|
|        5

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The model is a star schema: four dimension tables, each carrying a `state` column, and one fact table built by joining dimensions on `state`.

##### Dimensions

DIM_AIRPORT
```
    airport_surr_key bigint,
    name string,
    state string,
    elevation_ft float,
    coordinates string
```

DIM_TEMPERATURE
```
    temperature_surr_key bigint,
    dt string,
    AverageTemperature float,
    AverageTemperatureUncertainty float,
    state string,
    Country string
```

DIM_DEMOGRAPHICS
```
    demographics_surr_key bigint,
    Median_Age int,
    Male_population float,
    Female_Population float,
    Total_Population float,
    Num_Of_Verterans int,
    Foreign_Born int,
    state string
```

DIM_IMMIGRATION
```
    immigration_surr_key bigint,
    state string,
    i94yr double,
    i94mon double,
    city_port_name string,
    cicid double,
    i94visa double,
    i94mode double,
    occup string
```
##### Fact

FACT_TABLE
```
    state string,
    i94yr double,
    i94mon double,
    city_port_name string,
    Median_Age int,
    Male_population float,
    Female_Population float,
    Total_Population float,
    Num_Of_Verterans int,
    Foreign_Born int
```


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

The following dimension tables are written as parquet files, partitioned by state:
* DIM_AIRPORT
* DIM_TEMPERATURE
* DIM_DEMOGRAPHICS
* DIM_IMMIGRATION

The fact table is written as parquet files, partitioned by state and month (i94mon):
* FACT_TABLE

In [13]:
# Write the dimensions partitioned by state
print('Load the DIM_AIRPORT')
DIM_AIRPORT.write.partitionBy("state").parquet('Final_output' + '/DIM_AIRPORT',mode='overwrite')
print('Load the DIM_TEMPERATURE')
DIM_TEMPERATURE.write.partitionBy("state").parquet('Final_output' + '/DIM_TEMPERATURE',mode='overwrite')
print('Load the DIM_DEMOGRAPHICS')
DIM_DEMOGRAPHICS.write.partitionBy("state").parquet('Final_output' + '/DIM_DEMOGRAPHICS',mode='overwrite')
print('Load the DIM_IMMIGRATION')
DIM_IMMIGRATION.write.partitionBy("state").parquet('Final_output' + '/DIM_IMMIGRATION',mode='overwrite')

Load the DIM_AIRPORT
Load the DIM_TEMPERATURE
Load the DIM_DEMOGRAPHICS
Load the DIM_IMMIGRATION


In [14]:
# Build the fact table by combining dimension columns
# Only two of the dimension tables are joined here; this can be extended to all 4 dimension tables
FACT_TABLE = spark.sql(
'''
select  
    I.state ,
    i94yr ,
    i94mon ,
    city_port_name ,
    Median_Age , 
    Male_population ,
    Female_Population , 
    Total_Population , 
    Num_Of_Verterans ,
    Foreign_Born   
from DIM_IMMIGRATION I
join DIM_DEMOGRAPHICS D 
ON D.STATE = I.STATE
where i94yr = '2016' AND OCCUP = 'STU' 

''')
FACT_TABLE.createOrReplaceTempView('FACT_TABLE')
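
Because the join key is `state` alone, each immigration row is paired with every demographics row for the same state. The plain-Python sketch below reproduces that inner-join behaviour on a few hypothetical rows so the resulting row count is easy to reason about; `inner_join_on_state` and the sample values are illustrative assumptions, not the notebook's data.

```python
from collections import defaultdict


def inner_join_on_state(left_rows, right_rows):
    """Inner join two lists of dicts on their 'state' field."""
    by_state = defaultdict(list)
    for right in right_rows:
        by_state[right["state"]].append(right)
    joined = []
    for left in left_rows:
        # States with no match on the right side are dropped (inner join).
        for right in by_state.get(left["state"], []):
            merged = dict(right)
            merged.update(left)
            joined.append(merged)
    return joined


# Hypothetical rows: two CA arrivals, one TX arrival, two CA demographic rows.
immigration = [
    {"state": "CA", "cicid": 1},
    {"state": "CA", "cicid": 2},
    {"state": "TX", "cicid": 3},
]
demographics = [
    {"state": "CA", "Median_Age": 39},
    {"state": "CA", "Median_Age": 33},
]

fact = inner_join_on_state(immigration, demographics)
print(len(fact))
```

Here the two CA arrivals each match two CA demographic rows, giving four fact rows, while the TX arrival is dropped. Aggregating demographics to one row per state before joining would keep one fact row per arrival.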

In [15]:
# Write FACT_TABLE as parquet, partitioned by state and month
FACT_TABLE.write.partitionBy("state","i94mon").parquet('Final_output' + '/FACT_TABLE',mode='overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

A data quality check verifies that each table was loaded by counting its records; a table with zero records fails the check.

In [16]:
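# Perform quality checks here
def data_quality_check(df):
    '''
    Check that a Spark DataFrame contains at least one record.

    :param df: Spark DataFrame to check
    :raises ValueError: if the DataFrame has no records
    '''
    count = df.count()
    if count == 0:
        # Fail loudly so an empty table cannot go unnoticed
        raise ValueError("Data quality check failed for {} with zero records".format(df))
    print("Data quality check passed for {} with {} records".format(df, count))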

In [17]:
print(data_quality_check(DIM_AIRPORT))
print(data_quality_check(DIM_TEMPERATURE))
print(data_quality_check(DIM_DEMOGRAPHICS))
print(data_quality_check(DIM_IMMIGRATION))
print(data_quality_check(FACT_TABLE))

Data quality check passed for DataFrame[airport_surr_key: bigint, name: string, state: string, elevation_ft: float, coordinates: string] with 88907 records
None
Data quality check passed for DataFrame[temperature_surr_key: bigint, dt: string, AverageTemperature: float, AverageTemperatureUncertainty: float, state: string, Country: string] with 141930 records
None
Data quality check passed for DataFrame[demographics_surr_key: bigint, Median_Age: int, Male_population: float, Female_Population: float, Total_Population: float, Num_Of_Verterans: int, Foreign_Born: int, state: string] with 2875 records
None
Data quality check passed for DataFrame[immigration_surr_key: bigint, state: string, i94yr: double, i94mon: double, city_port_name: string, cicid: double, i94visa: double, i94mode: double, occup: string] with 10232 records
None
Data quality check passed for DataFrame[state: string, i94yr: double, i94mon: double, city_port_name: string, Median_Age: int, Male_population: float, Female_Popula
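
The count check above covers completeness only. The integrity constraints listed earlier (unique key, no missing values) could be checked along the following lines. This is a plain-Python sketch over rows held as dictionaries, for example a small sample obtained with `row.asDict()` on collected Spark rows; on full tables the same logic would be better expressed as Spark aggregations. Both function names and the sample rows are hypothetical.

```python
def check_unique_key(rows, key):
    """Raise ValueError if any value of `key` occurs more than once."""
    seen = set()
    duplicates = set()
    for row in rows:
        value = row[key]
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    if duplicates:
        raise ValueError("Duplicate values in {}: {}".format(key, sorted(duplicates)))
    return len(seen)


def check_not_null(rows, columns):
    """Raise ValueError on the first row that has None in a required column."""
    checked = 0
    for index, row in enumerate(rows):
        missing = [c for c in columns if row.get(c) is None]
        if missing:
            raise ValueError("Row {} has nulls in {}".format(index, missing))
        checked += 1
    return checked


# Hypothetical sample rows shaped like DIM_AIRPORT.
sample_rows = [
    {"airport_surr_key": 111, "state": "AK", "elevation_ft": 96.0},
    {"airport_surr_key": 266, "state": "AK", "elevation_ft": 38.0},
]
print(check_unique_key(sample_rows, "airport_surr_key"))
print(check_not_null(sample_rows, ["state", "elevation_ft"]))
```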

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  * Python, Jupyter Notebook, Spark and Spark SQL (with Hive support) were used. Spark serves as the processing engine, with most operations expressed through its DataFrame API and SQL.
* Propose how often the data should be updated and why.
  * The data should be updated once a month, because the analysis is done by year and month.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    * Use Redshift or EMR with multiple nodes to handle the larger volume.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * Schedule the pipeline with Airflow to run every morning, early enough that the dashboard is refreshed by 7 am.
  * The database needed to be accessed by 100+ people.
    * Host it in the AWS cloud and increase the number of nodes so that 100+ people can access it.