# Project Title
### Data Engineering Capstone Project

#### Project Summary
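This project builds an ETL pipeline with Apache Spark that combines I94 immigration records, world temperature data, US city demographics, and airport codes into a star schema stored as Parquet files for analytics and reporting.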

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import requests
from pyspark.sql import SparkSession
# Explicit imports replace the wildcard imports, which shadow Python built-ins
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

import etl

# Silence urllib3 warnings for unverified HTTPS requests
requests.packages.urllib3.disable_warnings()

In [2]:

spark = etl.create_spark_session()


In [3]:
def num_duplicate_rows(df: pd.DataFrame) -> int:
    return len(df)-len(df.drop_duplicates())

In [4]:
class FileNames:
    """Paths to the source data files."""
    airport = "data/airport-codes_csv.csv"
    immigration_sample = "data/immigration_data_sample.csv"
    city_demographics = "data/us-cities-demographics.csv"
    temperature = "data/GlobalLandTemperaturesByCity.csv"
    immigration_sas = "data/immigration/18-83510-I94-Data-2016/*.sas7bdat"
    immigration_sas_labels = "data/I94_SAS_Labels_Descriptions.SAS"

### Step 1: Scope the Project and Gather Data

### Scope

In this project, we outline our objectives and the data sources we plan to utilize. Our primary goal is to perform an Extract, Transform, Load (ETL) process on data collected from various sources. The project encompasses the following key steps:

1. **Data Gathering:** We will acquire data from four distinct sources, which will serve as the foundation for our analytical work.

2. **Data Staging:** After obtaining the raw data, we will load it into staging dataframes for initial examination and preparation.

3. **Data Cleaning:** We will apply data cleaning techniques to ensure the quality and consistency of our datasets.

4. **ETL Processing:** Leveraging a Spark cluster, we will execute the ETL process, including data transformation and integration.

5. **Star Schema Creation:** To enable efficient data analytics, correlation, and ad-hoc reporting, we will construct Fact and Dimension tables in a star schema configuration.

Our final deliverable will be a well-structured star schema that facilitates seamless data analysis and reporting for relevant stakeholders.

### Data Description

To accomplish our project objectives, we will utilize the following datasets:

1. **i94 Immigration Sample Data:**
   - **Source:** This dataset contains sample immigration records and is sourced from the US National Travel and Tourism Office.
   - **Role:** It will serve as our Fact table in the star schema.
   - **Access:** The data can be found at [i94 Immigration Sample Data](https://travel.trade.gov/research/reports/i94/historical/2016.html).

2. **World Temperature Data (world_temperature):**
   - **Source:** This dataset comprises historical temperature data from various cities, spanning from the 1700s to 2013.
   - **Usage:** Because the records end in 2013, we will use this dataset to approximate temperature conditions for the 2016 immigration records.
   - **Access:** The data is available at [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

3. **US City Demographic Data:**
   - **Source:** This dataset provides demographic information about US cities, encompassing population statistics, race distribution, household size, and gender demographics.
   - **Role:** It will be used as one of our Dimension tables.
   - **Access:** The data can be accessed at [US City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

4. **Airport Codes:**
   - **Source:** This dataset contains airport codes associated with cities.
   - **Usage:** We will use it for reference and as part of our Dimension tables.
   - **Access:** The data is accessible at [Airport Codes](https://datahub.io/core/airport-codes#data).

### Tools Used

To accomplish the tasks outlined in this project, we will primarily utilize the following tools and technologies:

- Apache Spark: We will employ Spark to perform the ETL process efficiently, especially when dealing with large-scale data.
- Parquet Files: Data will be stored in Parquet format, which is well-suited for columnar storage and works seamlessly with Spark.
- SQL and DataFrame operations: These techniques will be applied for data cleaning, transformation, and integration.

With these tools and datasets in place, we aim to create a robust star schema that empowers data analysis and reporting endeavors.

### World Temperature Data

In [5]:
temperature_df = pd.read_csv(FileNames.temperature)
print("Selecting US data")
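# .copy() avoids a SettingWithCopyWarning when columns are added to the filtered frame later
temperature_df = temperature_df.loc[temperature_df['Country'] == "United States"].copy()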
print(f"Number of rows = {temperature_df.shape[0]}")
temperature_df.head()

Selecting US data
Number of rows = 687289


| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W |


### I94 Immigration Data

In [6]:
immigration_df = pd.read_csv(FileNames.immigration_sample)
print(f"Number of rows = {immigration_df.shape[0]}")
immigration_df.head()

Number of rows = 1000


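| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | NaN | M | 1955.0 | 7202016 | F | NaN | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | NaN | M | 1990.0 | 10222016 | M | NaN | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | NaN | M | 1940.0 | 7052016 | M | NaN | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | NaN | M | 1991.0 | 10272016 | M | NaN | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | NaN | M | 1997.0 | 7042016 | F | NaN | NaN | 42322570000.0 | LAND | WT |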


### Airport Code Table

In [7]:
airport_df = pd.read_csv(FileNames.airport)
print(f"Number of rows = {airport_df.shape[0]}")
airport_df.head()

Number of rows = 57421


| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | NaN | US | US-PA | Bensalem | 00A | NaN | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | NaN | US | US-KS | Leoti | 00AA | NaN | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | NaN | US | US-AK | Anchor Point | 00AK | NaN | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | NaN | US | US-AL | Harvest | 00AL | NaN | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | NaN | US | US-AR | Newport | NaN | NaN | NaN | -91.254898, 35.6087 |


### U.S. City Demographic Data

In [8]:
city_df = pd.read_csv(FileNames.city_demographics, delimiter=';')
print(f"Number of rows = {city_df.shape[0]}")
city_df.head()

Number of rows = 2891


| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
| 1 | Peoria | Illinois | 33.1 | 56229.0 | 62432.0 | 118661 | 6634.0 | 7517.0 | 2.4 | IL | American Indian and Alaska Native | 1343 |
| 2 | O'Fallon | Missouri | 36.0 | 41762.0 | 43270.0 | 85032 | 5783.0 | 3269.0 | 2.77 | MO | Hispanic or Latino | 2583 |
| 3 | Hampton | Virginia | 35.5 | 66214.0 | 70240.0 | 136454 | 19638.0 | 6204.0 | 2.48 | VA | Black or African-American | 70303 |
| 4 | Lakewood | Colorado | 37.7 | 76013.0 | 76576.0 | 152589 | 9988.0 | 14169.0 | 2.29 | CO | Hispanic or Latino | 33630 |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### Exploring World Temperature Data

In [9]:
print(f"Number of duplicate rows = {num_duplicate_rows(temperature_df)}")
print(f"Date column type: {temperature_df['dt'].dtype}")

# Check for missing values
temperature_missing_values = temperature_df.isnull().sum()/temperature_df.shape[0]*100
print("Missing Values (%):")
print(temperature_missing_values)

Number of duplicate rows = 0
Date column type: object
Missing Values (%):
dt                               0.000000
AverageTemperature               3.748787
AverageTemperatureUncertainty    3.748787
City                             0.000000
Country                          0.000000
Latitude                         0.000000
Longitude                        0.000000
dtype: float64


In [10]:
print("converting 'dt' column type.")
temperature_df['dt'] = pd.to_datetime(temperature_df['dt'])
print(f"New date column type: {temperature_df['dt'].dtype}")
temperature_df['year'] = temperature_df['dt'].dt.year
temperature_df['month'] = temperature_df['dt'].dt.month
print(f"Date range is {temperature_df.year.min()}-{temperature_df.year.max()}")
temperature_df[['dt', 'year', 'month']].head(5)

converting 'dt' column type.
New date column type: datetime64[ns]
Date range is 1743-2013


| | dt | year | month |
|---|---|---|---|
| 47555 | 1820-01-01 | 1820 | 1 |
| 47556 | 1820-02-01 | 1820 | 2 |
| 47557 | 1820-03-01 | 1820 | 3 |
| 47558 | 1820-04-01 | 1820 | 4 |
| 47559 | 1820-05-01 | 1820 | 5 |


### Exploring I94 Immigration Data

In [11]:
fact_immigration_columns_to_select = ['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']
fact_immigration_new_column_names = ['uid', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration_df = immigration_df[fact_immigration_columns_to_select]
fact_immigration_df.columns = fact_immigration_new_column_names

# Check for missing values
fact_immigration_missing_values = fact_immigration_df.isnull().sum()/fact_immigration_df.shape[0]*100
print("Missing Values (%):")
print(fact_immigration_missing_values)

fact_immigration_df.head()

Missing Values (%):
uid               0.0
year              0.0
month             0.0
city_code         0.0
state_code        5.9
arrive_date       0.0
departure_date    4.9
mode              0.0
visa              0.0
dtype: float64


| | uid | year | month | city_code | state_code | arrive_date | departure_date | mode | visa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |


In [12]:
dim_personal_columns_to_select = ['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']
dim_personal_new_column_names = ['uid', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_personal_df = immigration_df[dim_personal_columns_to_select]
dim_personal_df.columns = dim_personal_new_column_names

# Check for missing values
personal_immigration_missing_values = dim_personal_df.isnull().sum()/dim_personal_df.shape[0] * 100
print("Missing Values (%):")
print(personal_immigration_missing_values)

dim_personal_df.head()


Missing Values (%):
uid                   0.0
citizen_country       0.0
residence_country     0.0
birth_year            0.0
gender               14.1
ins_num              96.5
dtype: float64


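| | uid | citizen_country | residence_country | birth_year | gender | ins_num |
|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F | NaN |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M | NaN |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M | NaN |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M | NaN |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F | NaN |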


Although "ins_num" is missing for 96.5% of the rows in the sample set, it is an important identifier, so I have decided to keep it.

In [13]:
dim_airline_columns_to_select = ['cicid', 'airline', 'admnum', 'fltno', 'visatype']
dim_airline_new_column_names = ['uid', 'airline', 'admission_num', 'flight_number', 'visa_type']
dim_airline_df = immigration_df[dim_airline_columns_to_select]
dim_airline_df.columns = dim_airline_new_column_names

# Check for missing values
airline_immigration_missing_values = dim_airline_df.isnull().sum()
print("Missing Values:")
print(airline_immigration_missing_values)

dim_airline_df.head()

Missing Values:
uid               0
airline          33
admission_num     0
flight_number     8
visa_type         0
dtype: int64


| | uid | airline | admission_num | flight_number | visa_type |
|---|---|---|---|---|---|
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 | NaN | 42322570000.0 | LAND | WT |


### Exploring Airport Code Table

In [14]:
print(f"Number of duplicate rows = {num_duplicate_rows(airport_df)}")
# Check for missing values
airport_missing_values = airport_df.isnull().sum()/airport_df.shape[0] * 100
print("Missing Values (%):")
print(airport_missing_values)

Number of duplicate rows = 0
Missing Values (%):
ident            0.000000
type             0.000000
name             0.000000
elevation_ft    13.606520
continent       49.534143
iso_country      0.428415
iso_region       0.000000
municipality    10.264537
gps_code        27.620557
iata_code       83.934449
local_code      47.702060
coordinates      0.000000
dtype: float64


Based on the missing-value percentages, I have decided to ignore "iata_code", which is missing for 84% of rows, and "continent", which is missing for 49.5% of rows and can be determined from other columns.
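The decision above can be sketched in pandas as follows. This is only an illustration: the helper name `missing_percent` and the four-row frame are made up for the example and stand in for `airport_df`; they are not taken from `etl.py`.

```python
import pandas as pd


def missing_percent(df: pd.DataFrame) -> pd.Series:
    """Return the percentage of missing values per column."""
    return df.isnull().mean() * 100


# Tiny stand-in for airport_df (values are invented for illustration).
airports = pd.DataFrame({
    "ident": ["00A", "00AA", "00AK", "00AL"],
    "continent": [None, None, "EU", "OC"],
    "iata_code": [None, None, None, "XYZ"],
    "municipality": ["Bensalem", "Leoti", "Anchor Point", "Harvest"],
})

pct = missing_percent(airports)

# Drop the two columns named in the text; errors="ignore" keeps the call
# safe if a column has already been removed.
cleaned = airports.drop(columns=["iata_code", "continent"], errors="ignore")

print(pct.to_dict())
print(list(cleaned.columns))
```

Dropping by explicit column name, rather than by a percentage cutoff, matches the reasoning in the text: "local_code" is also sparse (47.7% missing) but is kept.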

### Exploring U.S. City Demographic Data

In [15]:
print(f"Number of duplicate rows = {num_duplicate_rows(city_df)}")
# Check for missing values
city_missing_values = city_df.isnull().sum()
print("Missing Values:")
print(city_missing_values)


Number of duplicate rows = 0
Missing Values:
City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model for this project is designed to support analytical queries related to immigration data, temperature data, demographic data, and airport data. The data model consists of several dimension tables and a fact table, linked by foreign keys. The model provides a structured and efficient way to store and analyze the data.
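To make the foreign-key linkage concrete, the sketch below joins a small fact table to one reference table and counts arrivals per state. It uses an in-memory SQLite database only so that the example is self-contained; the project itself uses Spark and Parquet. The column names `code` and `name` in `state_code`, and the third fact row, are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE state_code (
        code TEXT PRIMARY KEY,
        name TEXT
    );
    CREATE TABLE fact_immigration (
        uid        INTEGER PRIMARY KEY,
        state_code TEXT REFERENCES state_code(code),
        month      INTEGER
    );
""")

# Reference (dimension) rows: hypothetical code-to-name mappings.
conn.executemany(
    "INSERT INTO state_code VALUES (?, ?)",
    [("HI", "HAWAII"), ("TX", "TEXAS")],
)

# Fact rows: the first two uids come from the sample data, the third is invented.
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?, ?)",
    [(4084316, "HI", 4), (4422636, "TX", 4), (1, "TX", 4)],
)

# Typical star-schema query: join the fact table to a dimension and aggregate.
rows = conn.execute("""
    SELECT s.name, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN state_code AS s ON s.code = f.state_code
    GROUP BY s.name
    ORDER BY s.name
""").fetchall()

print(rows)
```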

##### Dimension Tables:


1. **dim_immigration_personal**: This table stores information about individual immigrants, including their unique identifier (uid), citizenship country, residence country, birth year, gender, and INS number.

2. **dim_immigration_airline**: This table contains data about the airlines used by immigrants, with fields such as uid (unique identifier), airline code, admission number, flight number, and visa type.

3. **dim_temperature**: This table stores temperature data, including the date of measurement, average temperature, uncertainty in temperature measurement, city, and country.

4. **dim_demography**: This table includes demographic information for cities, including city name, state name, male population, female population, number of veterans, foreign-born population, and race.

5. **dim_demography_stats**: This table stores additional demographic statistics at the city and state level, including median age and average household size.

6. **dim_airport**: This table stores information about airports, including airport identifier (ident), type, name, elevation in feet, ISO country code, ISO region code, municipality, GPS code, IATA code, local code, and coordinates.

7. **country_code**: This table contains country codes and their corresponding country names, which are used for reference and mapping.

8. **city_code**: This table contains city codes and their corresponding city names, used for reference and mapping.

9. **state_code**: This table contains state codes and their corresponding state names, used for reference and mapping.

##### Fact Table:

1. **fact_immigration**: This fact table records immigration events, with foreign keys linking to various dimensions. It includes a unique identifier (uid), year, month, city code, state code, arrival date, departure date, mode of transportation, and visa type.
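The arrival and departure dates in the fact table are stored as SAS date numbers, which count days since 1 January 1960. A minimal sketch of the conversion to calendar dates is shown below; the function name `sas_to_date` is hypothetical and is not taken from `etl.py`.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(days: Optional[float]) -> Optional[date]:
    """Convert a SAS date number (days since 1960-01-01) to a calendar date."""
    # Missing departure dates arrive as None or NaN (NaN is not equal to itself).
    if days is None or days != days:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20566.0))  # arrive_date of the first sample row: 2016-04-22
print(sas_to_date(20573.0))  # departure_date of the first sample row: 2016-04-29
```

The converted arrival date falls in April 2016, which agrees with the `year` and `month` values of the same sample row.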

#### 3.2 Mapping Out Data Pipelines

The data pipeline process involves extracting data from various sources, transforming it to match the data model, and loading it into the respective dimension and fact tables. Here are the steps necessary to pipeline the data into the chosen data model:

1.    **Process Immigration Data:**
      - Read the immigration data from the source.
      - Extract relevant columns and perform data wrangling.
      - Load the transformed data into the fact_immigration fact table.
      - Load relevant columns into the dim_immigration_personal and dim_immigration_airline dimension tables.

2.    **Process Label Descriptions:**
      - Read label descriptions to get mappings for country codes, city codes, and state codes.
      - Create dimension tables country_code, city_code, and state_code and load the mappings.

3.    **Process Temperature Data:**
      - Read temperature data from the source.
      - Filter data for the United States.
      - Extract relevant columns and perform data wrangling.
      - Load the transformed data into the dim_temperature dimension table.

4.    **Process Demography Data:**
      - Read demography data from the source.
      - Extract relevant columns and perform data wrangling.
      - Load the transformed data into the dim_demography dimension table.
      - Create the dim_demography_stats dimension table with additional statistics.

5.    **Process Airport Data:**
      - Read airport data from the source.
      - Extract relevant columns and perform data wrangling.
      - Load the transformed data into the dim_airport dimension table.

6.    **Data Quality Checks:**
      - Implement data quality checks, such as checking for missing values, duplicate data, and referential integrity between dimension and fact tables.
      - Log any issues or discrepancies found during data quality checks.

7.    **Data Storage:**
      - Store the processed and transformed data in Parquet format, partitioned by relevant columns for efficient querying.

8.    **Schedule and Automation:**
      - Schedule the ETL pipeline to run periodically to keep the data up to date.
      - Implement monitoring and alerting for pipeline failures or data quality issues.

By following these steps, the data pipeline will ensure that the data is cleaned, transformed, and structured according to the defined conceptual data model, making it ready for analytical queries and reporting.
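As an illustration of the label-description step, the sketch below extracts code-to-label mappings from text. It assumes the labels file contains lines of the form `'CODE'='LABEL'` or `123 = 'LABEL'`; the sample text is invented to mimic that layout, and the function name `parse_labels` is hypothetical rather than the one used in `etl.py`.

```python
import re
from typing import Dict

# Matches lines such as  'HI'='HAWAII'  or  582 =  'MEXICO'  (assumed layout).
PAIR = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_labels(text: str) -> Dict[str, str]:
    """Collect code -> label pairs from lines that match the assumed layout."""
    mapping: Dict[str, str] = {}
    for line in text.splitlines():
        match = PAIR.match(line)
        if match:
            # Labels may be padded with trailing spaces, so strip them.
            mapping[match.group(1)] = match.group(2).strip()
    return mapping


# Invented sample that mimics the assumed file layout.
sample = """
value i94addrl
    'AL'='ALABAMA'
    'HI'='HAWAII   '
    582 =  'MEXICO'
"""

print(parse_labels(sample))
```

Lines that do not contain a quoted label, such as the `value` header, are skipped, so the same function could be applied to each section of the file in turn.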

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Refer to the **etl.py** file.


#### 4.2 Data Quality Checks

To ensure the data pipeline runs as expected and that the data quality is maintained, we will perform a series of data quality checks. These checks are essential for validating the integrity and correctness of the data. The checks we will perform include:

1.    **Integrity Constraints:** We will enforce integrity constraints on the relational database to ensure the quality of the data. These constraints include:
      - **Primary Key Constraints:** Ensure that primary keys in dimension and fact tables are unique and not null. This guarantees the uniqueness of records.
      - **Foreign Key Constraints:** Validate that foreign key relationships between dimension and fact tables exist and are consistent. This ensures referential integrity.

2.    **Data Type Checks:** We will verify that the data types of columns in the database match the expected data types. This ensures that the data is stored correctly and can be queried without data type-related issues.

3.    **Null Value Checks:** We will check for null values in critical columns, such as primary key columns and columns that should not contain null values. If null values are found in unexpected places, it may indicate data quality issues.

4.    **Unique Key Checks:** Ensure that columns with unique constraints, such as unique identifiers, do not have duplicate values. This check helps maintain data integrity.

5.    **Source/Count Checks:** We will perform source and count checks to ensure completeness and data consistency:
      - **Source Checks:** Verify that the data extracted from source systems matches the expected format and schema. Any unexpected changes in source data should trigger an alert.
      - **Count Checks:** Count the number of records in the source data and compare it with the number of records loaded into the data warehouse. This check ensures that no records are lost during the ETL process.

6.    **Data Validation Queries:** Implement SQL queries to validate specific data quality aspects, such as checking for missing values, duplicate records, and outliers in the data. Any issues discovered will be logged for further investigation.

7.    **Log and Alerting:** Set up a logging and alerting system to capture and notify the data engineering team of any data quality issues or pipeline failures. This will allow for prompt remediation of problems as they arise.

By performing these data quality checks, we can ensure that the pipeline runs smoothly, data integrity is maintained, and any issues are identified and addressed promptly. This proactive approach to data quality helps ensure that the data used for analytics and reporting is accurate, reliable, and trustworthy.
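Several of these checks can be expressed as plain assertions. The sketch below is framework-free so that it runs on its own; in the pipeline the same logic would be applied to Spark DataFrames. All function names are hypothetical and the rows are a small stand-in for the fact table.

```python
from typing import Dict, List, Set

Row = Dict[str, object]


def check_not_null(rows: List[Row], column: str) -> None:
    """Null value check: the column must be present and non-null in every row."""
    missing = sum(1 for row in rows if row.get(column) is None)
    assert missing == 0, f"{missing} null value(s) in {column}"


def check_unique(rows: List[Row], column: str) -> None:
    """Unique key check: no value may appear more than once."""
    values = [row[column] for row in rows]
    assert len(values) == len(set(values)), f"duplicate values in {column}"


def check_count(source_count: int, loaded: List[Row]) -> None:
    """Count check: every source record must have been loaded."""
    assert source_count == len(loaded), (
        f"expected {source_count} rows, loaded {len(loaded)}"
    )


def check_foreign_keys(rows: List[Row], column: str, valid: Set[str]) -> None:
    """Referential integrity: every non-null key must exist in the dimension."""
    orphans = {row[column] for row in rows if row[column] is not None} - valid
    assert not orphans, f"unknown keys in {column}: {sorted(orphans)}"


fact = [
    {"uid": 4084316, "state_code": "HI"},
    {"uid": 4422636, "state_code": "TX"},
]
state_codes = {"HI", "TX"}

check_not_null(fact, "uid")
check_unique(fact, "uid")
check_count(2, fact)
check_foreign_keys(fact, "state_code", state_codes)
print("all checks passed")
```

A failed assertion stops the run with a message naming the offending column, which is the behaviour wanted when a check guards a load step.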

In [16]:
immigration_spark = spark.createDataFrame(immigration_df, schema=None)

# Show the inferred schema
immigration_spark.printSchema()


root
 |-- Unnamed: 0: long (nullable = true)
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: long (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: double (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: double (nullable = true)
 |-- airline: string (nullable = 

In [17]:
# run immigration etl
etl.process_immigration_data(spark, src='', dst='tables/', use_sample_data=True)

                                                                                

In [18]:
temperature_schema = StructType([
    StructField("dt", StringType(), True),
    StructField("AverageTemperature", FloatType(), True),
    StructField("AverageTemperatureUncertainty", FloatType(), True),
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Latitude", StringType(), True),
    StructField("Longitude", StringType(), True),
])

temperature_spark = spark.createDataFrame(temperature_df, schema=temperature_schema)

# Show the schema applied above (explicit, not inferred)
temperature_spark.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: float (nullable = true)
 |-- AverageTemperatureUncertainty: float (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [19]:
# run temperature etl process
etl.process_temperature_data(spark, src='', dst='tables/')


In [20]:
# create schema
city_schema = StructType([
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Median Age", FloatType(), True),
    StructField("Male Population", FloatType(), True),
    StructField("Female Population", FloatType(), True),
    StructField("Total Population", IntegerType(), True),
    StructField("Number of Veterans", FloatType(), True),
    StructField("Foreign-born", FloatType(), True),
    StructField("Average Household Size", FloatType(), True),
    StructField("State Code", StringType(), True),
    StructField("Race", StringType(), True),
    StructField("Count", IntegerType(), True),
])

city_spark = spark.createDataFrame(city_df, schema=city_schema)

city_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: float (nullable = true)
 |-- Male Population: float (nullable = true)
 |-- Female Population: float (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: float (nullable = true)
 |-- Foreign-born: float (nullable = true)
 |-- Average Household Size: float (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [22]:
# run city demographics etl process
etl.process_demography_data(spark, src='', dst='tables/')

In [21]:
airport_schema = StructType([
    StructField("ident", StringType(), True),
    StructField("type", StringType(), True),
    StructField("name", StringType(), True),
    StructField("elevation_ft", FloatType(), True),
    StructField("continent", StringType(), True),
    StructField("iso_country", StringType(), True),
    StructField("iso_region", StringType(), True),
    StructField("municipality", StringType(), True),
    StructField("gps_code", StringType(), True),
    StructField("iata_code", StringType(), True),
    StructField("local_code", StringType(), True),
    StructField("coordinates", StringType(), True),
])

airport_spark = spark.createDataFrame(airport_df, schema=airport_schema)

airport_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: float (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [23]:
# run airport etl process
etl.process_airport_data(spark, src='', dst='tables/')

#### 4.3 Data dictionary

See **data_dictionary/data_dictionary.txt**

/////////////////////
// Data Dictionary //
/////////////////////

-------
fact_immigration
-------
- cicid -> uid: Unique identifier for each immigration record
- i94yr -> year: 4-digit year of the immigration record
- i94mon -> month: Numeric month of the immigration record
- i94port -> city_code: Port of entry for immigration
- i94addr -> state_code: State of arrival for immigration
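- arrdate -> arrive_date: Arrival date as a SAS date number (days since 1960-01-01)
- depdate -> departure_date: Departure date as a SAS date number (days since 1960-01-01)
- i94mode -> mode: Mode of transportation
- i94visa -> visa: Visa category code (1 = business, 2 = pleasure, 3 = student)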

-------
dim_immigration_personal
-------
- cicid -> uid: Unique identifier for each immigration personal record
- i94cit -> citizen_country: Citizen country code
- i94res -> residence_country: Residence country code
- biryear -> birth_year: Year of birth
- gender -> gender: Gender of the immigrant
- insnum -> ins_num: INS number

-------
dim_immigration_airline
-------
- cicid -> uid: Unique identifier for each immigration airline record
- airline -> airline: Airline code
- admnum -> admission_num: Admission number
- fltno -> flight_number: Flight number
- visatype -> visa_type: Visa type

-------
dim_temperature
-------
- dt -> date: Date of temperature measurement
- AverageTemperature -> avg_temperature: Average temperature
- AverageTemperatureUncertainty -> avg_temperature_uncertainty: Uncertainty in average temperature
- City -> city: City where temperature was measured
- Country -> country: Country where temperature was measured

-------
dim_demography
-------
- City -> city: City name
- State -> state: State name
- Male Population -> male_population: Male population count
- Female Population -> female_population: Female population count
- Number of Veterans -> num_veterans: Number of veterans
- Foreign-born -> foreign_born: Count of foreign-born residents
- Race -> race: Race

-------
dim_demography_stats
-------
- City -> city: City name
- State -> state: State name
- Median Age -> median_age: Median age of residents
- Average Household Size -> avg_household_size: Average household size

-------
dim_airport
-------
- ident -> ident: Airport identifier
- type -> type: Airport type
- name -> airport_name: Airport name
- elevation_ft -> elevation_ft: Elevation in feet
- iso_country -> iso_country: ISO country code
- iso_region -> iso_region: ISO region code
- municipality -> municipality: Municipality
- gps_code -> gps_code: GPS code
- iata_code -> iata_code: IATA code
- local_code -> local_code: Local code
- coordinates -> coordinates: Coordinates



### Step 5: Complete Project Write Up
The choice of tools and technologies for this project was based on several factors:

1.    **Apache Spark:** Spark was chosen as the primary data processing framework due to its ability to handle large-scale data processing efficiently. It provides distributed processing capabilities, which are essential for handling large datasets, and it allows for seamless integration with various data sources.

2.    **Python:** Python was chosen as the primary programming language for its ease of use, rich ecosystem of data manipulation libraries (e.g., Pandas), and its compatibility with Spark.

3.    **Amazon Web Services (AWS):** AWS was selected for cloud infrastructure and storage. S3 buckets were used to store both the source and destination data, providing scalability, reliability, and cost-efficiency.

4.    **Data Quality Checks:** Assertions were used for data quality checks within the ETL process to ensure that the data meets predefined criteria and quality standards. These checks help maintain data integrity and reliability.

##### Data Update Frequency:

The frequency at which the data should be updated depends on the specific use case and requirements. In this project, the suggested data update frequency is as follows:

-    **Immigration Data:** Given that immigration data typically arrives in batches, the data can be updated periodically, such as weekly or monthly, depending on the frequency of new immigration records. Frequent updates may not be necessary, as long as the data is refreshed in a timely manner to reflect the latest immigration records.

-    **Label Descriptions:** These reference tables, such as country codes, city codes, and state codes, can be updated less frequently, perhaps on a monthly or quarterly basis, as they are less likely to change frequently.

-    **Temperature Data:** Temperature data can be updated on a daily or weekly basis, depending on the availability and frequency of new temperature measurements. Daily updates may be appropriate if real-time temperature data is required.

-    **Demographic Data:** Demographic data, being relatively stable, can be updated on a quarterly or annual basis, as major demographic changes are less frequent.

##### Approach Under Different Scenarios:

1.    **Data Increased by 100x:** In this scenario, the ETL process may experience performance challenges due to increased data volume. To address this, we can consider using a more powerful Spark cluster with additional worker nodes and resources. Additionally, data partitioning and optimization techniques can be employed to enhance processing efficiency.

2.    **Daily Dashboard Updates by 7am:** To meet the daily dashboard update requirement, we should implement a scheduled ETL pipeline that runs during off-peak hours to avoid impacting system performance during the day. We can use Apache Airflow or similar tools to schedule and orchestrate the ETL process, ensuring that data is ready by 7am daily.

3.    **Database Access by 100+ People:** For concurrent access by a large number of users, we can consider optimizing the database for read-heavy workloads. This may involve data warehousing solutions like Amazon Redshift, which is designed for high-performance querying and can handle a large number of concurrent users efficiently. Additionally, implementing access controls and user management is crucial to ensure data security and integrity.

