# Data Engineering Capstone Project
### Immigration Study in US by Demographics and Temperatures

#### Project Summary
I'll work with four datasets to complete the project. The main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data. You're also welcome to enrich the project with additional data.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
import datetime
import requests
requests.packages.urllib3.disable_warnings()
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format, dayofweek,monotonically_increasing_id,udf,col,when,count,avg,isnan
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.types import DoubleType, DateType, StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from datetime import datetime, timedelta




In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 1: Scope the Project and Gather Data

#### Scope 
##### Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use?

In this Project I will gather the data from four sources, and Loads this data into staging dataframes, and  Cleans the raw data, then write it to parquet files and perform an ETL process using Spark cluster, then add data into Fact & Dimension tables to form schema.
The schema can be used perform data analytices, correlation and ad-hoc reporting in an effective.

#### Describe and Gather Data 
##### Describe the data sets you're using. Where did it come from? What type of information is included? 

-  **i94 Immigration sample Data:** 

Sample data of immigration records from th US National Tourism and Trade Office, this data source will serve as the Fact table in the schema, and this data comes from: https://www.trade.gov/national-travel-and-tourism-office

- **World Temperature Data:**

This datset contains temperature data in different cities from the 1750 to 2013, this data comes from : https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data

- **U.S. City Demographic Data:**

This dataset includes information on the population of all US cities such as race, household size and gender, this data comes from: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

- **Airport Codes Table:**

This dataset table contains the airport codes for the corresponding cities, this data comes from: https://datahub.io/core/airport-codes#data

In [3]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

#### Immigration Data

In [4]:
# Read immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [5]:
# Review the head of data
immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
# Read and Review states data 
fname = ('us_states.csv')
us_states = spark.read.csv(fname,header=True,inferSchema=True)
us_states.limit(5).toPandas()

Unnamed: 0,state,code
0,Alabama,AL
1,Alaska,AK
2,Arizona,AZ
3,Arkansas,AR
4,California,CA


#### Temperature Data

In [7]:
# Read temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.csv(fname, header=True, inferSchema=True)

In [8]:
# Review the head of data
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


#### US Cities Demographics

In [9]:
# Read demographics data by pandas
df_demographics = pd.read_csv('us-cities-demographics.csv',sep=';')
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [10]:
# Rename the columns names 
df_demographics = df_demographics.rename(columns={'Median Age':'median_age','Male Population':'male_population',
                                                 'Female Population':'female_population','Total Population':'total_population',
                                                 'Number of Veterans':'number_veterans','Foreign-born':'foreign_born',
                                                 'Average Household Size':'avg_household_size','State Code':'state_code'})
# review the head of data to check the change 
df_demographics.head()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_veterans,foreign_born,avg_household_size,state_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [11]:
# Save the change to the dataset 
df_demographics.to_csv('us-cities-demographics1.csv',index=False)

In [12]:
# Read US cities Demographics data
fname = 'us-cities-demographics1.csv'
demographics_df = spark.read.csv(fname,inferSchema=True, header=True)

In [13]:
# review the head of data
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_veterans,foreign_born,avg_household_size,state_code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


#### Airport Code

In [14]:
# Read airport data
fname = 'airport-codes_csv.csv'
airport_df = spark.read.csv(fname,inferSchema=True, header=True)

In [15]:
# Review the head of data
airport_df.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Performing cleaning tasks here

* Drop all missing values.

* Drop all duplicate values.

* Convert a coulmns data type.

* Modify the columns letter to lowercase.

* Change some of columns name.


#### Clean Immigration Data

In [16]:
# print data schema
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### Clean Temperature Data

In [17]:
# Review data size
temperature_df.count()

8599212

In [18]:
# review the schema of data
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



#### US Cities Demographics

In [19]:
# review data schema
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: double (nullable = true)
 |-- female_population: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- number_veterans: double (nullable = true)
 |-- foreign_born: double (nullable = true)
 |-- avg_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [20]:
# review data size
demographics_df.count()

2891

#### Airport Code

In [21]:
# create a copy of airport data 
airport_df.count()

55075

In [22]:
# review data info
airport_df.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Since we are interested in the flow of immigration in the united states, the below fact and dimension tables will serve as  clear evidence to explain our aim and this schema can be used by data analysts and other relevant business professional to gain deeper insight into various immigration figures, trends and statistics recorded historically:

![title](Schema.png)


To made conceptual data model, I used dbdiagram tool to create it on this URL : https://dbdiagram.io/home?utm_source=dbdiagram, it's free, simple tool to draw ER diagrams by just writing code.
Designed for developers and data analysts.


#### 3.2 Mapping Out Data Pipelines
The steps that necessary to pipeline the data into the chosen data model

1- Load the data into staging tables.

2- Create Dimenision tables.

3- Create Fact table.

4- Write data into parquet files.

5- Perform data quality checks.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [23]:
# save the immigration & states dataset to immi_table dataframe
immigration_df.createOrReplaceTempView("fact_immigration")
us_states.createOrReplaceTempView("states")

In [24]:
# Join immigration & states dataset to display the full name of state
spark.sql("""
SELECT f.i94addr, s.state
FROM fact_immigration AS f
INNER JOIN states AS s
ON f.i94addr = s.code
""").show()

+-------+-------------+
|i94addr|        state|
+-------+-------------+
|     AL|      Alabama|
|     MI|     Michigan|
|     MA|Massachusetts|
|     MA|Massachusetts|
|     MI|     Michigan|
|     NJ|   New Jersey|
|     NJ|   New Jersey|
|     NY|     New York|
|     NY|     New York|
|     NY|     New York|
|     MO|     Missouri|
|     MA|Massachusetts|
|     MA|Massachusetts|
|     MA|Massachusetts|
|     NJ|   New Jersey|
|     NY|     New York|
|     TX|        Texas|
|     CT|  Connecticut|
|     CT|  Connecticut|
|     NJ|   New Jersey|
+-------+-------------+
only showing top 20 rows



In [25]:
# Join immigration & states dataset to display the full name of state
spark.sql("""
SELECT f.*, s.state
FROM fact_immigration AS f
INNER JOIN states AS s
ON f.i94addr = s.code
""").createOrReplaceTempView("fact_immigration")

In [26]:
# Review the fact immigration after add state full name
spark.sql("""
SELECT  i94addr,state, count(state)
FROM fact_immigration
GROUP BY 1,2
ORDER BY 3 DESC
""").show()

+-------+--------------------+------------+
|i94addr|               state|count(state)|
+-------+--------------------+------------+
|     FL|             Florida|      621701|
|     NY|            New York|      553677|
|     CA|          California|      470386|
|     HI|              Hawaii|      168764|
|     TX|               Texas|      134321|
|     NV|              Nevada|      114609|
|     IL|            Illinois|       82126|
|     NJ|          New Jersey|       76531|
|     MA|       Massachusetts|       70486|
|     WA|          Washington|       55792|
|     GA|             Georgia|       44663|
|     MI|            Michigan|       32101|
|     VA|            Virginia|       31399|
|     PA|        Pennsylvania|       30293|
|     DC|District of Columbia|       28228|
|     NE|            Nebraska|       26574|
|     MD|            Maryland|       25360|
|     NC|      North Carolina|       23375|
|     LA|           Louisiana|       22655|
|     AZ|             Arizona|  

In [27]:
# Review the count immigration of dataset 
spark.sql("""
SELECT COUNT(state), COUNT(i94addr),COUNT(cicid)
FROM fact_immigration
WHERE state IS NOT NULL
""").show()

+------------+--------------+------------+
|count(state)|count(i94addr)|count(cicid)|
+------------+--------------+------------+
|     2823040|       2823040|     2823040|
+------------+--------------+------------+



In [28]:
# Review i94mode content
spark.sql("""
SELECT i94mode , COUNT(*)
FROM fact_immigration
GROUP BY 1
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|    1.0| 2760441|
|    3.0|   48410|
|    2.0|    7917|
|    9.0|    6272|
+-------+--------+



In [29]:
# Review data if we repace Mode of transportation to be (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
spark.sql(""" SELECT i94mode , CASE
                             WHEN i94mode = 1.0 THEN 'Air'
                             WHEN i94mode = 2.0 THEN 'Sea'
                             WHEN i94mode = 3.0 THEN 'Land'
                             WHEN i94mode = 3.0 THEN 'Not Reported'
                             ELSE 'N/A' END AS transport_mode
FROM fact_immigration                            

""").show()

+-------+--------------+
|i94mode|transport_mode|
+-------+--------------+
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
|    1.0|           Air|
+-------+--------------+
only showing top 20 rows



In [30]:
# Repace Mode of transportation to be (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)
spark.sql(""" SELECT * , CASE
                             WHEN i94mode = 1.0 THEN 'Air'
                             WHEN i94mode = 2.0 THEN 'Sea'
                             WHEN i94mode = 3.0 THEN 'Land'
                             WHEN i94mode = 3.0 THEN 'Not Reported'
                             ELSE 'N/A' END AS transport_mode
FROM fact_immigration                            

""").createOrReplaceTempView("fact_immigration")

In [31]:
# Drop the data not via Air \ as 1 = Air
#spark.sql("""
#SELECT *
#FROM fact_immigration
#WHERE i94mode = 1.0
#""").createOrReplaceTempView("fact_immigration")

In [32]:
# Review the change
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
""").show()

+--------+
|count(1)|
+--------+
| 2823040|
+--------+



In [33]:
# Review gender content
spark.sql("""
SELECT gender, COUNT(*)
FROM fact_immigration
GROUP BY 1
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F| 1183006|
|  null|  384629|
|     M| 1254637|
|     U|     294|
|     X|     474|
+------+--------+



In [34]:
# Keep just male and female travellers on data as there incorrect values
spark.sql("""
SELECT * 
FROM fact_immigration
WHERE gender IN ('F','M')
""").createOrReplaceTempView("fact_immigration")

In [35]:
# Review the change
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
""").show()

+--------+
|count(1)|
+--------+
| 2437643|
+--------+



In [36]:
# convert arrdate to timedate
spark.sql("""
SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date
FROM fact_immigration
""").createOrReplaceTempView("fact_immigration")

In [37]:
# check the change
spark.sql("""
SELECT arrival_date
FROM fact_immigration
""").show(5)

+------------+
|arrival_date|
+------------+
|  2016-04-07|
|  2016-04-01|
|  2016-04-01|
|  2016-04-01|
|  2016-04-01|
+------------+
only showing top 5 rows



In [38]:
# convert the departure dates to useable date
spark.sql("""
SELECT *, CASE
               WHEN depdate >= 1.0 THEN date_add(to_date('1960-01-01'), depdate )
               WHEN depdate IS NULL THEN NULL
               ELSE 'N/A' END AS departure_date
FROM fact_immigration               
""").createOrReplaceTempView("fact_immigration")

In [39]:
# check the change
spark.sql("""
SELECT departure_date
FROM fact_immigration
""").show(5)

+--------------+
|departure_date|
+--------------+
|          null|
|    2016-08-25|
|    2016-04-05|
|    2016-04-05|
|    2016-04-17|
+--------------+
only showing top 5 rows



In [40]:
# Check the count in case departure date < arrival date
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
WHERE departure_date <= arrival_date
""").show()

+--------+
|count(1)|
+--------+
|     137|
+--------+



In [41]:
# drop incorrect date in case departure date < arrival date
spark.sql("""
SELECT *
FROM fact_immigration
WHERE departure_date >= arrival_date
""").createOrReplaceTempView("fact_immigration")

In [42]:
# Review the change
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration
WHERE departure_date <= arrival_date
""").show()

spark.sql("""
SELECT arrival_date, departure_date
FROM fact_immigration
""").show(5)

+--------+
|count(1)|
+--------+
|       0|
+--------+

+------------+--------------+
|arrival_date|departure_date|
+------------+--------------+
|  2016-04-01|    2016-08-25|
|  2016-04-01|    2016-04-05|
|  2016-04-01|    2016-04-05|
|  2016-04-01|    2016-04-17|
|  2016-04-01|    2016-05-04|
+------------+--------------+
only showing top 5 rows



In [43]:
# review biryear content
spark.sql("""
SELECT MAX(biryear), MIN(biryear)
FROM fact_immigration
""").show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2016.0|      1916.0|
+------------+------------+



In [44]:
# review if we add new column for age 
spark.sql("""
SELECT (2016-biryear) AS age
FROM fact_immigration
""").show(5)

+----+
| age|
+----+
|55.0|
|58.0|
|56.0|
|62.0|
|49.0|
+----+
only showing top 5 rows



In [45]:
# apply the new column to immigration dataset to display the age
spark.sql("""
SELECT *, (2016-biryear) AS age
FROM fact_immigration
""").createOrReplaceTempView("fact_immigration")

In [46]:
# Review the change
spark.sql("""
SELECT age,COUNT(age)
FROM fact_immigration
GROUP BY 1
ORDER BY 2 DESC
""").show()

+----+----------+
| age|count(age)|
+----+----------+
|30.0|     55173|
|31.0|     53413|
|33.0|     53019|
|34.0|     52722|
|32.0|     52617|
|35.0|     51788|
|29.0|     51522|
|36.0|     50720|
|40.0|     50437|
|28.0|     49761|
|37.0|     49690|
|38.0|     47915|
|41.0|     47091|
|39.0|     46995|
|45.0|     46720|
|42.0|     46607|
|44.0|     46534|
|43.0|     46161|
|27.0|     45683|
|50.0|     45154|
+----+----------+
only showing top 20 rows



In [47]:
# Review Visa content
spark.sql("""
SELECT i94visa , COUNT(i94visa)
FROM fact_immigration
GROUP BY 1
""").show()

+-------+--------------+
|i94visa|count(i94visa)|
+-------+--------------+
|    1.0|        380214|
|    3.0|         27459|
|    2.0|       1919532|
+-------+--------------+



In [48]:
# Review if we repace Visa categories to be (1 = Business, 2 = Tourism, 3 = Stady)
spark.sql(""" SELECT * , CASE
                             WHEN i94visa = 1.0 THEN 'Business'
                             WHEN i94visa = 2.0 THEN 'Tourism'
                             WHEN i94visa = 3.0 THEN 'Study'
                             ELSE 'N/A' END AS visa_type
FROM fact_immigration                            
""").show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+-------------+--------------+------------+--------------+----+---------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|        state|transport_mode|arrival_date|departure_date| age|visa_type|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+-------------+--------------+------------+--------------+----+---------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| nul

In [49]:
# Repace Visa categories to be (1 = Business, 2 = Tourism, 3 = Stady)
spark.sql(""" SELECT * , CASE
                             WHEN i94visa = 1.0 THEN 'Business'
                             WHEN i94visa = 2.0 THEN 'Tourism'
                             WHEN i94visa = 3.0 THEN 'Study'
                             ELSE 'N/A' END AS visa_type
FROM fact_immigration
""").createOrReplaceTempView("fact_immigration")

In [50]:
# Review Visa content after change
spark.sql("""
SELECT visa_type
FROM fact_immigration
""").show()

+---------+
|visa_type|
+---------+
|  Tourism|
| Business|
| Business|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
| Business|
| Business|
| Business|
|    Study|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
|  Tourism|
+---------+
only showing top 20 rows



In [51]:
# Review Visa content after change
spark.sql("""
SELECT  visa_type, COUNT(visa_type)
FROM fact_immigration
GROUP BY 1
""").show()

+---------+----------------+
|visa_type|count(visa_type)|
+---------+----------------+
|    Study|           27459|
|  Tourism|         1919532|
| Business|          380214|
+---------+----------------+



In [52]:
# Review Visa content after change
spark.sql("""
SELECT  COUNT(*)
FROM fact_immigration
""").show()

+--------+
|count(1)|
+--------+
| 2327205|
+--------+



In [53]:
# Create the immigration fact table into spark dataframe
fact_immigration_table = spark.sql("""
SELECT cicid,
       state,
       i94cit AS country_code,
       i94port AS port,
       arrival_date,
       departure_date,
       biryear,
       gender,
       age,
       visa_type,
       transport_mode
       
FROM fact_immigration       
""")
fact_immigration_table.createOrReplaceTempView("fact_immigration_table")

In [54]:
spark.sql("""
SELECT *
FROM fact_immigration_table
""").show()

+-----+-------------+------------+----+------------+--------------+-------+------+----+---------+--------------+
|cicid|        state|country_code|port|arrival_date|departure_date|biryear|gender| age|visa_type|transport_mode|
+-----+-------------+------------+----+------------+--------------+-------+------+----+---------+--------------+
| 15.0|     Michigan|       101.0| WAS|  2016-04-01|    2016-08-25| 1961.0|     M|55.0|  Tourism|           Air|
| 27.0|Massachusetts|       101.0| BOS|  2016-04-01|    2016-04-05| 1958.0|     M|58.0| Business|           Air|
| 28.0|Massachusetts|       101.0| ATL|  2016-04-01|    2016-04-05| 1960.0|     F|56.0| Business|           Air|
| 29.0|Massachusetts|       101.0| ATL|  2016-04-01|    2016-04-17| 1954.0|     M|62.0|  Tourism|           Air|
| 30.0|   New Jersey|       101.0| ATL|  2016-04-01|    2016-05-04| 1967.0|     M|49.0|  Tourism|           Air|
| 31.0|     New York|       101.0| ATL|  2016-04-01|    2016-06-06| 1973.0|     M|43.0|  Tourism

In [55]:
# Review the fact_immigration_table 
spark.sql("""
SELECT COUNT(*)
FROM fact_immigration_table
""").show()

+--------+
|count(1)|
+--------+
| 2327205|
+--------+



In [56]:
# Extract all distinct dates from arrival and departure dates to create dimension table
dim_time = spark.sql("""
SELECT DISTINCT arrdate AS date
FROM fact_immigration
UNION
SELECT DISTINCT departure_date AS date
FROM fact_immigration
WHERE departure_date IS NOT NULL
""")
dim_time.createOrReplaceTempView("dim_time")

In [57]:
# Extract year, month, week, day from the date and insert all values to dim_time table
dim_time_table = spark.sql("""
SELECT date,
       YEAR(date) AS year,
       MONTH(date) AS month,
       DAY(date) AS day,
       WEEKOFYEAR(date) AS week,
       WEEKDAY(date) AS weekday,
       DAYOFYEAR(date) AS year_day
FROM dim_time
ORDER BY date ASC
""")
dim_time_table.createOrReplaceTempView("dim_time_table")

In [58]:
# Review the dim_time_table 
spark.sql("""
SELECT *
FROM dim_time_table
""").show()

+----------+----+-----+---+----+-------+--------+
|      date|year|month|day|week|weekday|year_day|
+----------+----+-----+---+----+-------+--------+
|2016-04-02|2016|    4|  2|  13|      5|      93|
|2016-04-03|2016|    4|  3|  13|      6|      94|
|2016-04-04|2016|    4|  4|  14|      0|      95|
|2016-04-05|2016|    4|  5|  14|      1|      96|
|2016-04-06|2016|    4|  6|  14|      2|      97|
|2016-04-07|2016|    4|  7|  14|      3|      98|
|2016-04-08|2016|    4|  8|  14|      4|      99|
|2016-04-09|2016|    4|  9|  14|      5|     100|
|2016-04-10|2016|    4| 10|  14|      6|     101|
|2016-04-11|2016|    4| 11|  15|      0|     102|
|2016-04-12|2016|    4| 12|  15|      1|     103|
|2016-04-13|2016|    4| 13|  15|      2|     104|
|2016-04-14|2016|    4| 14|  15|      3|     105|
|2016-04-15|2016|    4| 15|  15|      4|     106|
|2016-04-16|2016|    4| 16|  15|      5|     107|
|2016-04-17|2016|    4| 17|  15|      6|     108|
|2016-04-18|2016|    4| 18|  16|      0|     109|


In [59]:
# Another way to extract date 
#dim_time_table = dim_time.withColumn('year',year('date'))
#dim_time_table = dim_time.withColumn('month',month('date'))
#dim_time_table = dim_time.withColumn('week',weekofyear('date'))
#dim_time_table = dim_time.withColumn('day',dayofmonth('date'))
#dim_time_table = dim_time.withColumn('weekday',dayofweek('date'))
#dim_time_table = dim_time.withColumn('yearday',dayofyear('date'))
#dim_time_table.createOrReplaceTempView("dim_time_table")
#dim_time_table.show()

In [60]:
# save the temperature dataset to temp_table dataframe
temperature_df.createOrReplaceTempView("dim_temperature")

In [61]:
# Keep only data for the U.S
spark.sql("""
SELECT *
FROM dim_temperature
WHERE country = 'United States'
""").createOrReplaceTempView("dim_temperature")

In [62]:
# Verify the change
spark.sql("""
SELECT country
FROM dim_temperature
""").show(5)

+-------------+
|      country|
+-------------+
|United States|
|United States|
|United States|
|United States|
|United States|
+-------------+
only showing top 5 rows



In [63]:
# Verify the change
spark.sql("""
SELECT *
FROM dim_temperature
""").show(5)

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1820-01-01 00:00:00|2.1010000000000004|                        3.217|Abilene|United States|  32.95N|  100.53W|
|1820-02-01 00:00:00|             6.926|                        2.853|Abilene|United States|  32.95N|  100.53W|
|1820-03-01 00:00:00|            10.767|                        2.395|Abilene|United States|  32.95N|  100.53W|
|1820-04-01 00:00:00|17.988999999999994|                        2.202|Abilene|United States|  32.95N|  100.53W|
|1820-05-01 00:00:00|            21.809|                        2.036|Abilene|United States|  32.95N|  100.53W|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---

In [64]:
# Create temperature dimontion table into spark datafram
dim_temperature_table = spark.sql("""
SELECT DISTINCT
      dt AS date,
      City AS city,
      Country AS country,
      AVG(AverageTemperature) AS avg_temp,
      AVG(AverageTemperatureUncertainty) AS avg_temp_uncertainty
FROM dim_temperature 
GROUP BY 1,2,3
""")
dim_temperature_table.createOrReplaceTempView("dim_temperature_table")

In [65]:
# Review the dim_temperature_table 
spark.sql("""
SELECT *
FROM dim_temperature_table
""").show()

+-------------------+------------+-------------+------------------+--------------------+
|               date|        city|      country|          avg_temp|avg_temp_uncertainty|
+-------------------+------------+-------------+------------------+--------------------+
|1764-05-01 00:00:00|   Allentown|United States|              null|                null|
|1779-06-01 00:00:00|   Allentown|United States|              null|                null|
|1910-02-01 00:00:00|     Abilene|United States|             5.586|               0.847|
|1892-02-01 00:00:00|   Ann Arbor|United States|            -2.495| 0.42200000000000004|
|1875-11-01 00:00:00|      Arvada|United States|             -3.96|  0.6970000000000001|
|1763-07-01 00:00:00|      Aurora|United States|              null|                null|
|1795-11-01 00:00:00| Baton Rouge|United States|              null|                null|
|1830-07-01 00:00:00|    Bellevue|United States|              null|                null|
|2001-02-01 00:00:00|

In [66]:
# Remove the time from date column
dim_temperature_table = dim_temperature_table.withColumn("date",date_format(col("date"), "YYYY-MM-dd"))

In [67]:
# Review the dim_temperature_table
dim_temperature_table.show()

+----------+------------+-------------+------------------+--------------------+
|      date|        city|      country|          avg_temp|avg_temp_uncertainty|
+----------+------------+-------------+------------------+--------------------+
|1764-05-01|   Allentown|United States|              null|                null|
|1779-06-01|   Allentown|United States|              null|                null|
|1910-02-01|     Abilene|United States|             5.586|               0.847|
|1892-02-01|   Ann Arbor|United States|            -2.495| 0.42200000000000004|
|1875-11-01|      Arvada|United States|             -3.96|  0.6970000000000001|
|1763-07-01|      Aurora|United States|              null|                null|
|1795-11-01| Baton Rouge|United States|              null|                null|
|1830-07-01|    Bellevue|United States|              null|                null|
|2001-02-01|    Bellevue|United States|1.7760000000000002|               0.245|
|1751-11-01|  Bridgeport|United States| 

In [68]:
# Save the change on dim_temperature_table
dim_temperature_table.createOrReplaceTempView("dim_temperature_table")

In [69]:
# Review the dim_temperature_table 
spark.sql("""
SELECT *
FROM dim_temperature_table
""").show()

+----------+------------+-------------+------------------+--------------------+
|      date|        city|      country|          avg_temp|avg_temp_uncertainty|
+----------+------------+-------------+------------------+--------------------+
|1764-05-01|   Allentown|United States|              null|                null|
|1779-06-01|   Allentown|United States|              null|                null|
|1910-02-01|     Abilene|United States|             5.586|               0.847|
|1892-02-01|   Ann Arbor|United States|            -2.495| 0.42200000000000004|
|1875-11-01|      Arvada|United States|             -3.96|  0.6970000000000001|
|1763-07-01|      Aurora|United States|              null|                null|
|1795-11-01| Baton Rouge|United States|              null|                null|
|1830-07-01|    Bellevue|United States|              null|                null|
|2001-02-01|    Bellevue|United States|1.7760000000000002|               0.245|
|1751-11-01|  Bridgeport|United States| 

In [70]:
# save the demographics dataset to demo_table dataframe
demographics_df.createOrReplaceTempView("dim_demographics")

In [71]:
# Create demographics dimention table into spark dataframe
dim_demographics_table = spark.sql("""
SELECT 
      City AS city,
      State AS state,
      median_age,
      male_population,
      female_population,
      total_population,
      foreign_born,
      avg_household_size,
      state_code,
      Race AS race,
      Count AS count
FROM dim_demographics
""")
# Save the new demographics dataset to demo_table dataframe
dim_demographics_table.createOrReplaceTempView("dim_demographics_table")

In [72]:
# Review the dim_demographics_table
spark.sql("""
SELECT *
FROM dim_demographics_table
""").show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+------+
|            city|         state|median_age|male_population|female_population|total_population|foreign_born|avg_household_size|state_code|                race| count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|        40601.0|          41862.0|           82463|     30908.0|               2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|        44129.0|          49500.0|           93629|     32935.0|              2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|        38040.0|          46799.0|           84839|      8229.0|              2.58|        AL|               Asian|  4759

In [73]:
# save the airport dataset to air_table dataframe
airport_df.createOrReplaceTempView("dim_airport")

In [74]:
# Drop is null values from air_table
spark.sql("""
SELECT *
FROM dim_airport
WHERE iso_country IS NOT NULL
""").createOrReplaceTempView("dim_airport")

In [75]:
# Create temperature dimention table into spaark dataframe
dim_airport_table = spark.sql("""
SELECT
      ident AS id,
      type,
      name,
      elevation_ft,
      iso_region AS state_code,
      municipality,
      iata_code
FROM dim_airport      
""")


In [76]:
from pyspark.sql.functions import regexp_replace
dim_airport_table = dim_airport_table.withColumn('state_code',regexp_replace('state_code', 'US-',''))
dim_airport_table.show()

+----+-------------+--------------------+------------+----------+------------+---------+
|  id|         type|                name|elevation_ft|state_code|municipality|iata_code|
+----+-------------+--------------------+------------+----------+------------+---------+
| 00A|     heliport|   Total Rf Heliport|          11|        PA|    Bensalem|     null|
|00AA|small_airport|Aero B Ranch Airport|        3435|        KS|       Leoti|     null|
|00AK|small_airport|        Lowell Field|         450|        AK|Anchor Point|     null|
|00AL|small_airport|        Epps Airpark|         820|        AL|     Harvest|     null|
|00AR|       closed|Newport Hospital ...|         237|        AR|     Newport|     null|
|00AS|small_airport|      Fulton Airport|        1100|        OK|        Alex|     null|
|00AZ|small_airport|      Cordes Airport|        3810|        AZ|      Cordes|     null|
|00CA|small_airport|Goldstone /Gts/ A...|        3038|        CA|     Barstow|     null|
|00CL|small_airport| 

In [77]:
# save the change to air_table dataframe
dim_airport_table.createOrReplaceTempView("dim_airport_table")

In [78]:
# Review the dim_airport_table
spark.sql("""
SELECT *
FROM dim_airport_table
""").show()

+----+-------------+--------------------+------------+----------+------------+---------+
|  id|         type|                name|elevation_ft|state_code|municipality|iata_code|
+----+-------------+--------------------+------------+----------+------------+---------+
| 00A|     heliport|   Total Rf Heliport|          11|        PA|    Bensalem|     null|
|00AA|small_airport|Aero B Ranch Airport|        3435|        KS|       Leoti|     null|
|00AK|small_airport|        Lowell Field|         450|        AK|Anchor Point|     null|
|00AL|small_airport|        Epps Airpark|         820|        AL|     Harvest|     null|
|00AR|       closed|Newport Hospital ...|         237|        AR|     Newport|     null|
|00AS|small_airport|      Fulton Airport|        1100|        OK|        Alex|     null|
|00AZ|small_airport|      Cordes Airport|        3810|        AZ|      Cordes|     null|
|00CA|small_airport|Goldstone /Gts/ A...|        3038|        CA|     Barstow|     null|
|00CL|small_airport| 

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [79]:
# Count Rows check test
# Define the table names
table_names = ["fact_immigration_table","dim_demographics_table","dim_time_table","dim_airport_table","dim_temperature_table"]

# Loop through each table name and perform a count check 
for table_name in table_names:
    # Read the table into dataframe
    table_df = spark.table(table_name)
    # Get the count of rows
    row_count = table_df.count()
    # print the result
    print(f"The {table_name} has {row_count} rows.")
    

The fact_immigration_table has 2327205 rows.
The dim_demographics_table has 2891 rows.
The dim_time_table has 203 rows.
The dim_airport_table has 55075 rows.
The dim_temperature_table has 664481 rows.


In [80]:
# Duplicates test
# Define the table names
table_names = ["fact_immigration_table","dim_demographics_table","dim_time_table","dim_airport_table","dim_temperature_table"]

# Loop through each table name and perform null values
for table_name in table_names:
    # read the table into dataframe
    #able_df = spark.table(table_name)
    # Check for duplicates in dataframe
    duplicate_count = table_df.groupBy(table_df.columns).agg(count("*").alias("count")).filter(col("count") > 1).count()
    
    
    #print the result
    print(f"The {table_name} has {duplicate_count} duplicates!")
    

The fact_immigration_table has 0 duplicates!
The dim_demographics_table has 0 duplicates!
The dim_time_table has 0 duplicates!
The dim_airport_table has 0 duplicates!
The dim_temperature_table has 0 duplicates!


In [81]:
# Null Values test
# Define the table names
table_names = ["fact_immigration_table","dim_demographics_table","dim_time_table","dim_airport_table","dim_temperature_table"]

# Loop through each table name and perform null values
for table_name in table_names:
    # read the table into dataframe
    table_df = spark.table(table_name)
    # Check for null values in dataframe
    null_value_count = table_df.select([count(when(col(c).isNull(), c)).alias(c) 
    for c in table_df.columns]).collect()[0]
    # print the result
    print(f"the {table_name} has {null_value_count} null values!.")

the fact_immigration_table has Row(cicid=0, state=0, country_code=0, port=0, arrival_date=0, departure_date=0, biryear=1, gender=0, age=1, visa_type=0, transport_mode=0) null values!.
the dim_demographics_table has Row(city=0, state=0, median_age=0, male_population=3, female_population=3, total_population=0, foreign_born=13, avg_household_size=16, state_code=0, race=0, count=0) null values!.
the dim_time_table has Row(date=0, year=30, month=30, day=30, week=30, weekday=30, year_day=30) null values!.
the dim_airport_table has Row(id=0, type=0, name=0, elevation_ft=7006, state_code=0, municipality=5676, iata_code=45886) null values!.
the dim_temperature_table has Row(date=0, city=0, country=0, avg_temp=24832, avg_temp_uncertainty=24832) null values!.


In [82]:
# print the schema for each table
print("fact_immigration_table")
fact_immigration_table.printSchema()
print("dim_temperature_table")
dim_temperature_table.printSchema()
print("dim_time_table")
dim_time_table.printSchema()
print("dim_airport_table")
dim_airport_table.printSchema()
print("dim_demographics_table")
dim_demographics_table.printSchema()

fact_immigration_table
root
 |-- cicid: double (nullable = true)
 |-- state: string (nullable = true)
 |-- country_code: double (nullable = true)
 |-- port: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_type: string (nullable = false)
 |-- transport_mode: string (nullable = false)

dim_temperature_table
root
 |-- date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- avg_temp: double (nullable = true)
 |-- avg_temp_uncertainty: double (nullable = true)

dim_time_table
root
 |-- date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year_day: integer (nullable = true)

dim_airport

In [83]:
# join the fact_immigration and dim_temperaturetables and dim_demographics tables toghater
#spark.sql("""
#SELECT f.state, f.age, f.visa_type, t.avg_temp,f.transport_mode, d.total_population
#FROM fact_immigration_table AS f
#JOIN dim_temperature_table AS t
#ON f.state = t.city
#JOIN dim_demographics_table AS d
#ON f.state = d.state
#JOIN dim_time_table AS td
#ON t.date = td.date
#""").show()

In [84]:
# join the fact_immigration and dim_temperaturetables toghater
spark.sql("""
SELECT f.state, f.arrival_date, f.age, f.visa_type, t.avg_temp,f.transport_mode
FROM fact_immigration_table AS f
JOIN dim_temperature_table AS t
ON f.state = t.city
WHERE avg_temp >= 5
""").show()

+----------+------------+----+---------+--------------------+--------------+
|     state|arrival_date| age|visa_type|            avg_temp|transport_mode|
+----------+------------+----+---------+--------------------+--------------+
|Washington|  2016-04-01|52.0|  Tourism|                null|           Air|
|Washington|  2016-04-01|52.0|  Tourism|  14.584000000000001|           Air|
|Washington|  2016-04-01|52.0|  Tourism|  10.739999999999998|           Air|
|Washington|  2016-04-01|52.0|  Tourism|              18.563|           Air|
|Washington|  2016-04-01|52.0|  Tourism|              13.906|           Air|
|Washington|  2016-04-01|52.0|  Tourism|               3.209|           Air|
|Washington|  2016-04-01|52.0|  Tourism|-0.43200000000000005|           Air|
|Washington|  2016-04-01|52.0|  Tourism|  12.549000000000001|           Air|
|Washington|  2016-04-01|52.0|  Tourism|              14.558|           Air|
|Washington|  2016-04-01|52.0|  Tourism|                4.25|           Air|

In [85]:
# join the fact_immigration and dim_temperaturetables and dim_demographics tables toghater
#spark.sql("""
#SELECT f.state, f.age, COUNT(f.visa_type), t.avg_temp,f.transport_mode, d.total_population
#FROM fact_immigration_table AS f
#JOIN dim_temperature_table AS t
#ON f.state = t.city
#JOIN dim_demographics_table AS d
#ON f.state = d.state
#JOIN dim_time_table AS td
#ON t.date = td.date
#""").show()

In [86]:
# review the age rate and city and male & female 
spark.sql("""
SELECT f.state, f.age,f.gender, d.male_population,d.female_population,total_population
FROM fact_immigration_table AS f
JOIN dim_demographics_table AS d
ON f.state = d.state
""").show()

+--------+----+------+---------------+-----------------+----------------+
|   state| age|gender|male_population|female_population|total_population|
+--------+----+------+---------------+-----------------+----------------+
|Michigan|55.0|     M|        31369.0|          41808.0|           73177|
|Michigan|55.0|     M|        35967.0|          39310.0|           75277|
|Michigan|55.0|     M|        58789.0|          58281.0|          117070|
|Michigan|55.0|     M|        36356.0|          37076.0|           73432|
|Michigan|55.0|     M|        64063.0|          71293.0|          135356|
|Michigan|55.0|     M|        37175.0|          38865.0|           76040|
|Michigan|55.0|     M|        36356.0|          37076.0|           73432|
|Michigan|55.0|     M|        31369.0|          41808.0|           73177|
|Michigan|55.0|     M|        31369.0|          41808.0|           73177|
|Michigan|55.0|     M|        64063.0|          71293.0|          135356|
|Michigan|55.0|     M|        37742.0|

In [90]:
# review the highest city and age rate & total Population
spark.sql("""
SELECT f.state, AVG(f.age),COUNT(d.total_population)
FROM fact_immigration_table AS f
JOIN dim_demographics_table AS d
ON f.state = d.state
GROUP BY 1
ORDER BY 3 DESC
""").show()

+--------------+------------------+-----------------------+
|         state|          avg(age)|count(total_population)|
+--------------+------------------+-----------------------+
|    California| 41.87245177943538|              266079008|
|       Florida|40.611540487978736|              114607944|
|         Texas|44.569836635127146|               29545152|
|      New York| 40.03067495212076|               25122582|
|      Illinois| 43.58689612594536|                5895890|
|        Nevada|  42.2622057243333|                4606650|
|    Washington|   41.712776456866|                3789470|
| Massachusetts| 43.00791177175737|                3741387|
|    New Jersey|44.917880202795374|                3698901|
|      Michigan| 43.19893449570166|                1957383|
|      Virginia|45.645065877924175|                1822310|
|       Georgia|45.451668230801594|                1699555|
|North Carolina|44.777914740626606|                1362900|
|       Arizona| 45.67388955582233|     

The most average of age 41.8 years are immigration to California City and it's the most population city.

In [91]:
# review the highest city and age rate & total Population when the reason of visa type for Business
spark.sql("""
SELECT f.state, AVG(f.age),COUNT(d.total_population)
FROM fact_immigration_table AS f
JOIN dim_demographics_table AS d
ON f.state = d.state
WHERE visa_type = "Business"
GROUP BY 1
ORDER BY 3 DESC
""").show()

+--------------+------------------+-----------------------+
|         state|          avg(age)|count(total_population)|
+--------------+------------------+-----------------------+
|    California| 41.71518952597919|               49257416|
|         Texas| 42.54297853451283|                8610147|
|       Florida| 42.98742121424339|                8241972|
|      New York|  42.8458540042523|                2438208|
|      Illinois|42.239607959022855|                1847664|
|    Washington| 41.24383139136395|                 992120|
| Massachusetts| 42.32705882352941|                 909075|
|      Michigan| 41.33250414593698|                 857466|
|        Nevada| 42.48653597587247|                 835560|
|    New Jersey|43.479519439977295|                 602547|
|North Carolina| 42.76927868444002|                 561890|
|       Georgia| 42.68189629030628|                 544115|
|      Virginia|43.142140468227424|                 376740|
|       Arizona| 43.26148016049933|     

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Descrition of Immigration Data:

| Feature | Description |
| ------- | ----------  |
| cicid | Unique ID |
| i94yr | Year |
| i94mon | Month |
| i94cit | 3 digit code for immigrant country of brith|
| i94res | 3 digit code for immigrant country of residence |
| i94port | Port of admission |
| arrdate | Arrival Date in the USA |
| i94mode | Mode of transportation (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported) |
| i94addr | USA State of arrival |
| depdate | Departure DAte From the USA |
| i94bir | Age of Respondent in Years |
| i94visa | Visa codes collapsed into three categories |
| count | Field used for summary statistics |
| dtadfile | Character Date Field - DAte added to I-94 Files |
| visapost | Department of State where Visa was issued |
| occup | Occupation that will be performed is U.S |
| entdepa | Arrival Flag - admitted or paroled into th US |
| entdepd | Departure Flag - Departed, lost I-94 or is deceased |
| entdepu | Update Flag - Either apperhended, Overstayed, adjusted to perm residence |
| matflag | Match flag - Match of arrival and departure records |
| biryear | 4 digit year of birth |
| dtaddto | Character Date Field - Date to which admitted to U.S (allowed to stay until) |
| gender | Non - immigrant sex |
| insnum | INS number |
| airline | Airline used to arrive in U.S. |
| admnum | Admission Number |
| fltno | Flight number od Airline used to arrive in U.S. |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

#### Description of Temperature Data:

| Feature | Description |
| ------- | ----------- |
| dt | Date |
| AverageTemperature | Average temerature in celsius |
| AvarageTemperatureUncertainty | 95% confidence interval around average temperature |
| City | Name of city |
| Country | Name of country |
| Latitude | Latitude of city |
| Longitude | Longitude of city |

#### Description of Demographics Data:

| Feature | Description |
| ------- | ----------- |
| City | City Name |
| State | US State of City |
| Median Age | The median population age |
| Male Population | Male population total |
| Female Population | Female population total |
| Total Population | Total population |
| Number of Veterans | Number of veterans living in the city |
| Foregin- born | Number of residents who were not born in the city |
| Average Household Size | Average size of houses in the city |
| state Code | Code of the state |
| Race | race class |
| Count | Number of individuals in each race |

#### Description of Airport Data

| Feature | Description |
| ------- | ----------- |
| ident | Unique identifier |
| type | Airport type |
| name | Airport name |
| elevation_ft | Airport altitude |
| countinent | Continent |
| iso_country | ISO Code of the airport's country |
| iso_region | ISO Code for the airport's region |
| municipality | City/Municipality where the airport is located |
| gps_code | Airport GPS Code |
| lata_code | Airport IATA Code |
| local_code | Airport local Code |
| coordinates | Airport coordinates |

#### Step 5: Complete Project Write Up

this project using a big data Apache Spark processing technologie because of ability to process massive amounts of data as well as the use of unified analytics engine and convenient API's.

#### Requirment scenarios:

1- The data was increased by 100x:

In this scenario the data would be stored on one of storage and processing platforms such as Amazon S3 bucket or Amazon Redshift or Amazon EMR culster or Apache Cassandera and loaded into our staging tables for processing and analysis.

2- The data population a dashboard that must be updated on a daily basis by 7am every day:

In this scenario we would manage ETL pipeline in DAG from Apache Airflow to ensure the pipeline runs in set time and data quality check test as well.

3- The datebase needed to be accessed by 100+ people:

In this scenario better move the database to Amazon Redshift cluster which can handle a massive request volumes and it will be easy to scalable as require.
