# Project Title
### Data Engineering Capstone Project

#### Project Summary
The aim of this project is to create a data warehouse. This will be achieved by merging data from various sources for analysis and for future use in backend operations.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [17]:
# Do all imports and installs here
import pandas as pd
import re
import os

import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, explode, split, regexp_extract, col, isnan, isnull, desc, when, sum, to_date, regexp_replace, count, to_timestamp
from pyspark.sql.types import IntegerType, TimestampType



### Step 1: Scope the Project and Gather Data

#### Scope 
The goal is to create an ETL pipeline that combines the I94 immigration dataset with city temperature data and the demographics of each city, so that users can run queries to explore the correlation between destination temperature, immigration statistics, and city statistics.

#### Describe and Gather Data 
**I94 Immigration Data (SAS)** This dataset includes information on visitor arrivals, as well as details such as visa type, transportation mode, age groups, states visited, and the primary ports of entry.

**Temperature Data (CSV)** This dataset contains global land temperatures by city (`GlobalLandTemperaturesByCity.csv`).

**City Demographics (CSV)** This dataset contains information about the demographics of US cities.

In [37]:
# Read in the data here
df_immig=pd.read_csv('immigration_data_sample.csv',index_col=0).reset_index()
df_demo=pd.read_csv('us-cities-demographics.csv',index_col=0,delimiter=';')
df_airport=pd.read_csv('airport-codes_csv.csv',index_col=0)
df_temp=pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')

In [8]:
df_immig.head()

Unnamed: 0,index,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


### Step 2: Explore and Assess the Data


In [24]:
def calculate_nulls(df):
    """Return {column: [null_count, null_fraction]} for each column of df."""
    d = {}
    for c in df.columns:
        null_count = df[c].isnull().sum()
        fraction = null_count / len(df[c])
        d[c] = [null_count, fraction]
    return d

calculate_nulls(df_immig)

{'index': [0, 0.0],
 'cicid': [0, 0.0],
 'i94yr': [0, 0.0],
 'i94mon': [0, 0.0],
 'i94cit': [0, 0.0],
 'i94res': [0, 0.0],
 'i94port': [0, 0.0],
 'arrdate': [0, 0.0],
 'i94mode': [0, 0.0],
 'i94addr': [59, 0.058999999999999997],
 'depdate': [49, 0.049000000000000002],
 'i94bir': [0, 0.0],
 'i94visa': [0, 0.0],
 'count': [0, 0.0],
 'dtadfile': [0, 0.0],
 'visapost': [618, 0.61799999999999999],
 'occup': [996, 0.996],
 'entdepa': [0, 0.0],
 'entdepd': [46, 0.045999999999999999],
 'entdepu': [1000, 1.0],
 'matflag': [46, 0.045999999999999999],
 'biryear': [0, 0.0],
 'dtaddto': [0, 0.0],
 'gender': [141, 0.14099999999999999],
 'insnum': [965, 0.96499999999999997],
 'airline': [33, 0.033000000000000002],
 'admnum': [0, 0.0],
 'fltno': [8, 0.0080000000000000002],
 'visatype': [0, 0.0]}

In [25]:
df_immig.duplicated().sum()

0

In [26]:
df_immig.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
index       1000 non-null int64
cicid       1000 non-null float64
i94yr       1000 non-null float64
i94mon      1000 non-null float64
i94cit      1000 non-null float64
i94res      1000 non-null float64
i94port     1000 non-null object
arrdate     1000 non-null float64
i94mode     1000 non-null float64
i94addr     941 non-null object
depdate     951 non-null float64
i94bir      1000 non-null float64
i94visa     1000 non-null float64
count       1000 non-null float64
dtadfile    1000 non-null int64
visapost    382 non-null object
occup       4 non-null object
entdepa     1000 non-null object
entdepd     954 non-null object
entdepu     0 non-null float64
matflag     954 non-null object
biryear     1000 non-null float64
dtaddto     1000 non-null object
gender      859 non-null object
insnum      35 non-null float64
airline     967 non-null object
admnum      1000 non-null float64
fltno

In [31]:
df_airport.info()

<class 'pandas.core.frame.DataFrame'>
Index: 55075 entries, 00A to ZZZZ
Data columns (total 11 columns):
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(10)
memory usage: 5.0+ MB


In [27]:
calculate_nulls(df_airport)

{'type': [0, 0.0],
 'name': [0, 0.0],
 'elevation_ft': [7006, 0.127208352246936],
 'continent': [27719, 0.50329550612800722],
 'iso_country': [247, 0.0044847934634589196],
 'iso_region': [0, 0.0],
 'municipality': [5676, 0.10305946436677259],
 'gps_code': [14045, 0.25501588742623693],
 'iata_code': [45886, 0.83315478892419426],
 'local_code': [26389, 0.47914661824784383],
 'coordinates': [0, 0.0]}

In [28]:
df_airport.duplicated().sum()

103

#### Clean Data


In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

spark_immg = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat').persist()
spark_immg.limit(5).toPandas().head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [14]:
spark_immg = spark_immg.withColumn('dtadfile', to_date(col('dtadfile'), format='yyyyMMdd'))\
               .withColumn('dtadddto', to_date(col('dtaddto'), format='MMddyyyy'))

In [7]:
# Fix the date columns: SAS stores dates as a number of days, so add that number to the reference date 1960-01-01

@udf(TimestampType())
def fix_date(x):
    try:
        return pd.to_timedelta(x, unit='D') + pd.Timestamp('1960-1-1')
    except:
        return pd.Timestamp('1900-1-1')

spark_immg = spark_immg.withColumn('arrdate', to_date(fix_date(col('arrdate'))))\
               .withColumn('depdate', to_date(fix_date(col('depdate'))))
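
The SAS date columns `arrdate` and `depdate` hold the number of days since 1960-01-01. As a minimal sketch of the same conversion using only the standard library: the helper name `sas_days_to_date` is hypothetical, and returning `None` for missing values (rather than a 1900-01-01 placeholder) is an assumption, not what the cell above does.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days from this day


def sas_days_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS day count to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20545.0))  # 2016-04-01
```

A function like this could presumably be wrapped with `udf(sas_days_to_date, DateType())` so that the extra `to_date` call is not needed, but that variant has not been run against the project data.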



In [8]:
spark_immg.limit(5).toPandas().head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,dtadddto
0,6.0,2016.0,4.0,692.0,692.0,XXX,2016-04-29,,,1900-01-01,...,,1979.0,10282016,,,,1897628000.0,,B2,2016-10-28
1,7.0,2016.0,4.0,254.0,276.0,ATL,2016-04-07,1.0,AL,1900-01-01,...,,1991.0,D/S,M,,,3736796000.0,296.0,F1,
2,15.0,2016.0,4.0,101.0,101.0,WAS,2016-04-01,1.0,MI,2016-08-25,...,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2,2016-09-30
3,16.0,2016.0,4.0,101.0,101.0,NYC,2016-04-01,1.0,MA,2016-04-23,...,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2,2016-09-30
4,17.0,2016.0,4.0,101.0,101.0,NYC,2016-04-01,1.0,MA,2016-04-23,...,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2,2016-09-30


In [46]:
toInt = udf(lambda x: int(x) if x is not None else x, IntegerType())

for colname, coltype in spark_immg.dtypes:
    if coltype == 'double':
        spark_immg = spark_immg.withColumn(colname, toInt(colname)) 
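
One caveat with the cast above: Spark's `IntegerType` is a 32-bit signed integer, while some `admnum` values in the sample (for example 92468460000) are larger than that. The following sketch, with a hypothetical helper `fits_int32`, shows one way to check whether a column's values are safe for `IntegerType` before casting.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1  # value range of a 32-bit signed integer


def fits_int32(values):
    """Return True if every non-null value fits in a 32-bit signed integer."""
    return all(INT32_MIN <= int(v) <= INT32_MAX for v in values if v is not None)


# Values taken from the sample rows shown earlier
print(fits_int32([2016.0, 4.0, 20573.0]))         # True
print(fits_int32([1897628000.0, 92468460000.0]))  # False
```

If the check fails for a column, `LongType` would likely be the safer target type for it.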


In [52]:
spark_immg=spark_immg.drop("insnum")

In [50]:
spark_airport = spark.read.csv('airport-codes_csv.csv', header=True, inferSchema=True)


In [51]:
spark_airport = spark_airport.drop_duplicates()


In [53]:
spark_immg.printSchema() 

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nulla

In [51]:
# Remove spaces from the column names of the demographics CSV

df_demo= df_demo.rename(columns={"Male Population": "Male_Population", "Female Population": "Female_Population", 'Median Age':'Median_Age',
                        'Number of Veterans': 'No_of_Veterans', 'Average Household Size':'Avg_Household_Size', 'Foreign-born':'Foreign_born',
                        'Total Population':'Total_Population', 'State Code':'State_Code'})

df_demo.to_csv('us-cities-demographics.csv',mode='w+' )
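
The rename above lists every column by hand. As a hedged alternative, a generic rule can replace any run of non-alphanumeric characters with an underscore. The helper `normalize_column` is hypothetical and does not reproduce the abbreviations used above (such as `No_of_Veterans`); it only removes spaces and hyphens.

```python
import re


def normalize_column(name: str) -> str:
    """Replace each run of non-alphanumeric characters with one underscore."""
    return re.sub(r"[^0-9A-Za-z]+", "_", name).strip("_")


columns = ["Male Population", "Median Age", "Foreign-born", "State Code", "City"]
print([normalize_column(c) for c in columns])
# ['Male_Population', 'Median_Age', 'Foreign_born', 'State_Code', 'City']
```

With pandas, the function could be passed directly as `df_demo.rename(columns=normalize_column)`.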


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Since the purpose is querying the data and running analytics, we use a star schema, which consists of:

1 fact table for immigration (immigration_id, cic_id, arrdate, departure_date, visa, mode, city, state)

5 dimension tables:

**personal** (immi_personal_id, cic_id, citizen_country, residence_country, birth_year, gender, visa_id)

**airline** (cic_id, airline, admin_num, flight_number)

**visa** (visa_num, visa_type, visa_post)

**temp** (dt, city_code, avg_temp, avg_temp_uncertainty, year, month)

**demographics** (city_code, state_code, male_population, female_population, num_veterans, foreign_born, race, median_age, average_household_size)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Run the etl.py file in the terminal.

In [None]:
# Write code here

immigration_df = clean_i94_data(file_to_spark_df(sas_filenames[0]))
immigration_df.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/tables/immigration.parquet")

temp_df = clean_temp_data(csv_to_spark_df("data/GlobalLandTemperaturesByCity.csv"))
temp_df.write \
    .mode("append") \
    .partitionBy("i94port") \
    .parquet("/tables/temperature.parquet")

# Create temporary views of the immigration and temperature data
immigration_df.createOrReplaceTempView("immigration_view")
temp_df.createOrReplaceTempView("temp_view")

# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql("""
SELECT immigration_view.i94yr as year,
       immigration_view.i94mon as month,
       immigration_view.i94cit as city,
       immigration_view.i94port as i94port,
       immigration_view.i94mode as i94mode,
       immigration_view.i94bir as i94bir,
       immigration_view.arrdate as arrival_date,
       immigration_view.depdate as departure_date,
       immigration_view.i94visa as reason,
       temp_view.AverageTemperature as temperature
FROM immigration_view
JOIN temp_view ON (immigration_view.i94port = temp_view.i94port)
""")

# Write fact table to parquet files partitioned by i94port
fact_table.write \
    .mode("append") \
    .partitionBy("i94port") \
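
To illustrate the shape of the fact-table join on a small scale, the following sketch uses Python's built-in `sqlite3` as a stand-in for Spark SQL. The rows are made-up illustration values, not project data, and the temperature view is assumed to carry an `i94port` column, as the join condition in the query implies.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration_view (i94yr INT, i94mon INT, i94port TEXT, i94visa INT);
CREATE TABLE temp_view (i94port TEXT, AverageTemperature REAL);
-- Hypothetical rows for illustration only
INSERT INTO immigration_view VALUES (2016, 4, 'NYC', 2), (2016, 4, 'ATL', 3), (2016, 4, 'XXX', 2);
INSERT INTO temp_view VALUES ('NYC', 10.5), ('ATL', 17.0);
""")

# Same join shape as the fact table: immigration rows matched to temperature by port
fact_rows = conn.execute("""
SELECT i.i94yr AS year,
       i.i94mon AS month,
       i.i94port AS i94port,
       i.i94visa AS reason,
       t.AverageTemperature AS temperature
FROM immigration_view AS i
JOIN temp_view AS t ON i.i94port = t.i94port
ORDER BY i.i94port
""").fetchall()

for row in fact_rows:
    print(row)
```

Because this is an inner join, the row with port `XXX` is dropped, since it has no temperature record; a `LEFT JOIN` would keep it with a null temperature. If the temperature table holds several dates per port, each immigration row would also be repeated once per matching temperature row.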

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [59]:
# Perform quality checks here
from pathlib import Path

output_dir = Path('./output/')
for table_dir in output_dir.iterdir():
    # Skip plain files and the notebook checkpoint directory
    if table_dir.is_dir() and table_dir.name != '.ipynb_checkpoints':
        df = spark.read.parquet(str(table_dir))
        record_num = df.count()
        if record_num <= 0:
            raise ValueError(f"Table {table_dir.name} is empty!")
        print(f"Table: {table_dir.name} is not empty: total {record_num} records.")

Table: fact_immigration is not empty: total 3096313 records.
Table: personal is not empty: total 3096313 records.
Table: demographics is not empty: total 2891 records.
Table: airline is not empty: total 3096313 records.
Table: vis_dim is not empty: total 3096313 records.
Table: dim_temperature is not empty: total 687004 records.
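
The check above only confirms that each table is non-empty. As a sketch of the integrity checks listed earlier (a key that is unique and never null), here is a plain-Python version over a list of records. The function name `check_unique_key` and the sample records are hypothetical; on a Spark DataFrame the same idea could be expressed by comparing `df.count()` with `df.select(key).distinct().count()`.

```python
from collections import Counter


def check_unique_key(records, key):
    """Raise ValueError if `key` is missing or duplicated; return the record count."""
    values = [r.get(key) for r in records]
    if any(v is None for v in values):
        raise ValueError(f"Null value found in key column '{key}'")
    duplicates = [v for v, n in Counter(values).items() if n > 1]
    if duplicates:
        raise ValueError(f"Duplicate values in key column '{key}': {duplicates}")
    return len(values)


# Hypothetical records for illustration only
rows = [{"cicid": 6}, {"cicid": 7}, {"cicid": 15}]
print(check_unique_key(rows, "cicid"))  # 3
```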


#### 4.3 Data dictionary 
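**fact table**

* `cicid`: unique record identifier from the I94 data
* `i94yr`: 4-digit year of arrival
* `i94mon`: numeric month of arrival
* `i94port`: code of the port of entry
* `i94addr`: US state code of the visitor's address
* `arrdate`: arrival date in the US
* `depdate`: departure date from the US
* `i94mode`: mode of transportation (1 = air, 2 = sea, 3 = land, 9 = not reported)
* `i94visa`: visa category code (1 = business, 2 = pleasure, 3 = student)
* `immigration_id`: identifier of the fact table row

**personal table**

* `cicid`: unique record identifier from the I94 data
* `i94cit`: country of citizenship code
* `i94res`: country of residence code
* `biryear`: 4-digit year of birth
* `gender`: gender of the visitor
* `i94visa`: visa category code
* `personal_id`: identifier of the personal table row

**airline table**

* `cicid`: unique record identifier from the I94 data
* `airline`: airline used to arrive in the US
* `admnum`: admission number
* `fltno`: flight number of the arriving flight
* `immi_airline_id`: identifier of the airline table row

**visa table**

* `Visa_id`: identifier of the visa table row
* `visa_type`: class of admission (for example B2, WT, F1)
* `visapost`: Department of State post where the visa was issued

**temperature**

* `dt`: date of the measurement
* `AverageTemperature`: average land temperature in degrees Celsius
* `AverageTemperatureUncertainty`: 95% confidence interval around the average
* `City`: city name
* `Country`: country name
* `year`: year of the measurement
* `month`: month of the measurement

**demographics**

* `City`: city name
* `State`: US state name
* `Male Population`: number of male residents
* `Female Population`: number of female residents
* `Number of Veterans`: number of veterans
* `Foreign-born`: number of foreign-born residents
* `Race`: race or ethnicity group of the row
* `Median Age`: median age of the population
* `Average Household Size`: average number of people per household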

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Technologies used:

Python (pandas)

PySpark, because it is well suited to ETL: it can handle large volumes of data, is fast, works with many data formats, and is easy to use and program.

#### How often the data should be updated and why:
The demographic and temperature tables should be updated once per year, as they do not change frequently.
The immigration tables should be updated monthly, which matches the monthly I94 source files (for example `i94_apr16_sub.sas7bdat`) and avoids accumulating a large backlog of data.

#### The data was increased by 100x.
It would be too much to handle on a single machine, so we would need a distributed database. We could also use cloud services for this task, such as AWS EC2.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
We can use a pipeline orchestration platform such as Airflow to schedule the pipeline so that it finishes before 7am.

#### The database needed to be accessed by 100+ people.
We can either adjust the Spark JDBC (Derby) settings to allow concurrent reads, or use a cloud service such as AWS Redshift.

In [None]:
import shutil
shutil.make_archive('save', 'zip', '.')