# Udacity Data Engineer Nanodegree - Capstone Project

## Project Summary

This project builds upon 4 datasetrs included in the Udacity project workspace. The workflow for this project is described below:

1. Scope the project and gather data
2. Exploring all the data to understand them, clean them, and possibly save a new copy
3. Define the Data Model  based on the exploration 
4. Design ETL as such and then run it to model the data
5. Data quality check
6. Other considerations
7. Summary and running the project at a glance

In [19]:
import pandas as pd
import pyspark
import configparser
from datetime import datetime
import os
import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format, to_date
from pyspark.sql.functions import lit, expr
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, TimestampType

In [20]:
config = configparser.ConfigParser()
config.read('config.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_KEY_ID = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
output_data = config['S3']['DEST_S3_BUCKET']

In [21]:
# Path to S3 where the parquet files will be written
write_to_path = {"immigration": "final_project/immigration.parquet",
                "state": "final_project/state.parquet",
                "city": "final_project/city.parquet",
                "temperature": "final_project/temperature.parquet",
                "demographic": "final_project/demographic.parquet",
                "visa": "final_project/visa.parquet",
                "countries": "final_project/countries.parquet",
                "mode": "final_project/mode.parquet"}

In [22]:
# Read using spark
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

In [7]:
# Then setup the sparkContext object
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

## Step 1: Scope the project and the related data

### Datasets
For this project, I will be mainly working wiht the following 4 datasets hosted in the Udacity Workspace. The target is to upload clean and formatted monthly immigration data and other associated files to S3 to give a base for further exploration and analysis of these data at a later stage.

* **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's also a sample file so you can take a look at the data in csv format before reading it all in. 

* **World Temperature Data:** This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

* **U.S. City Demographic Data:** This data comes from OpenSoft. You can read more about it [here]().
* **Airport Code Table:** This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

### Downloading and uploading to S3

#### Downloading data

* I94 Immigration Data: 
The data is in the  folder with the following path: `../../data/18-83510-I94-Data-2016/`. To download this, I ran the following command in the command prompt.
`!zip -r data.zip ../../data/18-83510-I94-Data-2016` This created a zip folder named **data.zip** in the working directory. Now, I just downloaded this to my local computer by right clicking on it and clicking on **Download**. However, this saved data was not used as the data in the Udacity directory was directly asccessed for reshaping, cleaning and then for final upload to S3. 

* World Temperature Data: 
The data is in the folder with the following path: `../../data2/`. To download this, I ran the following command in the command prompt.
`!zip -r data2.zip ../../data2`
This created a zip folder named **data2.zip** in the working directory. Now, like before, this data can also be downloaded to local computer by right clicking on this zipped folder and clicking on **Download**. Anyway, for this case, we can directly work with the data hosted in the Udacity working environment.

* U.S. City Demographic Data: This data is in the working directory as a single csv file named us-cities-demographics.csv and so just right click it and download, if needed.
* Airport Code Table: This file named as airport-codes_csv.csv can also be directly downloaded to the local computer.

#### Uploading to S3
Now, upload all of these data to a S3 bucket so that this can be later accessed through Spark and processed as such.

## Step 2: Exploring all these data to understand them

### Step 2.a. Explore I94_SAS_Labels_Descriptions.SAS

#### First get the country code and name and then write this as CSV file.

* Country code

In [71]:
with open("I94_SAS_Labels_Descriptions.SAS") as file:
    auxiliary_data = file.readlines()

As the data on this starts from 10th row and ends on 298th row, I will now get all these data by looping through these lines.

In [72]:
country = {}
for countries in auxiliary_data[9:298]:
    line = countries.split("=")
    code = line[0].strip()
    country_name = line[1].strip().strip("'")
    country[code] = country_name

In [73]:
country_pd = pd.DataFrame(list(country.items()), columns = ['code', 'country'])

In [74]:
country_pd.head(5)

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [75]:
# Write the data to the directory
country_pd.to_csv("countries.csv", index = False)

Now, write data as parquet files to S3 bucket.

In [58]:
country_spark = spark.createDataFrame(country_pd) # Convert to Spark dataframe

In [60]:
country_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["countries"]))

#### Secondly, get the city code and name and then write this as CSV file.

* City code

In [45]:
city = {}
for cities in auxiliary_data[302:962]:
    line = cities.split("=")
    code = line[0].strip().strip("'")
    city_name = line[1].strip().strip("'")
    city[code] = city_name

In [46]:
city_pd = pd.DataFrame(list(city.items()), columns = ['code', 'city'])

In [47]:
city_pd.head()

Unnamed: 0,code,city
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [None]:
# Write the data to the directory
city_pd.to_csv("cities.csv", index = False)

Now, write data as parquet files to S3 bucket.

In [62]:
city_spark = spark.createDataFrame(city_pd) # Convert to Spark dataframe

In [63]:
city_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["city"]))

#### Thirdly, get the mode code and name and then write this as CSV file.

* Mode code

In [48]:
mode = {}
for modes in auxiliary_data[972:976]:
    line = modes.split("=")
    code = line[0].strip()
    mode_name = line[1].strip().strip("'").strip(";").strip("'")
    mode[code] = mode_name

In [49]:
mode_pd = pd.DataFrame(list(mode.items()), columns = ['code', 'mode'])

In [50]:
mode_pd.head()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported'


In [None]:
# Write the data to the directory
mode_pd.to_csv("mode.csv", index = False)

Now, write data as parquet files to S3 bucket.

In [78]:
mode_spark = spark.createDataFrame(mode_pd) # Convert to Spark dataframe

In [65]:
mode_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["mode"]))

#### Fourthly, get the state code and name and then write this as CSV file.
* State code

In [51]:
state = {}
for states in auxiliary_data[981:1036]:
    line = states.split("=")
    code = line[0].strip().strip("'")
    state_name = line[1].strip().strip("'")
    state[code] = state_name

In [52]:
state_pd = pd.DataFrame(list(state.items()), columns = ['code', 'state'])

In [53]:
state_pd.head()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [None]:
# Write the data to the directory
state_pd.to_csv("state.csv", index = False)

Now, write data as parquet files to S3 bucket.

In [66]:
state_spark = spark.createDataFrame(state_pd) # Convert to Spark dataframe

In [67]:
state_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["state"]))

#### Fifthly, get the visa code and name and then write this as CSV file.
* Visa code

In [70]:
visa = {}
for visas in auxiliary_data[1046:1049]:
    line = visas.split("=")
    code = line[0].strip()
    visa_name = line[1].strip()
    visa[code] = visa_name

In [71]:
visa_pd = pd.DataFrame(list(visa.items()), columns = ['code', 'visa'])

In [72]:
# Write the data to the directory
visa_pd.to_csv("visa.csv", index = False)

In [73]:
visa_pd.head()

Unnamed: 0,code,visa
0,1,Business
1,2,Pleasure
2,3,Student


Now, write data as parquet files to S3 bucket.

In [74]:
visa_spark = spark.createDataFrame(visa_pd) # Convert to Spark dataframe

In [75]:
visa_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["visa"]))

### Step 2.b. Explore I94 Immigration Data

In [76]:
immigration = pd.read_sas("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat", "sas7bdat", encoding="ISO-8859-1")

In [77]:
pd.options.display.max_columns = None
immigration.head(15)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,59.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1957.0,10032016,,,,14938460000.0,,WT
1,5.0,2016.0,6.0,135.0,135.0,XXX,20612.0,,,,50.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,Z,,U,,1966.0,10032016,,,,17460060000.0,,WT
2,6.0,2016.0,6.0,213.0,213.0,XXX,20609.0,,,,27.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1989.0,D/S,,,,1679298000.0,,F1
3,7.0,2016.0,6.0,213.0,213.0,XXX,20611.0,,,,23.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1993.0,D/S,,,,1140963000.0,,F1
4,16.0,2016.0,6.0,245.0,245.0,XXX,20632.0,,,,24.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1992.0,D/S,,,,1934535000.0,,F1
5,19.0,2016.0,6.0,254.0,276.0,XXX,20612.0,,,,21.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1995.0,D/S,,,,1148758000.0,,F1
6,27.0,2016.0,6.0,343.0,343.0,XXX,20611.0,,,,32.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1984.0,D/S,,,,1152545000.0,,F1
7,33.0,2016.0,6.0,582.0,582.0,XXX,20612.0,,,,18.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,T,,U,,1998.0,D/S,,,,1150900000.0,,F2
8,38.0,2016.0,6.0,687.0,687.0,XXX,20623.0,,,,19.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,U,,U,,1997.0,06172018,,,,35753780000.0,,E2
9,39.0,2016.0,6.0,694.0,694.0,XXX,20611.0,,,,20.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,,,,U,,U,,1996.0,04162017,,,,1142101000.0,,M1


In [78]:
# Have a look at all the columns
immigration.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'validres', 'delete_days', 'delete_mexl', 'delete_dup', 'delete_visa',
       'delete_recdup', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [79]:
# Have a quick look at different statistics of the table
immigration.describe()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,arrdate,i94mode,depdate,i94bir,i94visa,count,validres,delete_days,delete_mexl,delete_dup,delete_visa,delete_recdup,biryear,admnum
count,3574989.0,3574989.0,3574989.0,3574469.0,3574989.0,3574989.0,3513802.0,3287918.0,3574350.0,3574989.0,3574989.0,3574989.0,3574989.0,3574989.0,3574989.0,3574989.0,3574989.0,3574350.0,3574989.0
mean,3258526.0,2016.0,6.0,316.2554,314.8681,20620.85,1.07024,20635.85,40.30389,1.88548,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1975.696,68911720000.0
std,1888572.0,0.0,0.0,209.0522,207.1219,8.78243,0.4343662,19.79676,18.10871,0.3806378,0.0,0.0,0.0,0.0,0.0,0.0,0.0,18.10871,28634610000.0
min,4.0,2016.0,6.0,101.0,101.0,20606.0,1.0,18804.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1900.0,0.0
25%,1625393.0,2016.0,6.0,135.0,135.0,20613.0,1.0,20623.0,27.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1962.0,61634650000.0
50%,3275808.0,2016.0,6.0,245.0,245.0,20621.0,1.0,20632.0,40.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1976.0,62593420000.0
75%,4949151.0,2016.0,6.0,525.0,516.0,20629.0,1.0,20643.0,54.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1989.0,98573180000.0
max,6432838.0,2016.0,6.0,999.0,760.0,20635.0,9.0,20745.0,116.0,3.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,2016.0,100000000000.0


In [23]:
immigration_spark = spark.read.format('com.github.saurfang.sas.spark').load("../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat")

In [24]:
fact_immigration = immigration_spark.distinct().withColumn("immigration_id", monotonically_increasing_id())
# Show the 5 top rows
fact_immigration.show(n=5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+
|245.0|2016.0|   6.0| 103.0| 103.0|    NYC|20606.0|    1.0|     NY|20613.0| 

In [25]:
immigration_reshaped = fact_immigration.withColumn("year", col("i94yr").cast("integer"))
immigration_reshaped = fact_immigration.withColumnRenamed("i94addr", "state_code")
immigration_reshaped = immigration_reshaped.withColumn("month", col("i94mon").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumn("city_code", col("i94cit").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumn("origin_country_code", col("i94res").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumnRenamed("i94port", "port_code")
immigration_reshaped = immigration_reshaped.withColumn("data_base_sas", to_date(lit("01/01/1960"), "MM/dd/yyyy"))
immigration_reshaped = immigration_reshaped.withColumn("arrival_date", expr("date_add(data_base_sas, arrdate)"))
immigration_reshaped = immigration_reshaped.withColumn("mode_code", col("i94mode").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumn("departure_date", expr("date_add(data_base_sas, depdate)"))
immigration_reshaped = immigration_reshaped.withColumn("age", col("i94bir").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumn("visa_code", col("i94visa").cast("integer"))
immigration_reshaped = immigration_reshaped.withColumn("birth_year", col("biryear").cast("integer"))
immigration_reshaped = immigration_reshaped.drop("i94yr", "i94mon", "i94cit", "i94res", "data_base_sas", "arrdate", "i94mode", "depdate", "i94bir", "i94visa", "biryear")

In [26]:
immigration_reshaped = immigration_reshaped.drop("dtadfile")

In [27]:
# Show the 5 top rows
immigration_reshaped.show(n=5)

+-----+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+
|cicid|port_code|state_code|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|month|city_code|origin_country_code|arrival_date|mode_code|departure_date|age|visa_code|birth_year|
+-----+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+
|245.0|      NYC

As I couldn't find any description of the columns validres, delete_days, delete_mexl, delete_dup, delete_visa, delete_recdup, entdepa, entdepd, entdepu, insnum, I will now explore these columns to understand what they stand for and whether they have any significance and I should retain them or not.

In [28]:
immigration_reshaped.select("validres").distinct().show()

+--------+
|validres|
+--------+
|     1.0|
+--------+



In [29]:
immigration_reshaped.select("delete_days").distinct().show()

+-----------+
|delete_days|
+-----------+
|        0.0|
+-----------+



In [30]:
immigration_reshaped.select("delete_mexl").distinct().show()

+-----------+
|delete_mexl|
+-----------+
|        0.0|
+-----------+



In [31]:
immigration_reshaped.select("delete_dup").distinct().show()

+----------+
|delete_dup|
+----------+
|       0.0|
+----------+



In [32]:
immigration_reshaped.select("delete_visa").distinct().show()

+-----------+
|delete_visa|
+-----------+
|        0.0|
+-----------+



In [33]:
immigration_reshaped.select("delete_recdup").distinct().show()

+-------------+
|delete_recdup|
+-------------+
|          0.0|
+-------------+



In [34]:
immigration_reshaped.select("entdepa").distinct().show()

+-------+
|entdepa|
+-------+
|      K|
|      F|
|      Q|
|   null|
|      T|
|      B|
|      M|
|      U|
|      O|
|      J|
|      Z|
|      A|
|      N|
|      R|
|      G|
|      I|
|      P|
|      H|
+-------+



In [35]:
immigration_reshaped.select("entdepd").distinct().show()

+-------+
|entdepd|
+-------+
|      K|
|      Q|
|   null|
|      M|
|      L|
|      V|
|      O|
|      D|
|      J|
|      N|
|      W|
|      R|
|      I|
+-------+



In [36]:
immigration_reshaped.select("entdepu").distinct().show()

+-------+
|entdepu|
+-------+
|   null|
|      U|
+-------+



In [37]:
immigration_reshaped.select("insnum").distinct().show()

+------+
|insnum|
+------+
| 33174|
|  4821|
|  3210|
|  2069|
| 18947|
|  2294|
|  3959|
|  3606|
|  0965|
| 06660|
|    MM|
|  5645|
| 39645|
|  5149|
|  1372|
|  1669|
|  3650|
|  5523|
|  2393|
| 17835|
+------+
only showing top 20 rows



As the columns **validres**, **delete_days**, **delete_mexl**, **delete_dup**, **delete_visa**, **delete_recdup** have only 1 unique value, I decided to delete these columns. 

In [38]:
immigration_reshaped.drop("validres", "delete_days", "delete_mexl", "delete_dup", "delete_visa", "delete_recdup")

DataFrame[cicid: double, port_code: string, state_code: string, count: double, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string, immigration_id: bigint, month: int, city_code: int, origin_country_code: int, arrival_date: date, mode_code: int, departure_date: date, age: int, visa_code: int, birth_year: int]

Further as the columns **entdepa**, **entdepd**, **entdepu** and **insnum** are not found in other tables, I decided to delete these columns as well as they have little chance to be included or considered for further analysis based on this table.

In [39]:
immigration_reshaped.drop("entdepa", "entdepd", "entdepu", "insnum")

DataFrame[cicid: double, port_code: string, state_code: string, count: double, validres: double, delete_days: double, delete_mexl: double, delete_dup: double, delete_visa: double, delete_recdup: double, visapost: string, occup: string, matflag: string, dtaddto: string, gender: string, airline: string, admnum: double, fltno: string, visatype: string, immigration_id: bigint, month: int, city_code: int, origin_country_code: int, arrival_date: date, mode_code: int, departure_date: date, age: int, visa_code: int, birth_year: int]

In [40]:
# Show the 5 top rows
immigration_reshaped.show(n=5)

+-----+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+
|cicid|port_code|state_code|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|month|city_code|origin_country_code|arrival_date|mode_code|departure_date|age|visa_code|birth_year|
+-----+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+
|245.0|      NYC

Now, write data as parquet files to S3 bucket.

In [41]:
immigration_reshaped.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["immigration"]))

### Step 2.c. Explore world temperature data

In [101]:
file_loc = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_data = pd.read_csv(file_loc)

In [102]:
# Look at the first 5 rows
temp_data.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [103]:
# Subset to USA
temp_data = temp_data[temp_data['Country'] == 'United States']

In [104]:
# Now, extract year and month from the first column dt
temp_data['dt'] = pd.to_datetime(temp_data['dt'])
temp_data['year'] = temp_data['dt'].apply(lambda t: t.year)
temp_data['month'] = temp_data['dt'].apply(lambda t: t.month)
temp_data.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W,1820,5


In [105]:
# Changing City and Country column values to match the upper case values stored in cities.csv and state.csv
temp_data['City'] = temp_data['City'].str.upper()
temp_data['Country'] = temp_data['Country'].str.upper()

In [106]:
temp_data.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year,month
47555,1820-01-01,2.101,3.217,ABILENE,UNITED STATES,32.95N,100.53W,1820,1
47556,1820-02-01,6.926,2.853,ABILENE,UNITED STATES,32.95N,100.53W,1820,2
47557,1820-03-01,10.767,2.395,ABILENE,UNITED STATES,32.95N,100.53W,1820,3
47558,1820-04-01,17.989,2.202,ABILENE,UNITED STATES,32.95N,100.53W,1820,4
47559,1820-05-01,21.809,2.036,ABILENE,UNITED STATES,32.95N,100.53W,1820,5


In [107]:
# Write the data to the directory
temp_data.to_csv("temperature.csv", index = False)

In [108]:
temp_data.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude', 'year', 'month'],
      dtype='object')

Now, write data as parquet files to S3 bucket.

In [109]:
temp_spark = spark.createDataFrame(temp_data) # Convert to Spark dataframe
temp_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["temperature"]))

### Step 2.d. Explore U.S. City Demographic Data 

In [110]:
demographic = pd.read_csv('us-cities-demographics.csv', delimiter=';')
demographic.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [111]:
# Changing City and State column values to match the upper case values stored in cities.csv and state.csv
demographic['City'] = demographic['City'].str.upper()
demographic['State'] = demographic['State'].str.upper()

In [112]:
# Write the data to the directory
demographic.to_csv("demographic.csv", index = False)

In [113]:
demographic.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [114]:
demographic_spark = spark.createDataFrame(demographic) # Convert to Spark dataframe

In [115]:
demographic_spark = demographic_spark.withColumnRenamed("Median Age", "med_age")
demographic_spark = demographic_spark.withColumnRenamed("Male Population", "male_pop")
demographic_spark = demographic_spark.withColumnRenamed("Female Population", "female_pop")
demographic_spark = demographic_spark.withColumnRenamed("Total Population", "total_pop")
demographic_spark = demographic_spark.withColumnRenamed("Number of Veterans", "no_veterans")
demographic_spark = demographic_spark.withColumnRenamed("Average Household Size", "average_household_size")
demographic_spark = demographic_spark.withColumnRenamed("State Code", "state_code")

In [116]:
demographic_spark = demographic_spark.distinct().withColumn("demographic_id", monotonically_increasing_id())

In [117]:
# Show the 5 top rows
demographic_spark.show(n=5)

+--------------+----------+-------+--------+----------+---------+-----------+------------+----------------------+----------+--------------------+------+--------------+
|          City|     State|med_age|male_pop|female_pop|total_pop|no_veterans|Foreign-born|average_household_size|state_code|                Race| Count|demographic_id|
+--------------+----------+-------+--------+----------+---------+-----------+------------+----------------------+----------+--------------------+------+--------------+
|URBAN HONOLULU|    HAWAII|   41.4|176807.0|  175959.0|   352766|    23213.0|    101312.0|                  2.69|        HI|               Asian|240978|             0|
|       BOULDER|  COLORADO|   29.0| 56342.0|   51000.0|   107342|     4061.0|     12993.0|                  2.24|        CO|Black or African-...|  1615|             1|
|        DOTHAN|   ALABAMA|   38.9| 32172.0|   35364.0|    67536|     6334.0|      1699.0|                  2.59|        AL|  Hispanic or Latino|  1704|        

Now, write data as parquet files to S3 bucket.

In [118]:
demographic_spark.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["demographic"]))

## Step 3: Define the data model 

I used star schema for data modeling. This consideration was made so that it would be helpful for data warehouse. 

### Fact table

* **immigration_reshaped**: 
This has the following columns

`cicid, port_code, i94addr, count, validres, delete_days, delete_mexl, delete_dup, delete_visa, delete_recdup, dtadfile, visapost, occup, entdepa, entdepd, entdepu, matflag, dtaddto, gender, insnum, airline, admnum, fltno, visatype, immigration_id, year, month, city_code, origin_country_code, arrival_date, mode_code, departure_date, age, visa_code, birth_year`

### Dimension table

* **countries**: 
This has the following columns

`code, country`

* **cities**: 
This has the following columns

`code, city`

* **mode**: 
This has the following columns

`code, mode`

* **state**: 
This has the following columns

`code, state`

* **visa**: 
This has the following columns

`code, state`

* **temperature**: 
This has the following columns

`dt, AverageTemperature, AverageTemperatureUncertainty, City, Country, Latitude, Longitude, year, month`

* **demographic**: 
This has the following columns

`City, State, med_age, male_pop, female_pop, total_pop, no_veterans, Foreign-born, average_household_size, state_code, Race, Count`

![](data_model2.png)

## Step 4: Data pipeline

Then, I uploaded all of these factt tables and dimension tables to S3 bucket. For doing so, I created a dictionary with the path for each of these files to S3 bucket. 

In [12]:
write_to_path = {"immigration": "final_project/immigration.parquet",
                "state": "final_project/state.parquet",
                "city": "final_project/city.parquet",
                "temperature": "final_project/temperature.parquet",
                "demographic": "final_project/demographic.parquet",
                "visa": "final_project/visa.parquet",
                "countries": "final_project/countries.parquet",
                "mode": "final_project/mode.parquet"}

I then write all of these data to the following S3 bucket.

In [35]:
output_data = "s3a://aws-logs-608251643021-us-west-2/elasticmapreduce/"

For example, to write the immigration data to S3 address as saved in the output_data variable, running the following command will do this. 

In [None]:
#immigration_reshaped.write.mode("overwrite").parquet("{}{}".format(output_data, write_to_path["immigration"]))

All of the above steps are put in one file in **etl.py**, so that by just running this one file one can upload all the 7 dimension and 1 fact tables to S3 as parquet files.

## Step 5: Data quality check

For doing this, I will check that no table uploaded to the S3 is empty.

### Check immigration data

In [130]:
immig_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["immigration"]))

In [110]:
# Get row count
rows = immig_spark.count()
print(f"Immigration has count : {rows}")

Immigration has count : 18113


In [111]:
rows

18113

### Check state data

In [135]:
state_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["state"]))
# Get row count
rows = state_spark.count()
print(f"State has count : {rows}")

State has count : 55


### Check city data

In [84]:
city_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["city"]))
# Get row count
rows = city_spark.count()
print(f"City has count : {rows}")

City has count : 660


### Check temperature data

In [86]:
temp_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["temperature"]))
# Get row count
rows = temp_spark.count()
print(f"temperature has count : {rows}")

temperature has count : 687289


### Check demographic data

In [95]:
demo_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["demographic"]))
# Get row count
rows = demo_spark.count()
print(f"demographic has count : {rows}")

demographic has count : 2891


### Check visa data

In [96]:
visa_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["visa"]))
# Get row count
rows = visa_spark.count()
print(f"visa has count : {rows}")

visa has count : 3


### Check countries data

In [97]:
countries_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["countries"]))
# Get row count
rows = countries_spark.count()
print(f"countries has count : {rows}")

countries has count : 289


### Check mode data

In [98]:
mode_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["mode"]))
# Get row count
rows = mode_spark.count()
print(f"mode has count : {rows}")

mode has count : 4


As not a single table downloaded from S3 has 0 count, we can say that our table have been successfully uploaded to S3 as parquet files.

The same logic has been implemented in the Validator class.

First call the Validator class.

In [14]:
from check_data_quality import DataValidator

In [15]:
validator = DataValidator(spark, output_data, write_to_path)

* Does immigration table pass the check?

In [150]:
result = validator.check_immigration()
print("The table immigration passes the test, True or False???    {}".format(result))

The table immigration passes the test, True or Fale???    True


* Does state table pass the check?

In [127]:
result = validator.check_state()
print("The table state passes the test, True or False???    {}".format(result))

The table state passes the test, True or Fale???    True


* Does city table pass the check?

In [128]:
result = validator.check_city()
print("The table city passes the test, True or False???    {}".format(result))

The table city passes the test, True or Fale???    True


* Does temperature table pass the check?

In [129]:
result = validator.check_temperature()
print("The table temperature passes the test, True or False???    {}".format(result))

The table temperature passes the test, True or Fale???    True


* Does temperature demographic pass the check?

In [16]:
result = validator.check_demographic()
print("The table demographic passes the test, True or False???    {}".format(result))

The table demographic passes the test, True or False???    True


* Does table visa pass the check?

In [131]:
result = validator.check_visa()
print("The table visa passes the test, True or False???    {}".format(result))

The table visa passes the test, True or Fale???    True


* Does table countries pass the check?

In [134]:
result = validator.check_countries()
print("The table countries passes the test, True or False???    {}".format(result))

The table countries passes the test, True or Fale???    True


* Does table mode pass the check?

In [133]:
result = validator.check_mode()
print("The table mode passes the test, True or False???    {}".format(result))

The table mode passes the test, True or Fale???    True


### Do all the tables pass the test?

In [151]:
result = validator.check_all()
print("All the tables pass the test, True or Fale???    {}".format(result))

All the tables pass the test, True or Fale???    True


### Further check  -- Is the data model used here useful for further work?

We will now check whether different tables can be joined together for immigrants flock more to areas where more people born in other countries live or not. For doing so, I will first group-count foreign-born people by states and then will group-count immigrants by their states. If we see that out of the top 3 states for both group-wise counts, 2 or 3 sates are the same, then we can say that this hypothesis is true. This will also establish the usability of the data model that I have used here.

In [13]:
demo_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["demographic"]))

In [14]:
from pyspark.sql.functions import sum
from pyspark.sql.functions import col
demo_spark = demo_spark.withColumn("Foreign-born", col("Foreign-born").cast("int"))

grouped_df = demo_spark.groupBy("State").agg(sum("Foreign-born").alias("total_foreign_born"))

In [15]:
# Sort the data by the "total_foreign_born" column in descending order
sorted_df = grouped_df.orderBy(grouped_df["total_foreign_born"].desc())

In [16]:
# Show the 5 top rows
sorted_df.show(n=3)

+----------+------------------+
|     State|total_foreign_born|
+----------+------------------+
|CALIFORNIA|          37059662|
|  NEW YORK|          17186873|
|     TEXAS|          14498054|
+----------+------------------+
only showing top 3 rows



So, more or less these states will also appear when we order the total number of immigrants by state if the hypothesis is true.

First add, full state name to immig_spark as immig_spark has the state code and not full names.

In [43]:
immig_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["immigration"]))
state_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["state"]))

In [44]:
# join the two dataframes on the 'code' column
immig_spark = immig_spark.join(state_spark, immig_spark["state_code"] == state_spark["code"], "left")

In [45]:
immig_spark.show(n=5)

+------+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+----+----------+
| cicid|port_code|state_code|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|month|city_code|origin_country_code|arrival_date|mode_code|departure_date|age|visa_code|birth_year|code|     state|
+------+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---

Now, do the aggregation.

In [50]:
from pyspark.sql.functions import count
# group the imported immigration data from S3 by state, now column state and count the number of rows
state_count = immig_spark.groupBy("state").agg(count("*").alias("total_count"))

# sort the data by total_count in descending order
state_count = state_count.orderBy(state_count["total_count"].desc())

In [51]:
state_count.show(n=3)

+----------+-----------+
|     state|total_count|
+----------+-----------+
|CALIFORNIA|     603181|
|  NEW YORK|     589603|
|   FLORIDA|     584520|
+----------+-----------+
only showing top 3 rows



From here, we can see that the top 2 states are the same for both immigrants and for citizens born in other countries. Thus, our assumption is correct.

**Another approach to the same problem**

The above task can also be accomplished by directly adding **Foreign-born** field from demo_spark and adding it to immig_spark by joining on the field state. But before that, like just what has been done before, first we need to get the full state name to the immigration file and then we can just do the joining.

In [52]:
immig_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["immigration"]))
state_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["state"]))
demo_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["demographic"]))
# join the two dataframes on the 'code' column to get the state column with full names
immig_spark = immig_spark.join(state_spark, immig_spark["state_code"] == state_spark["code"], "left")

In [53]:
immig_spark.show(n=3)

+------+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+----+----------+
| cicid|port_code|state_code|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|month|city_code|origin_country_code|arrival_date|mode_code|departure_date|age|visa_code|birth_year|code|     state|
+------+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---

As the immig_spark and demo_spark  both has column named state, rename the column "state" of immig_spark to **state_name**.

In [59]:
immig_spark = immig_spark.withColumnRenamed("state", "state_name")

In [81]:
# Join everything from the demography table to immig_spark and get a new table
new_table = immig_spark.join(demo_spark, immig_spark["state_name"] == demo_spark["State"], "left")

In [71]:
new_table.show(n=3)

+-----+---------+----------+-----+--------+-----------+-----------+----------+-----------+-------------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+-----+---------+-------------------+------------+---------+--------------+---+---------+----------+----+----------+-------------+-------+-------+--------+----------+---------+-----------+------------+----------------------+----------+--------------------+-----+--------------+
|cicid|port_code|state_code|count|validres|delete_days|delete_mexl|delete_dup|delete_visa|delete_recdup|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|immigration_id|month|city_code|origin_country_code|arrival_date|mode_code|departure_date|age|visa_code|birth_year|code|state_name|         City|  State|med_age|male_pop|female_pop|total_pop|no_veterans|Foreign-born|average_household_size|state_code|                Race|Count|demogr

In [85]:
new_table.columns

['cicid',
 'port_code',
 'state_code',
 'count',
 'validres',
 'delete_days',
 'delete_mexl',
 'delete_dup',
 'delete_visa',
 'delete_recdup',
 'visapost',
 'occup',
 'entdepa',
 'entdepd',
 'entdepu',
 'matflag',
 'dtaddto',
 'gender',
 'insnum',
 'airline',
 'admnum',
 'fltno',
 'visatype',
 'immigration_id',
 'month',
 'city_code',
 'origin_country_code',
 'arrival_date',
 'mode_code',
 'departure_date',
 'age',
 'visa_code',
 'birth_year',
 'code',
 'state_name',
 'City',
 'State',
 'med_age',
 'male_pop',
 'female_pop',
 'total_pop',
 'no_veterans',
 'Foreign-born',
 'average_household_size',
 'state_code',
 'Race',
 'Count',
 'demographic_id']

In [87]:
# new_table.count()

Now, select only the two columns "State" and "Foreign-born".

In [82]:
from pyspark.sql.functions import col
diff_table = new_table.select("state_name")

# This is where error is thrown

In [83]:
diff_table.show(n=3)

Py4JJavaError: An error occurred while calling o1407.showString.
: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
	at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:216)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:187)
	at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceBatches(ColumnarBatchScan.scala:144)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:83)
	at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FileSourceScanExec.produce(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
	... 88 more


In [86]:
diff_table.columns

['state_name']

Now, group the data by state and count the number of rows and sum the foreign-born column

In [66]:
from pyspark.sql.functions import count, sum

# group the data by state and count the number of rows and sum the foreign-born column
state_agg = new_table.groupBy("State").agg(count("*").alias("total_count"), sum("Foreign-born").alias("total_foreign_born"))

Now, select only 2 columns

In [88]:
#state_agg.show()

### Validate the tables -- are they formatted in a proper way for joining

In [58]:
from check_data_quality import DataValidator
validator = DataValidator(spark, output_data, write_to_path)

In [17]:
immig_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["immigration"]))
state_spark = spark.read.parquet("{}{}".format(output_data, write_to_path["state"]))

In [18]:
field1 = "state_code"
field2 = "code"

In [None]:
result = validator.validate_left_join(immig_spark, state_spark, field1, field2)

In [None]:
print("The merged table has more than one observation: True or False???    {}".format(result))

## 6. Other considerations

The data should be updated daily as immigrants or visitors come to the airport everday and thus it needs to be updated on a daily basis. But it is also possible to update the data monthly as the SAS dataset is stored for monthly data in this case. Demographic tables could be updated yearly as iut takes some time for survey to conduct and normally surveys at a large scale are conducted on a yearly basis, if not on a 4 or 5 year basis. 

#### Situation 1: The data was increased by 100x

AWS Spark can handle pretty large data. But in case, it crosses its capacity, we can resort to AWS EMR.

#### Situation 2: The data populates a dashboard that must be updated on a daily basis by 7am every day.

Using Airflow, we can easily do this with scheduling.

#### Situation 3: The database needed to be accessed by 100+ people

In this case, we can use Amazon Redshift, or Hive.

## 7. Summary and running the project at a glance

In this project, I used immigration data, demography data, temperature data and along with these some other related data saved in SAS file for monthly data. For a single month, I extracted the data from these  sources and then reshaped them so as to make them more easier for future analysis or exploration by data scientists or for business data analysis and visualization. 

In this case, I used Star schema where the immigration table is the fact table and there are sevcen more dimension tables, namely: state, city, temperature, demographic, visa, countries and mode.

All of these tables were prepared using PySpark and then they were uploaded to S3 bucket as parquet files. To do this, running `python etl.py` will do all the steps and tricks. To work with a  different month's data, one just needs to update the location of the immigration file corresponding to that different month.

To check whether the data is loaded correctly or not, a new class **DataValidator** has been defined in the Python file **check_data_quality.py**. Running the method **check_all()** will give either True or False depending on if all the tables have at least one observations in them in S3 or not. By running different methods corresponding to different tables, one can also check the validity of the table separately.