### Data Engineering Capstone Project

#### Project Summary
The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when
from pyspark.sql.types import IntegerType,BooleanType,DateType
import os


### Step 1: Scope the Project and Gather Data

#### Scope 
**Data Information**
<br>-This project analyzes US immigration data for April 2016 to detect which factors affect the number of immigrants to the US
<br>-It also studies average global temperature over the last 100 years by country and city, then looks for a relation with the immigration data
<br>-I also used demographic data to get information about US states and cities as well as total population
<br>-In addition, I used airport codes to get insights about the airports most visited by immigrants
<br>-I aggregated data by state and month because the project targets US states
<br>**The end solution will be**
<br>1-CSV files for the pipelined tables
<br>2-Data visuals in Power BI to get some visual insights
<br>**Tools**
<br>-I'm using PySpark for the immigration data, Pandas for the other data, and Power BI for visualization


#### Describe and Gather Data 
<br>**1-I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. I'm using data on immigrants in April 2016. It comes from [here](https://travel.trade.gov/research/reports/i94/historical/2016.html)
<br>**2-World Temperature Data**: This dataset comes from Kaggle, [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). 
<br>**3-U.S. City Demographic Data**: This data comes from OpenDataSoft, [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). 
<br>**4-Airport Code Table**: This is a simple table of airport codes and corresponding cities from [here](https://datahub.io/core/airport-codes#data). 
<br>**5-SAS Country Codes**: This is a simple table that contains each SAS country code and its corresponding country name. It's extracted from the I94 immigration data dictionary
<br>**6-SAS Cities Codes**: This is a simple table that contains each SAS city/port code and its corresponding city name and country or state code. It's extracted from the I94 immigration data dictionary
<br>**7-Visa type**: This is a simple table that contains each immigrant visa type code and its corresponding type name: student, business, or pleasure. It's extracted from the I94 immigration data dictionary
<br>**8-Travel mode**: This is a simple table that contains each immigrant travel mode code and its corresponding mode name: Air, Sea, Land, or Not Specified. It's extracted from the I94 immigration data dictionary

##### 1- Read Demographic csv file

In [264]:
# Read in the data here
demoDf=pd.read_csv("us-cities-demographics.csv",sep=';')
demoDf.tail()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
2886,Stockton,California,32.5,150976.0,154674.0,305650,12822.0,79583.0,3.16,CA,American Indian and Alaska Native,19834
2887,Southfield,Michigan,41.6,31369.0,41808.0,73177,4035.0,4011.0,2.27,MI,American Indian and Alaska Native,983
2888,Indianapolis,Indiana,34.1,410615.0,437808.0,848423,42186.0,72456.0,2.53,IN,White,553665
2889,Somerville,Massachusetts,31.0,41028.0,39306.0,80334,2103.0,22292.0,2.43,MA,American Indian and Alaska Native,374
2890,Coral Springs,Florida,37.2,63316.0,66186.0,129502,4724.0,38552.0,3.17,FL,White,90896


##### 2- Read Airport codes csv file

In [265]:
airportDf=pd.read_csv("airport-codes_csv.csv")
airportDf.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### 3- Read Global Temperature csv file

In [266]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
GTempDf = pd.read_csv(fname)
GTempDf.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [267]:
GTempDf.dt.max()

'2013-09-01'

In [268]:
GTempDf.Country.unique()

array(['Denmark', 'Turkey', 'Kazakhstan', 'China', 'Spain', 'Germany',
       'Nigeria', 'Iran', 'Russia', 'Canada', "Côte D'Ivoire",
       'United Kingdom', 'Saudi Arabia', 'Japan', 'United States', 'India',
       'Benin', 'United Arab Emirates', 'Mexico', 'Venezuela', 'Ghana',
       'Ethiopia', 'Australia', 'Yemen', 'Indonesia', 'Morocco',
       'Pakistan', 'France', 'Libya', 'Burma', 'Brazil', 'South Africa',
       'Syria', 'Egypt', 'Algeria', 'Netherlands', 'Malaysia', 'Portugal',
       'Ecuador', 'Italy', 'Uzbekistan', 'Philippines', 'Madagascar',
       'Chile', 'Belgium', 'El Salvador', 'Romania', 'Peru', 'Colombia',
       'Tanzania', 'Tunisia', 'Turkmenistan', 'Israel', 'Eritrea',
       'Paraguay', 'Greece', 'New Zealand', 'Vietnam', 'Cameroon', 'Iraq',
       'Afghanistan', 'Argentina', 'Azerbaijan', 'Moldova', 'Mali',
       'Congo (Democratic Republic Of The)', 'Thailand',
       'Central African Republic', 'Bosnia And Herzegovina', 'Bangladesh',
       'Switzerland'

##### 4- Read cities codes csv file

In [269]:
citiescodesDF=pd.read_csv("cities_code.csv")
citiescodesDF.head()

Unnamed: 0,code,city,country/state code
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [270]:
citiescodesDF.count()

code                  591
city                  591
country/state code    583
dtype: int64

##### 5- Read immigration data sas7bdat file (the cell below loads `i94_jan16_sub.sas7bdat`, the January 2016 file)

In [271]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')


In [272]:
df_spark.take(5)

[Row(cicid=7.0, i94yr=2016.0, i94mon=1.0, i94cit=101.0, i94res=101.0, i94port='BOS', arrdate=20465.0, i94mode=1.0, i94addr='MA', depdate=None, i94bir=20.0, i94visa=3.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu=None, matflag=None, biryear=1996.0, dtaddto='D/S', gender='M', insnum=None, airline='LH', admnum=346608285.0, fltno='424', visatype='F1'),
 Row(cicid=8.0, i94yr=2016.0, i94mon=1.0, i94cit=101.0, i94res=101.0, i94port='BOS', arrdate=20465.0, i94mode=1.0, i94addr='MA', depdate=None, i94bir=20.0, i94visa=3.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu=None, matflag=None, biryear=1996.0, dtaddto='D/S', gender='M', insnum=None, airline='LH', admnum=346627585.0, fltno='424', visatype='F1'),
 Row(cicid=9.0, i94yr=2016.0, i94mon=1.0, i94cit=101.0, i94res=101.0, i94port='BOS', arrdate=20469.0, i94mode=1.0, i94addr='CT', depdate=20480.0, i94bir=17.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=N

In [273]:
df_spark.count()

2847924

In [274]:
df_spark.show(1)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------------+-----+--------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|    D/S|     M|  null|     LH|3.46608285E8|  424|      F1|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+----
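The `arrdate` and `depdate` columns in the output above are SAS numeric dates, which count days from 1960-01-01. The notebook does not convert them, so the following is only a minimal sketch of how they could be read as calendar dates; `sas_to_date` is a hypothetical helper name, not part of the project.

```python
from datetime import date, timedelta

# SAS numeric dates count days from this epoch.
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Returns None for missing values, which Spark surfaces as None.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20465.0))  # arrdate of the first row shown above
print(sas_to_date(None))     # a missing depdate stays missing
```

In PySpark the same conversion could be wrapped in a UDF or done with date arithmetic, but that is left out here to keep the sketch independent of a Spark session.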

##### 6-Read SAS countries codes csv file

In [275]:
countrcodesDF=pd.read_csv("SAS_CountrCode.csv")
countrcodesDF.head()

Unnamed: 0,SAS_CountCode,Country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


##### 7- Read Travel modes csv file

In [276]:
travelmodesDF=pd.read_csv("travelmode.csv")
travelmodesDF.head()

Unnamed: 0,mode_id,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not Specified


##### 8- Read visa type csv file

In [277]:
visatypesDF=pd.read_csv("visa_type.csv")
visatypesDF.head()

Unnamed: 0,Visa_id,Type
0,1,Business
1,2,Pleasure
2,3,Student


### Step 2: Explore and Assess the Data


#### 1-Cleaning US Demography Table

In [301]:
demoDf.head(1)

Unnamed: 0,State,StateCode,MalePopulation,FemalePopulation,TotalPopulation,AverageHouseholdSize
0,Alabama,AL,2448200.0,2715106.0,5163306,2.43


In [279]:
demoDf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


In [280]:
demoDf.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


##### Creating cleandemographyDF

In [281]:
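def cleandemographyDF(demoDf):
    ##### aggregate the city-level rows to one row per state
    ##### note: the source file has one row per (City, Race), so these sums
    ##### count a city's population once for every race row it has
    demoDf=demoDf.groupby(['State','State Code'], as_index=False).agg({
        'Male Population':'sum','Female Population':'sum',
        'Total Population':'sum','Average Household Size':'mean'})
    ##### remove spaces from column names, e.g. 'State Code' becomes 'StateCode'
    demoDf.columns=demoDf.columns.str.replace(' ','')
    return demoDf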
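Because the demographics file repeats each city once per race, summing the population columns directly counts every city several times. The sketch below shows one possible way to avoid that by keeping a single row per city before aggregating. It is an alternative under stated assumptions, not the function used in this notebook: `state_population` is a hypothetical name and the toy rows are invented for illustration.

```python
import pandas as pd

# Toy rows in the shape of us-cities-demographics.csv: one row per (City, Race).
# The city names and numbers are invented for illustration.
rows = pd.DataFrame({
    "City": ["Aville", "Aville", "Bville"],
    "State": ["Alabama", "Alabama", "Alabama"],
    "State Code": ["AL", "AL", "AL"],
    "Race": ["White", "Asian", "White"],
    "Total Population": [100, 100, 50],
})

def state_population(df):
    """Sum city populations per state, counting each city only once."""
    cities = df.drop_duplicates(subset=["City", "State Code"])
    return (cities.groupby(["State", "State Code"], as_index=False)
                  .agg({"Total Population": "sum"}))

# Direct sum over all rows versus sum over one row per city.
naive = rows.groupby(["State", "State Code"], as_index=False).agg({"Total Population": "sum"})
deduped = state_population(rows)
print(naive["Total Population"].iloc[0], deduped["Total Population"].iloc[0])
```

Switching to this approach would change the state totals shown elsewhere in the notebook, so the outputs would need to be regenerated.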

#### 2-Cleaning Airport code

In [300]:
airportDf.head(1)

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iata_code,coordinates,state_code
223,03N,small_airport,Utirik Airport,4.0,MH,UTK,"169.852005, 11.222",UTI


In [283]:
airportDf.describe()

Unnamed: 0,elevation_ft
count,48069.0
mean,1240.789677
std,1602.363459
min,-1266.0
25%,205.0
50%,718.0
75%,1497.0
max,22000.0


In [284]:
airportDf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


##### create cleanAirportDF function

In [285]:
def cleanAirportDF(airportDf):
    ##### drop columns the model does not use: 'gps_code','local_code','continent','municipality'
    airportDf.drop(columns=['gps_code','local_code','continent','municipality'],inplace=True) 
    
    ##### extract state code from iso_region into new column state_code
    airportDf["state_code"]=airportDf.iso_region.str.split(pat='-',expand=True)[1]
    airportDf.drop(columns=['iso_region'],inplace=True)
    
    ##### drop rows with null in iso_country
    airportDf.dropna(subset=['iso_country'],inplace=True)
    
    ##### Drop nulls in iata_code as it represents the airport codes
    airportDf.dropna(inplace=True,subset=["iata_code"])
    
    return airportDf
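`cleanAirportDF` modifies the dataframe it receives (`inplace=True`), so running the cell twice on the same `airportDf` fails once the columns are gone. A minimal sketch of the same cleaning steps that returns a new dataframe instead is shown below. `clean_airports` is a hypothetical name, and the sample rows only mimic the file's layout.

```python
import pandas as pd

def clean_airports(df):
    """Return a cleaned copy of the airport table without modifying `df`."""
    out = df.drop(columns=["gps_code", "local_code", "continent", "municipality"])
    # "US-PA" -> "PA": keep the part after the first hyphen.
    out = out.assign(state_code=out["iso_region"].str.split("-", n=1).str[1])
    out = out.drop(columns=["iso_region"])
    # Rows without a country or an IATA code cannot be joined to the port codes.
    return out.dropna(subset=["iso_country", "iata_code"])

# Illustrative rows in the layout of airport-codes_csv.csv.
sample = pd.DataFrame({
    "ident": ["00A", "KBOS"],
    "iso_country": ["US", "US"],
    "iso_region": ["US-PA", "US-MA"],
    "municipality": ["Bensalem", "Boston"],
    "gps_code": ["00A", "KBOS"],
    "iata_code": [None, "BOS"],
    "local_code": ["00A", "BOS"],
    "continent": [None, None],
})
cleaned = clean_airports(sample)
print(cleaned)
```

The input frame keeps all of its columns and rows, so the function can be called repeatedly while exploring.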

#### 3-Cleaning Temperature table

In [299]:
GTempDf.head(1)

Unnamed: 0,City,Country,month,Latitude,Longitude,AverageTemperature
0,A Coruña,Spain,1,42.59N,8.73W,9.498917


In [287]:
GTempDf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [288]:
GTempDf.describe()

Unnamed: 0,AverageTemperature,AverageTemperatureUncertainty
count,8235082.0,8235082.0
mean,16.72743,1.028575
std,10.35344,1.129733
min,-42.704,0.034
25%,10.299,0.337
50%,18.831,0.591
75%,25.21,1.349
max,39.651,15.396


##### create cleanTemperatureDF

In [289]:
def cleanTemperatureDF(GTempDf):
    ##### keep readings from 1990-01-01 onward (use '1900-01-01' here to cover the full last century described in the scope)
    GTempDf.query("dt >= '1990-01-01'",inplace=True)
    
    ##### drop null values in AverageTemperature column
    GTempDf.dropna(inplace=True,subset=['AverageTemperature'])
    
    ##### changing dt to datetime column
    GTempDf.dt=pd.to_datetime(GTempDf.dt)
    
    ##### Add separate columns for year month day
    GTempDf['year']=GTempDf['dt'].dt.year
    GTempDf['month']=GTempDf['dt'].dt.month
    GTempDf['day']=GTempDf['dt'].dt.day
    
    ##### Group temp by month, city and country
    GTempDf=GTempDf.groupby(['City','Country','month','Latitude','Longitude'], as_index=False,).agg({'AverageTemperature':'mean',\
                                                             'AverageTemperatureUncertainty':'mean'  })
    ##### Add the mean uncertainty to the mean temperature, then drop the uncertainty column
    GTempDf.AverageTemperature=GTempDf.AverageTemperature+GTempDf.AverageTemperatureUncertainty
    GTempDf.drop(columns='AverageTemperatureUncertainty',inplace=True)
    
    return GTempDf
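The start date of the averaging window is hard-coded inside `cleanTemperatureDF`. As a hedged sketch, the same monthly averaging could take the cutoff as a parameter so that the window is explicit at the call site. `monthly_city_average` and its `start` parameter are hypothetical, the sample values are invented, and the uncertainty adjustment from the function above is left out for brevity.

```python
import pandas as pd

def monthly_city_average(df, start="1900-01-01"):
    """Average temperature per (City, Country, month) for readings on or after `start`."""
    out = df.dropna(subset=["AverageTemperature"])
    out = out.assign(dt=pd.to_datetime(out["dt"]))
    out = out[out["dt"] >= pd.Timestamp(start)]
    out = out.assign(month=out["dt"].dt.month)
    return (out.groupby(["City", "Country", "month"], as_index=False)
               .agg({"AverageTemperature": "mean"}))

# Invented readings in the layout of GlobalLandTemperaturesByCity.csv.
sample = pd.DataFrame({
    "dt": ["1899-01-01", "1950-01-01", "2000-01-01", "2000-02-01"],
    "AverageTemperature": [0.0, 2.0, 4.0, None],
    "City": ["Århus"] * 4,
    "Country": ["Denmark"] * 4,
})
century = monthly_city_average(sample)
recent = monthly_city_average(sample, start="1990-01-01")
print(century)
print(recent)
```

With the default cutoff the January average uses the 1950 and 2000 readings; with `start="1990-01-01"` only the 2000 reading remains.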


#### 4-Cleaning cities code data

In [290]:
citiescodesDF.head(1)

Unnamed: 0,code,city,country/state code
0,ALC,ALCAN,AK


In [291]:
citiescodesDF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 591 entries, 0 to 590
Data columns (total 3 columns):
code                  591 non-null object
city                  591 non-null object
country/state code    583 non-null object
dtypes: object(3)
memory usage: 13.9+ KB


##### create cleancitiesCodesDF

In [292]:
def cleancitiesCodesDF(citiescodesDF):
    ##### drop cities that don't have country/state codes
    citiescodesDF.dropna(subset=["country/state code"],inplace=True)
    
    ##### remove tabs and spaces from codes
    citiescodesDF["country/state code"]=citiescodesDF["country/state code"].str.replace('\t','')
    citiescodesDF["country/state code"]=citiescodesDF["country/state code"].str.replace(' ','')

    citiescodesDF.columns=citiescodesDF.columns.str.replace(' ','_')
    citiescodesDF.columns=citiescodesDF.columns.str.replace('/','OR')
    
    ##### capitalize first letter only
    citiescodesDF.city=citiescodesDF.city.str.title()
    return citiescodesDF

#### 5-Cleaning Immigration Data

##### create cleanImmigrationDF function

In [293]:
def cleanImmigrationDF(df_spark):
    ##### drop rows with nulls in the 'i94addr' or 'I94PORT' columns
    df_spark=df_spark.dropna(subset=('i94addr','I94PORT'))

    ##### convert the id, year, month and code columns from double to int
    for name in ["i94mon","i94yr","i94cit","i94mode","i94visa","cicid"]:
        df_spark=df_spark.withColumn(name,col(name).cast(IntegerType()))

    ##### register the cleaned data as the temp view "df_spark" used by the SQL in Step 4
    df_spark.createOrReplaceTempView("df_spark")
    return df_spark

#### 6-Cleaning SAS Country codes data

In [294]:
countrcodesDF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 236 entries, 0 to 235
Data columns (total 2 columns):
SAS_CountCode    236 non-null int64
Country          236 non-null object
dtypes: int64(1), object(1)
memory usage: 3.8+ KB


In [295]:
countrcodesDF.nunique()

SAS_CountCode    236
Country          236
dtype: int64

In [296]:
countrcodesDF.head()

Unnamed: 0,SAS_CountCode,Country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


##### create cleanCountrcodesDF function

In [297]:
def cleanCountrcodesDF(countrcodesDF):
    ##### update country column string to be title
    countrcodesDF.Country=countrcodesDF.Country.str.title()
    return countrcodesDF

In [298]:
airportDf=cleanAirportDF(airportDf)
GTempDf=cleanTemperatureDF(GTempDf)
citiescodesDF=cleancitiesCodesDF(citiescodesDF)
demoDf=cleandemographyDF(demoDf)
countrcodesDF=cleanCountrcodesDF(countrcodesDF)
cleanImmigrationDF(df_spark)


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
This data model is a star schema. I chose it because it creates simple relations between the fact table and each dimension table, so insights can be drawn from any table together with its related tables and then visualized in Power BI.
<br>Why a star schema:
<br>-It is easy to understand, especially for BI tools
<br>-It needs fewer table joins, which leads to shorter read times

![Data Model](https://lh4.googleusercontent.com/fsDeOchBE3K_CXVVnhQO4kc_eXs9wuz3eaI_HHVdKiCWXLCUt_NKUUZqVpRRcz2rO78=w2400)

#### 3.2 Mapping Out Data Pipelines
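The steps needed to pipeline the data into the chosen data model:
<br>1-Read each source file into a Pandas or Spark dataframe (Step 1)
<br>2-Clean each dataframe with its clean function (Step 2)
<br>3-Convert the Pandas dataframes to Spark dataframes and register them as temp views
<br>4-Build the dimension tables and the fact table with Spark SQL
<br>5-Write each table to a CSV file to be imported in Power BI
<br>6-Run the data quality checks on every table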
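The order in which the temp views are built matters, because some of the SQL statements in Step 4 read views created by other steps. As a rough sketch, the dependencies can be written down and a valid build order derived with the standard library's `graphlib` (Python 3.9+). The view names are the ones used in this notebook; the dependency map itself is my reading of the SQL, not something the notebook declares.

```python
from graphlib import TopologicalSorter

# Each view maps to the views its SQL reads from (names as used in this notebook).
dependencies = {
    "df_spark": set(),
    "Cities_Codes_table": set(),
    "dim_country_table": set(),
    "dim_demography_table": set(),
    "dim_airportcodes_table": {"Cities_Codes_table", "dim_demography_table"},
    "dim_GlobalTemperature_table": {"dim_country_table"},
    "dim_travelmode_table": set(),
    "dim_visa_table": set(),
    "fact_immigration_table": {"df_spark"},
}

# static_order() yields every view after all of the views it depends on.
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)
```

Any order produced this way builds `dim_demography_table` before `dim_airportcodes_table` and `dim_country_table` before `dim_GlobalTemperature_table`.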

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create temp table for Cities codes

In [302]:
#convert pandas df to spark df
cities_spark_df= spark.createDataFrame(citiescodesDF)

#create temp view from cities codes spark df
cities_spark_df.createOrReplaceTempView("Cities_Codes_table")
cities_spark_df.show(10)


+----+--------------------+-------------------+
|code|                city|countryORstate_code|
+----+--------------------+-------------------+
| ALC|               Alcan|                 AK|
| ANC|           Anchorage|                 AK|
| BAR|Baker Aaf - Baker...|                 AK|
| DAC|       Daltons Cache|                 AK|
| PIZ|Dew Station Pt La...|                 AK|
| DTH|        Dutch Harbor|                 AK|
| EGL|               Eagle|                 AK|
| FRB|           Fairbanks|                 AK|
| HOM|               Homer|                 AK|
| HYD|               Hyder|                 AK|
+----+--------------------+-------------------+
only showing top 10 rows



##### Create temp table for Country SAS codes dim_country_table

In [303]:
#convert pandas df to spark df
dim_countries_spark= spark.createDataFrame(countrcodesDF)

#create temp view from country codes spark df
dim_countries_spark.createOrReplaceTempView("dim_country_table")
dim_countries_spark.show(5)


+-------------+--------------------+
|SAS_CountCode|             Country|
+-------------+--------------------+
|          582|Mexico Air Sea, A...|
|          236|         Afghanistan|
|          101|             Albania|
|          316|             Algeria|
|          102|             Andorra|
+-------------+--------------------+
only showing top 5 rows



#### 1-Create sql demography table dim_demography_table

In [304]:
def create_dim_demography_table(demoDf):
    #convert pandas df to spark df
    demog_spark_df= spark.createDataFrame(demoDf)

    #create temp view from demography spark df
    demog_spark_df.createOrReplaceTempView("demography_table")
    
    # select the columns of the demography dimension table
    dim_demography_table=spark.sql(''' select StateCode , State,
    MalePopulation,FemalePopulation,TotalPopulation,AverageHouseholdSize from demography_table  ''')
    dim_demography_table.createOrReplaceTempView("dim_demography_table")
    return dim_demography_table

#### 2- Create Airport codes table dim_airportcodes_table

In [305]:
def create_dim_airportcodes_table(airportDf):
    #convert pandas df to spark df
    airports_spark_df= spark.createDataFrame(airportDf)

    #create temp view from airport spark df
    airports_spark_df.createOrReplaceTempView("airport_table")
    
    # build the dim table by joining the airport table to the SAS ports/cities table and the demography table
    dim_airportcodes_table=spark.sql(''' select 
    CASE 
        WHEN isnull(a.ident) THEN concat("K",c.code)
        ELSE a.ident
    END as airportId,a.type,

    CASE 
        WHEN isnull(a.name) THEN c.city
        ELSE a.name
    END as name,

    a.elevation_ft,a.coordinates,
    c.code as portcode,

    CASE 
        WHEN isnull(a.state_code) THEN c.countryORstate_code
        ELSE a.state_code
    END as state_code,d.State,

    a.iso_country as country_code


    from airport_table a right join Cities_Codes_table c on c.code=a.iata_code and 
    c.countryORstate_code=a.state_code 
    left join dim_demography_table d on a.state_code=d.StateCode''')
    dim_airportcodes_table.createOrReplaceTempView("dim_airportcodes_table")
    return dim_airportcodes_table
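The SQL above keeps every port code through the right join, and its `CASE` expressions fall back to values from the port table when no airport matches. A small Pandas sketch of the same fallback logic may make that easier to follow. The two rows of toy data are invented, and the join to the demography table is omitted.

```python
import pandas as pd

# Toy rows: one airport that matches a port code, and one port with no airport.
airports = pd.DataFrame({
    "ident": ["KBOS"], "name": ["Logan Intl (toy row)"],
    "iata_code": ["BOS"], "state_code": ["MA"],
})
ports = pd.DataFrame({
    "code": ["BOS", "ALC"], "city": ["Boston", "Alcan"],
    "countryORstate_code": ["MA", "AK"],
})

# Right join: every port survives, airport columns are NaN where nothing matched.
merged = airports.merge(
    ports, how="right",
    left_on=["iata_code", "state_code"],
    right_on=["code", "countryORstate_code"],
)

# fillna plays the role of the CASE WHEN isnull(...) fallbacks in the SQL.
dim = pd.DataFrame({
    "airportId": merged["ident"].fillna("K" + merged["code"]),
    "name": merged["name"].fillna(merged["city"]),
    "portcode": merged["code"],
    "state_code": merged["state_code"].fillna(merged["countryORstate_code"]),
})
print(dim)
```

The unmatched port gets an id built from `"K"` plus its port code, its city as the name, and its own state code, which mirrors the three `CASE` branches.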

#### 3- Creating global temperature dim_GlTempr_table

In [306]:
def create_dim_GlobalTemperature_table(GTempDf):
    #convert pandas df to spark df
    gltemp_spark_df= spark.createDataFrame(GTempDf)

    #create temp view from temperature spark df
    gltemp_spark_df.createOrReplaceTempView("gtemp_table")
    
    # build the dim table: add a surrogate key, look up the SAS country code by country name, cast month to int
    dim_GlTempr_table=spark.sql(''' select  ROW_NUMBER() OVER(ORDER BY t.City,t.Country ASC) AS GlTemprId,
    cont.SAS_CountCode,t.Country,t.City,cast(t.month as int) as month,t.AverageTemperature
    ,t.Latitude,t.Longitude
    from gtemp_table as t left join dim_country_table as cont on cont.Country=t.Country ''')
    dim_GlTempr_table.createOrReplaceTempView("dim_GlobalTemperature_table")
    return dim_GlTempr_table


#### 4-create table dim_travel_mode and dim_visa_table

In [307]:
def create_dim_travel_mode(travelmodesDF):
    #convert pandas df to spark df
    dim_travel_table= spark.createDataFrame(travelmodesDF)
    
    #create temp view from travel modes spark df
    dim_travel_table.createOrReplaceTempView("dim_travelmode_table")
    return dim_travel_table


In [308]:
def create_dim_visa_table(visatypesDF):
    #convert pandas df to spark df
    dim_visa_table= spark.createDataFrame(visatypesDF)

    #create temp view from visa types spark df
    dim_visa_table.createOrReplaceTempView("dim_visa_table")
    return dim_visa_table


#### 5-Creating Immigration table fact_immigration_table

In [309]:
def create_fact_immigration_table(dfname):
    # select and rename the fact columns from the cleaned immigration temp view named by dfname
    fact_immigration_table=spark.sql(''' select cicid as id,i94cit as citizenship_id,i94port as portId,i94mode as travelmode_id,i94addr as address_id,
    i94visa as visa_id,gender,i94mon as month,i94yr as year from %s ''' %(dfname))
    fact_immigration_table.createOrReplaceTempView("fact_immigration_table")
    return fact_immigration_table

##### Call the functions that create the model

In [310]:
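create_dim_demography_table(demoDf)  # first: the airport dimension joins on dim_demography_table
create_dim_airportcodes_table(airportDf)
create_dim_GlobalTemperature_table(GTempDf)
create_dim_travel_mode(travelmodesDF)
create_dim_visa_table(visatypesDF)
create_fact_immigration_table("df_spark")

# bind each registered temp view to a name for the CSV export and the quality checks
dim_demography_table=spark.table("dim_demography_table")
dim_airportcodes_table=spark.table("dim_airportcodes_table")
dim_GlTempr_table=spark.table("dim_GlobalTemperature_table")
dim_travel_table=spark.table("dim_travelmode_table")
dim_visa_table=spark.table("dim_visa_table")
fact_immigration_table=spark.table("fact_immigration_table")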

#### Write tables to CSV files to be imported in Power BI

In [311]:
dim_GlTempr_table.repartition(1).write.csv("dim_globaltemperature_table",mode="overwrite",header=True)
dim_airportcodes_table.repartition(1).write.csv("dim_airportcodes_table",mode="overwrite",header=True)
dim_demography_table.repartition(1).write.csv("dim_demography_table",mode="overwrite",header=True)
dim_travel_table.repartition(1).write.csv("dim_travelmode_table",mode="overwrite",header=True)
dim_visa_table.repartition(1).write.csv("dim_visa_table",mode="overwrite",header=True)
fact_immigration_table.repartition(1).write.csv("fact_immigration_table",mode="overwrite",header=True)
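Spark's `write.csv` treats the given name as a directory, and with `repartition(1)` it writes a single `part-*.csv` file inside it next to marker files such as `_SUCCESS`. Power BI needs the path of that part file. The helper below is a hypothetical convenience sketch for locating it; it simulates the directory layout so it runs without Spark.

```python
import glob
import os
import tempfile

def find_part_csv(output_dir):
    """Return the path of the single part-*.csv file Spark wrote into `output_dir`."""
    matches = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file in {output_dir}, found {len(matches)}")
    return matches[0]

# Simulate Spark's output layout so the sketch runs without a Spark session.
demo_dir = tempfile.mkdtemp()
for fname in ("part-00000-demo.csv", "_SUCCESS"):
    open(os.path.join(demo_dir, fname), "w").close()

print(os.path.basename(find_part_csv(demo_dir)))
```

In the notebook the call would look like `find_part_csv("dim_visa_table")` once the export cell has run.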


#### Save immigration table to Amazon S3 for Redshift (in case we add other months' immigration data files)

In [78]:
#spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", " ")
#spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", " ")
#spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", " ")
#fact_immigration_table.write.mode("overwrite").csv("s3a://dev/immigration.csv")


#### 4.2 Data Quality Checks
The data quality checks below confirm that the pipeline ran as expected. For every table they cover:
 * Schema checks: the column names and data types match the data model
 * Count checks: the total number of rows, to confirm the table is not empty
 
Run Quality Checks
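The checks in this section are run by hand, one cell per table. A minimal sketch of how they could be turned into a reusable assertion is shown below. `check_table` is a hypothetical helper: it relies only on `.columns` and `.count()`, both of which a Spark DataFrame provides, and `FakeTable` is a stand-in so the example runs without Spark.

```python
def check_table(df, name, required_columns, min_rows=1):
    """Raise ValueError if `df` lacks a required column or has too few rows.

    Works with any object exposing `.columns` and `.count()`, which a Spark
    DataFrame does. Returns the row count when the checks pass.
    """
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"{name}: missing columns {missing}")
    n = df.count()
    if n < min_rows:
        raise ValueError(f"{name}: expected at least {min_rows} rows, found {n}")
    return n

class FakeTable:
    """Stand-in for a Spark DataFrame so the sketch runs without Spark."""
    def __init__(self, columns, rows):
        self.columns = columns
        self._rows = rows

    def count(self):
        return self._rows

visa = FakeTable(["Visa_id", "Type"], 3)
print(check_table(visa, "dim_visa_table", ["Visa_id", "Type"]))
```

Called on the real tables, for example `check_table(dim_visa_table, "dim_visa_table", ["Visa_id", "Type"])`, a failed check stops the notebook instead of relying on someone reading the printed output.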

#### Check table schemas and total rows

##### 1-dim_demography_table data check

In [326]:
dim_demography_table.printSchema()

root
 |-- StateCode: string (nullable = true)
 |-- State: string (nullable = true)
 |-- MalePopulation: double (nullable = true)
 |-- FemalePopulation: double (nullable = true)
 |-- TotalPopulation: long (nullable = true)
 |-- AverageHouseholdSize: double (nullable = true)



In [327]:
dim_demography_table.count()

49

##### 2-dim_airportcodes_table data check

In [328]:
dim_airportcodes_table.printSchema()


root
 |-- airportId: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: double (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- portcode: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- country_code: string (nullable = true)



In [329]:
dim_airportcodes_table.count()


584

##### 3-dim_GlobalTemperature_table data check

In [330]:
dim_GlTempr_table.printSchema()


root
 |-- GlTemprId: integer (nullable = true)
 |-- SAS_CountCode: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [331]:
dim_GlTempr_table.count()


42120

##### 4-dim_travel_mode data check

In [332]:
dim_travel_table.printSchema()

root
 |-- mode_id: long (nullable = true)
 |-- mode: string (nullable = true)



In [319]:
dim_travel_table.count()

4

In [333]:
dim_travel_table.show()

+-------+-------------+
|mode_id|         mode|
+-------+-------------+
|      1|          Air|
|      2|          Sea|
|      3|         Land|
|      9|Not Specified|
+-------+-------------+



##### 5-dim_visa_table data check

In [334]:
dim_visa_table.printSchema()

root
 |-- Visa_id: long (nullable = true)
 |-- Type: string (nullable = true)



In [322]:
dim_visa_table.count()

3

In [335]:
dim_visa_table.show()

+-------+--------+
|Visa_id|    Type|
+-------+--------+
|      1|Business|
|      2|Pleasure|
|      3| Student|
+-------+--------+



##### 6-fact_immigration_table data check

In [336]:
fact_immigration_table.printSchema()


root
 |-- id: integer (nullable = true)
 |-- citizenship_id: integer (nullable = true)
 |-- portId: string (nullable = true)
 |-- travelmode_id: integer (nullable = true)
 |-- address_id: string (nullable = true)
 |-- visa_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [337]:
fact_immigration_table.count()


2670795
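Comparing this count with the source count from Step 1 gives a simple source/count check. The only row-removing step on the way to the fact table is the null filter on `i94addr` and `i94port` in `cleanImmigrationDF`, so the difference is presumably the number of rows that filter dropped. The arithmetic, using the two counts printed in this notebook:

```python
source_rows = 2847924   # df_spark.count() after loading the SAS file
fact_rows = 2670795     # fact_immigration_table.count()

dropped = source_rows - fact_rows
dropped_pct = 100 * dropped / source_rows
print(f"dropped {dropped} rows ({dropped_pct:.2f}% of the source)")
```

A much larger share than this on a future run would be a signal to look at the address and port columns of the new file.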

#### 4.3 Data dictionary 
For each field, I've provided a brief description of what the data is and where it came from. I included the data dictionary in an Excel file called data dictionary.

### Step 5: Complete Project Write Up


##### * The rationale for the choice of tools and technologies for the project
###### PySpark distributes processing, so it can handle datasets that are too large for a single machine's memory. I used it for the immigration data (about 2.8 million rows in one monthly file). Pandas handles the other source files, which fit in memory, and Power BI reads the exported CSV files.

##### * Data should be updated monthly because the model is designed to be analyzed by month

##### * If the data was increased by 100x: I'd have to set up a cluster with more than one worker node, split the data across the nodes, then collect the results

##### * If the data populates a dashboard that must be updated daily by 7am: I'd have to use Airflow and set up a scheduled run

##### * If the database needed to be accessed by 100+ people: I should load my database into AWS Redshift because it supports concurrent operations as well as ACID

### Step 6: Data Insights

##### * Example of the final pipeline: a query that joins 2 dimension tables with the fact table to get insights

In [338]:
#show the top 10 state and visa type combinations by number of immigrants
spark.sql(''' select d.State,v.Type as VisaType,count(f.id) as total_immigrants 
from fact_immigration_table as f 
join dim_demography_table d on f.address_id=d.StateCode 
join dim_visa_table as v on f.visa_id=v.Visa_id
group by  d.State,VisaType order by total_immigrants desc,State limit 10
''').show()

+----------+--------+----------------+
|     State|VisaType|total_immigrants|
+----------+--------+----------------+
|   Florida|Pleasure|          545364|
|California|Pleasure|          267623|
|  New York|Pleasure|          239567|
|    Hawaii|Pleasure|          178672|
|California|Business|          109626|
|California| Student|           66215|
|     Texas|Pleasure|           65979|
|    Nevada|Pleasure|           63425|
|   Florida|Business|           55831|
|  New York|Business|           55150|
+----------+--------+----------------+



###### Florida is the US state where the most immigrants on Pleasure visas live, and California is the US state where the most immigrants on Business and Student visas live
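For readers who want to try the same join without a Spark session, here is a rough Pandas equivalent of the query on a tiny in-memory sample. The column names follow the data model; the four fact rows are invented, so the counts have no relation to the real result above.

```python
import pandas as pd

# Invented fact rows plus two small dimension tables in the model's layout.
fact = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "address_id": ["FL", "FL", "CA", "CA"],
    "visa_id": [2, 2, 1, 3],
})
dim_demography = pd.DataFrame({"StateCode": ["FL", "CA"], "State": ["Florida", "California"]})
dim_visa = pd.DataFrame({"Visa_id": [1, 2, 3], "Type": ["Business", "Pleasure", "Student"]})

# Join the fact table to both dimensions, count rows per (State, Type), then rank.
result = (
    fact.merge(dim_demography, left_on="address_id", right_on="StateCode")
        .merge(dim_visa, left_on="visa_id", right_on="Visa_id")
        .groupby(["State", "Type"], as_index=False)
        .agg(total_immigrants=("id", "count"))
        .sort_values(["total_immigrants", "State"], ascending=[False, True])
)
print(result)
```

Each `merge` corresponds to one `join` in the SQL, and the named aggregation corresponds to `count(f.id)`.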

[Here's a link for my PowerBI Report](https://app.powerbi.com/view?r=eyJrIjoiMWY0NTFkMzktNDVkNy00MjE2LTg3MjItNzZlMWZlMDJiZTQyIiwidCI6IjU4ODYzNGViLWQ2MDAtNDhmOC1hNjdjLTAzMzMyYzA5NThjOCJ9&pageName=ReportSection)