# Project Title
### Data Engineering Capstone Project

#### Project Summary
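This project builds a Spark ETL pipeline that cleans I94 immigration data, I94 airport codes, and US city demographics, then writes dimension tables and a fact table to parquet files.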

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
import datetime
import numpy as np
import pandas as pd
import re
%matplotlib inline
import matplotlib.pyplot as plt

### Step 1: Scope the Project and Gather Data
#### Scope


In this project we clean and aggregate the I94 immigration data to build the first dimension table, then build the second dimension table, Airports. The two datasets are joined on the airport code (`i94port`) to form the fact table.
The third dimension contains information about city demographics.

The first dimension:
```
i94mon  = numeric month
i94cit  = 3 digit code of origin city
i94port = 3 character code of destination USA city
arrdate = arrival date in the USA
i94mode = 1 digit travel code
depdate = departure date from the USA
i94visa = reason for immigration

```
The second dimension:
```
Airport Name = Airport name
State_Code   = State CODE
i94port      = 3 character code of destination USA city
```
The third dimension:
```
City              = City name                    
State             = State                   
Median Age        = Median age of the city
Male Population   = Male population        
Total Population  = Total population        
Foreign-born      = Number of foreign-born
```
#### Describe and Gather Data 


In [2]:
spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()

In [3]:
path="./us-cities-demographics.csv"

In [4]:
df_demo = spark.read.option("sep", ";").csv(path,header=True)

In [5]:
df_demo.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [6]:
df_demo_pd=pd.read_csv(path,delimiter=';')

In [7]:
df_demo_pd.head(20)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


In [8]:
df_demo_pd.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [9]:
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [10]:
df_demo_pd[df_demo_pd["City"]=="Bay"].head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
1352,Bay,Texas,32.9,37977.0,37508.0,75485,3478.0,13192.0,2.59,TX,Hispanic or Latino,31672
2106,Bay,Texas,32.9,37977.0,37508.0,75485,3478.0,13192.0,2.59,TX,Asian,2819
2351,Bay,Texas,32.9,37977.0,37508.0,75485,3478.0,13192.0,2.59,TX,White,48797
2554,Bay,Texas,32.9,37977.0,37508.0,75485,3478.0,13192.0,2.59,TX,Black or African-American,16782
2860,Bay,Texas,32.9,37977.0,37508.0,75485,3478.0,13192.0,2.59,TX,American Indian and Alaska Native,2167
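
The rows above repeat the same city-level figures once for each Race value, so a city-level table needs only one of them. A minimal sketch of that de-duplication in plain Python, assuming (City, State Code) identifies a city; the helper name `one_row_per_city` and the reduced sample rows are illustrative and not part of the notebook:

```python
def one_row_per_city(rows, key_fields=("City", "State Code"),
                     drop_fields=("Race", "Count")):
    """Keep the first row seen for each key and drop the per-race fields."""
    unique = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in unique:
            unique[key] = {name: value for name, value in row.items()
                           if name not in drop_fields}
    return list(unique.values())


# Two of the "Bay" rows shown above, reduced to a few columns
sample_rows = [
    {"City": "Bay", "State Code": "TX", "Total Population": 75485,
     "Race": "Hispanic or Latino", "Count": 31672},
    {"City": "Bay", "State Code": "TX", "Total Population": 75485,
     "Race": "Asian", "Count": 2819},
]
city_rows = one_row_per_city(sample_rows)
print(city_rows)
```

In Spark, the same idea would be a `dropDuplicates` call on the key columns after selecting the city-level columns.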


In [11]:
df_demo.describe().show()

+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|summary|   City|    State|       Median Age|   Male Population| Female Population|  Total Population|Number of Veterans|      Foreign-born|Average Household Size|State Code|                Race|             Count|
+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|  count|   2891|     2891|             2891|              2888|              2888|              2891|              2878|              2878|                  2875|      2891|                2891|              2891|
|   mean|   null|     null|35.49488066413016| 97328.42624653739|101769.63088642659|198966.77931511588| 9367.832522585128|40653.598679638635|

In [12]:
df_i94_port=pd.read_csv('./i94_airport_codes.csv')

In [13]:
df_i94_port.head()

Unnamed: 0.1,Unnamed: 0,i94port_clean,i94_airport_name_clean,i94_state_clean
0,0,ALC,ALCAN,AK
1,1,ANC,ANCHORAGE,AK
2,2,BAR,BAKER AAF - BAKER ISLAND,AK
3,3,DAC,DALTONS CACHE,AK
4,4,PIZ,DEW STATION PT LAY DEW,AK


In [14]:
df_i94_port.drop(['Unnamed: 0'], inplace=True, axis=1)

In [15]:
Valid_ports=list(df_i94_port.i94port_clean.unique())

In [16]:
valid_list=[]

In [17]:
for i in Valid_ports:
    j = i.replace(' ','')
    valid_list.append(j)
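
The loop above removes spaces from each port code. As a sketch, the same normalization can be wrapped in a small function that also drops empty and repeated codes; `normalize_port_codes` and the sample list are hypothetical names used only for illustration:

```python
def normalize_port_codes(codes):
    """Strip spaces from each code and drop empty or repeated values,
    keeping the order in which codes were first seen."""
    seen = set()
    cleaned = []
    for code in codes:
        value = str(code).replace(" ", "")
        if value and value not in seen:
            seen.add(value)
            cleaned.append(value)
    return cleaned


sample_codes = ["ALC", " ANC", "BAR ", "ANC", "  "]
print(normalize_port_codes(sample_codes))
```

The result is a plain Python list, so it can be passed directly to a membership test such as `isin`.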

In [18]:
df_i94_ports = spark.read.option("sep", ",").csv('i94_airport_codes.csv',header=True)

In [19]:
df_i94_ports.show()

+---+-------------+----------------------+------------------+
|_c0|i94port_clean|i94_airport_name_clean|   i94_state_clean|
+---+-------------+----------------------+------------------+
|  0|          ALC|                 ALCAN|   AK             |
|  1|          ANC|             ANCHORAGE|       AK         |
|  2|          BAR|  BAKER AAF - BAKER...|                AK|
|  3|          DAC|         DALTONS CACHE|           AK     |
|  4|          PIZ|  DEW STATION PT LA...|                AK|
|  5|          DTH|          DUTCH HARBOR|          AK      |
|  6|          EGL|                 EAGLE|   AK             |
|  7|          FRB|             FAIRBANKS|       AK         |
|  8|          HOM|                 HOMER|   AK             |
|  9|          HYD|                 HYDER|   AK             |
| 10|          JUN|                JUNEAU|    AK            |
| 11|          5KE|             KETCHIKAN|                AK|
| 12|          KET|             KETCHIKAN|       AK         |
| 13|   

In [20]:
df_i94_ports.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- i94port_clean: string (nullable = true)
 |-- i94_airport_name_clean: string (nullable = true)
 |-- i94_state_clean: string (nullable = true)



In [21]:
df_i94_ports=df_i94_ports.drop(df_i94_ports['_c0'])

In [25]:
df_i94_ports.show()

+-------------+----------------------+------------------+
|i94port_clean|i94_airport_name_clean|   i94_state_clean|
+-------------+----------------------+------------------+
|          ALC|                 ALCAN|   AK             |
|          ANC|             ANCHORAGE|       AK         |
|          BAR|  BAKER AAF - BAKER...|                AK|
|          DAC|         DALTONS CACHE|           AK     |
|          PIZ|  DEW STATION PT LA...|                AK|
|          DTH|          DUTCH HARBOR|          AK      |
|          EGL|                 EAGLE|   AK             |
|          FRB|             FAIRBANKS|       AK         |
|          HOM|                 HOMER|   AK             |
|          HYD|                 HYDER|   AK             |
|          JUN|                JUNEAU|    AK            |
|          5KE|             KETCHIKAN|                AK|
|          KET|             KETCHIKAN|       AK         |
|          MOS|  MOSES POINT INTER...|                AK|
|          NIK

In [None]:
valid_list_arr=np.array(valid_list)

In [35]:
# Read in the data here
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [36]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

In [37]:
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [38]:
df.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
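
The `arrdate` and `depdate` values in this output are plain numbers. Assuming they are SAS dates (days counted from 1960-01-01, the SAS epoch), a sketch of converting them to ISO dates could look like this; `sas_date_to_iso` is an illustrative helper and not part of the notebook:

```python
import datetime
import math

SAS_EPOCH = datetime.date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_date_to_iso(sas_days):
    """Convert a SAS day count to 'YYYY-MM-DD'; missing values give None."""
    if sas_days is None:
        return None
    if isinstance(sas_days, float) and math.isnan(sas_days):
        return None
    return (SAS_EPOCH + datetime.timedelta(days=int(sas_days))).isoformat()


print(sas_date_to_iso(20545.0))  # an arrdate from the rows above
print(sas_date_to_iso(None))     # a missing depdate
```

Under this assumption, 20545 maps to 2016-04-01, which is consistent with the April 2016 file. In Spark the helper could be wrapped with `udf(sas_date_to_iso, StringType())`, both of which the notebook already imports.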


In [39]:
def clean_i94_data(immigration_file):
    '''
    Input: Path to I94 immigration data file

    Output: Spark dataframe of I94 immigration data with valid i94port
    '''
    spark = SparkSession.builder.\
        config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

    # Read I94 data into Spark
    spark_df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(immigration_file)

    # Keep only entries whose i94port is in the list of valid port codes.
    # isin() expects a Python list, so valid_list is used rather than the NumPy array.
    valid_port = spark_df_immigration['i94port'].isin(valid_list)

    # Return the filtered dataframe itself; .show() only prints and returns None
    return spark_df_immigration.filter(valid_port)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
For the I94 immigration data, we drop all entries where the destination city code i94port is not a valid value (e.g., XXX, 99, etc.) as described in I94_SAS_Labels_Description.SAS. For the demographics data, we drop all entries where a population column (Male Population, Female Population, Total Population, or Foreign-born) is NaN.
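
The two cleaning rules above can be sketched on plain Python records, without Spark, to make the intended behavior explicit. The function names, the sample rows, and the small `valid_ports` set are assumptions for illustration only:

```python
import math


def is_missing(value):
    """True for None, empty strings, and float NaN."""
    if value is None or value == "":
        return True
    return isinstance(value, float) and math.isnan(value)


def keep_valid_ports(rows, valid_ports):
    """Keep immigration rows whose i94port is in the set of valid codes."""
    return [row for row in rows if row.get("i94port") in valid_ports]


def drop_incomplete(rows, required):
    """Drop rows with a missing value in any required column."""
    return [row for row in rows
            if not any(is_missing(row.get(col)) for col in required)]


valid_ports = {"ATL", "NYC", "WAS"}
immigration = [{"cicid": 6, "i94port": "XXX"}, {"cicid": 7, "i94port": "ATL"}]
demographics = [
    {"City": "A", "Male Population": 40601.0},
    {"City": "B", "Male Population": float("nan")},
]

clean_immigration = keep_valid_ports(immigration, valid_ports)
clean_demographics = drop_incomplete(demographics, ["Male Population"])
print(clean_immigration)
print(clean_demographics)
```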

In [40]:
df_demo.describe().show(10)

+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|summary|   City|    State|       Median Age|   Male Population| Female Population|  Total Population|Number of Veterans|      Foreign-born|Average Household Size|State Code|                Race|             Count|
+-------+-------+---------+-----------------+------------------+------------------+------------------+------------------+------------------+----------------------+----------+--------------------+------------------+
|  count|   2891|     2891|             2891|              2888|              2888|              2891|              2878|              2878|                  2875|      2891|                2891|              2891|
|   mean|   null|     null|35.49488066413016| 97328.42624653739|101769.63088642659|198966.77931511588| 9367.832522585128|40653.598679638635|

In [41]:
df_demo_pd.isna().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [42]:
# Drop rows with a missing value in any of the population columns
df_demo = df_demo.dropna(subset=["Male Population", "Female Population", "Foreign-born", "Total Population"])

In [None]:
immigration_data = '/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Clean I94 immigration data and store as Spark dataframe
df_immigration = clean_i94_data(immigration_data)

# Extract columns for immigration dimension table
immigration_table = df_immigration.select(["i94yr", "i94mon", "i94cit", "i94port", "arrdate", "i94mode", "depdate", "i94visa"])

# Write immigration dimension table to parquet files partitioned by i94port
immigration_table.write.mode("append").partitionBy("i94port").parquet("./results/immigration.parquet")

In [44]:
import pyspark.sql.functions as F

# Extract columns for US Demographics dimension table
demog_table = df_demo.select(
    F.col("City"),
    F.col("State"),
    F.col("Median Age").alias("Median_Age"),
    F.col("Male Population").alias("Male_Population"),
    F.col("State Code").alias("State_Code"),
    F.col("Female Population").alias("Female_Population"),
    F.col("Total Population").alias("Total_Population"),
    F.col("Foreign-born").alias("Foreign_born"),
)


In [47]:
demog_table.show(5)

+----------------+-------------+----------+---------------+----------+-----------------+----------------+------------+
|            City|        State|Median_Age|Male_Population|State_Code|Female_Population|Total_Population|Foreign_born|
+----------------+-------------+----------+---------------+----------+-----------------+----------------+------------+
|   Silver Spring|     Maryland|      33.8|          40601|        MD|            41862|           82463|       30908|
|          Quincy|Massachusetts|      41.0|          44129|        MA|            49500|           93629|       32935|
|          Hoover|      Alabama|      38.5|          38040|        AL|            46799|           84839|        8229|
|Rancho Cucamonga|   California|      34.5|          88127|        CA|            87105|          175232|       33878|
|          Newark|   New Jersey|      34.6|         138040|        NJ|           143873|          281913|       86253|
+----------------+-------------+----------+-----

In [49]:
# Write Demographics dimension table to parquet files partitioned by State_Code
demog_table.write.mode("append").partitionBy("State_Code").parquet("./results/demographics.parquet")

In [50]:
airports_table=df_i94_ports.select(F.col("i94port_clean").alias("i94port"),
  F.col("i94_airport_name_clean").alias("i94_airport_name"),
  F.col("i94_state_clean").alias("i94_state"))

In [51]:
airports_table.write.mode("append").partitionBy("i94port").parquet("./results/airports.parquet")

In [None]:
df_immigration.createOrReplaceTempView("immigration_view")
df_i94_ports.createOrReplaceTempView("air_ports_view")

# Create the fact table by joining the immigration and airports views
fact_table = spark.sql('''
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       air_ports_view.i94_airport_name_clean as Airport_name,
       air_ports_view.i94_state_clean as State_Code
FROM immigration_view
JOIN air_ports_view ON (immigration_view.i94port = air_ports_view.i94port_clean)
''')
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("./results/fact.parquet")

In [None]:
# Write the cleaned immigration data to parquet, then read it back
df_immigration.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The first dimension contains events from the I94 immigration data. The columns below will be extracted from the immigration dataframe:
```
* i94yr = 4 digit year
* i94mon = numeric month
* i94cit = 3 digit code of origin city
* i94port = 3 character code of destination city
* arrdate = arrival date
* i94mode = 1 digit travel code
* depdate = departure date
* i94visa = reason for immigration
```
The second dimension contains the airport name, state code, and port code:
```
* Airport_name = Airport name
* State_Code = State code
* i94port = 3 character code of destination city
```
The third dimension contains the US city demographics columns:
```
* City = City name
* State = State name
* Median_Age = Median age of the city
* Male_Population = Male Population count
* State_Code = State Code
* Female_Population = Female population count
* Total_Population = total population count
* Foreign_born = number of foreign born
```

#### 3.2 Mapping Out Data Pipelines
1- Clean I94 data as described in step 2 to create Spark dataframe df_immigration for each month
2- Clean demographics data as described in step 2 to create Spark dataframe df_demo
3- Create immigration dimension table by selecting relevant columns from df_immigration and write to parquet file partitioned by i94port
4- Create demographics dimension table by selecting relevant columns from df_demo and write to parquet file partitioned by State_Code
5- Create airports dimension table by selecting relevant columns from df_i94_ports and write to parquet file partitioned by i94port
6- Create fact table by joining the immigration data and the airport codes on i94port and write to parquet file partitioned by i94port
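
The fact-table step, as implemented by the Spark SQL query in Step 2 (immigration joined to the airport codes on `i94port`), can be sketched in plain Python as an inner join through a lookup dictionary. This only illustrates the logic; the function name and sample rows are hypothetical:

```python
def build_fact_rows(immigration_rows, airport_rows):
    """Inner-join immigration rows to airports on the i94port code."""
    airports_by_port = {row["i94port"]: row for row in airport_rows}
    fact_rows = []
    for event in immigration_rows:
        airport = airports_by_port.get(event["i94port"])
        if airport is None:
            continue  # inner join: events without a known airport are dropped
        fact_rows.append({
            "year": event["i94yr"],
            "month": event["i94mon"],
            "i94port": event["i94port"],
            "airport_name": airport["i94_airport_name"],
            "state_code": airport["i94_state"],
        })
    return fact_rows


airports = [{"i94port": "ANC", "i94_airport_name": "ANCHORAGE", "i94_state": "AK"}]
events = [
    {"i94yr": 2016, "i94mon": 4, "i94port": "ANC"},
    {"i94yr": 2016, "i94mon": 4, "i94port": "XXX"},
]
fact_rows = build_fact_rows(events, airports)
print(fact_rows)
```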

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
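
As a sketch of the first and last checks in this list, the functions below test key uniqueness and compare source and target row counts on plain Python records. The names are illustrative, and a Spark version would use the corresponding DataFrame counts instead:

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the key values that occur more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


def counts_match(source_count, target_count):
    """Completeness check: the target holds as many rows as the source."""
    return source_count == target_count


airports = [{"i94port": "ANC"}, {"i94port": "KET"}, {"i94port": "ANC"}]
print(duplicate_keys(airports, "i94port"))
print(counts_match(3, len(airports)))
```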
 
Run Quality Checks

In [None]:
# Perform quality checks here
def quality_check(df, description):
    '''
    Input: Spark dataframe, description of Spark dataframe
    
    Output: Print outcome of data quality check
    
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

# Perform data quality check
quality_check(df_immigration, "immigration table")
quality_check(df_i94_ports, "airports table")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
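
One way to keep the data dictionary next to the code is a plain mapping from field name to description and source. The sketch below covers only the immigration fields, with descriptions copied from Step 3; the structure and the `describe` helper are assumptions, not a required format:

```python
IMMIGRATION_DICTIONARY = {
    "i94yr": "4 digit year",
    "i94mon": "numeric month",
    "i94cit": "3 digit code of origin city",
    "i94port": "3 character code of destination city",
    "arrdate": "arrival date",
    "i94mode": "1 digit travel code",
    "depdate": "departure date",
    "i94visa": "reason for immigration",
}


def describe(field, dictionary=IMMIGRATION_DICTIONARY,
             source="I94 immigration data"):
    """Return a one-line description of a field, including its source."""
    if field not in dictionary:
        raise KeyError("no data dictionary entry for {!r}".format(field))
    return "{}: {} (source: {})".format(field, dictionary[field], source)


print(describe("i94port"))
```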

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
> Spark was chosen since it can handle multiple file formats (including SAS) containing large amounts of data. Spark SQL was used to manipulate large amounts of data and to perform the join operations between tables.
* Propose how often the data should be updated and why.
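> The data should be updated monthly, since the I94 immigration data is organized by month (for example, `i94_apr16_sub.sas7bdat` covers April 2016).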
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 > We could not process the data in a single batch: when I tried to use the Race column in the fact table, I ran out of memory. We are currently running Spark in standalone mode, so we could switch to Spark cluster mode.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 > We could use a scheduling tool such as Airflow to run the ETL pipeline overnight.
 * The database needed to be accessed by 100+ people.
 > If the database needed to be accessed by 100+ people, we could consider publishing the parquet files to HDFS and giving read access to the users who need it.