# US I94 Immigration Data ETL Pipeline
### Data Engineering Capstone Project

#### Project Summary
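This project builds an ETL pipeline that extracts the 2016 US I94 immigration data along with supplementary airport and country code datasets, cleans and stages them with Pandas and PySpark, and loads them into a star schema of Parquet tables (one fact table and four dimension tables) for analytics.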

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
!pip install pycountry
import pandas as pd
import pycountry
from datetime import datetime, timedelta
import os, glob
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from utility import get_files, get_name, map_country, is_empty, has_null, data_quality_check, data_type, metrics



In [2]:
root = ["../../", "/data/18-83510-I94-Data-2016/"]

# Source Data Paths
source_immigration_data = "../../data/18-83510-I94-Data-2016/"
source_cities_dem = 'source_data/us-cities-demographics.csv'
source_country = 'source_data/countries.csv'
source_country_mapping = 'source_data/country-mapping.txt'
source_airport = 'source_data/airport.txt'

# Staging Data Paths
staging_immigration_path = 'staging_files/immigration_data/'
staging_airport_path = 'staging_files/airport_data/'
staging_cities_dem_path = 'staging_files/us-cities-demograpy/'
staging_country_path = 'staging_files/country_data/'

# Output Parquet Data Paths
airport_output_path = 'tables/airport/'
country_output_path = 'tables/country/'
passenger_output_path = 'tables/passenger/'
time_output_path = 'tables/time/'
entry_output_path = 'tables/entry/'

In [3]:
def create_spark():
    """
     Create or retrieve a Spark Session
    """

    spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()
    return spark

spark = create_spark()

In [4]:
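# User defined spark functions
# I94 country code -> country name lookup built from the I94 country mapping file
countries = map_country(source_country_mapping)

@udf(TimestampType())
def get_timestamp(arrdate):
    # SAS dates count days since 1960-01-01; keep missing dates as null
    if arrdate is None:
        return None
    return datetime(1960, 1, 1) + timedelta(days=int(arrdate))

@udf(StringType())
def get_country(code):
    # i94cit is read as a double (e.g. 245.0), so cast it before the dict lookup
    if code is None:
        return "Others"
    code = int(code)
    return countries[code] if code in countries else "Others"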
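
The `get_timestamp` UDF relies on SAS dates counting days from 1960-01-01. A minimal plain-Python sketch of the same conversion makes the arithmetic easy to check outside Spark; the helper name `sas_date_to_datetime` is hypothetical, and it assumes missing values arrive as `None`:

```python
from datetime import datetime, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = datetime(1960, 1, 1)


def sas_date_to_datetime(sas_days: Optional[float]) -> Optional[datetime]:
    """Convert a SAS day count (e.g. the I94 arrdate column) to a datetime.

    Returns None for missing values instead of raising, so it is safe to
    reuse inside a UDF where some rows have no arrival date.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# arrdate 20574.0 from the staged sample row corresponds to 2016-04-30
print(sas_date_to_datetime(20574.0))  # 2016-04-30 00:00:00
```

As an alternative, Spark SQL's built-in `date_add('1960-01-01', cast(arrdate as int))` could avoid the Python UDF round trip, at the cost of returning a date rather than a timestamp.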

### Step 1: Project Scope, Data Descriptions and Sources

#### Scope 

- The aim of this project is to build an ETL pipeline that extracts data from the US I94 immigration dataset and from supplementary airport and country code datasets.
    
- The pipeline cleans and transforms the extracted datasets, then loads them back into file storage as a set of tables in Parquet format.

- The end result is a star schema model with one fact table and four dimension tables, suitable for analytics use cases.

- Further analytics can then be carried out on these tables.

- Tools used: Python, Pandas, PySpark, pycountry

#### Description of Data and Sources

* ../../data/18-83510-I94-Data-2016/: US I94 immigration data from 2016 (Jan-Dec).
    * Data Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * The I94_SAS_Labels_Descriptions.SAS file contains the data dictionary for the dataset.
    * The dataset is split into one file per month of 2016 (Jan-Dec)
    * Each file contains about 3M rows
    * The data has 28 columns containing information about airports, airlines, countries, cities, time, flight numbers, etc.
    
* source_data/airport.txt: Airport codes, airport names and location codes extracted from the I94_SAS_Labels_Descriptions.SAS file
    * Data Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: Contains the code, name and location code (e.g. the state abbreviation AK) of each airport
    * The data has 660 rows and 3 columns: airport code, airport name and location code.

* source_data/countries.csv: ISO-3166 Country and Dependent Territories Lists with UN Regional Codes
    * Source: https://github.com/lukes/ISO-3166-Countries-with-Regional-Codes
    * Description: Contains alpha and numeric country codes, plus the UN Statistics Division's regional and sub-regional codes for each country
    * The data has 249 rows and 11 columns representing several types of country and regional codes

* source_data/country-mapping.txt: I94 Country Code extracted from the I94_SAS_Labels_Descriptions.SAS file
    * Source: https://travel.trade.gov/research/reports/i94/historical/2016.html
    * Description: Contains I94 country codes and country names
    * The data has 236 rows and 2 columns: country code and country name


* NOTE: The two country data sources were combined.
    * The I94 data only contains country codes and names; combining it with the ISO-3166 country data provides the alpha_2, alpha_3, official name and regional information.
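
The combination step can be sketched in plain Python by matching on a normalized country name. This is an illustrative assumption, not the project's `get_name` helper (which is not shown): the sample rows are hypothetical excerpts, and an unmatched name such as the made-up `ATLANTIS` comes back as `None`, the same situation that leaves `alpha_2` null for some rows of the country table.

```python
import csv
import io
from typing import Dict, Optional

# Hypothetical excerpt shaped like the ISO-3166 countries.csv (subset of its columns)
ISO_CSV = """name,alpha-2,alpha-3,country-code,region
Albania,AL,ALB,008,Europe
Angola,AO,AGO,024,Africa
"""

# Hypothetical excerpt of the I94 code -> name mapping (names assumed upper case)
I94_COUNTRIES = {101: "ALBANIA", 324: "ANGOLA", 999: "ATLANTIS"}


def normalize(name: str) -> str:
    """Case-fold and collapse white space so names from both sources compare equal."""
    return " ".join(name.split()).casefold()


def combine_country_sources(i94: Dict[int, str], iso_csv: str) -> Dict[int, Optional[dict]]:
    """Attach the ISO-3166 row for each I94 country code, or None when no name matches."""
    iso_by_name = {normalize(row["name"]): row for row in csv.DictReader(io.StringIO(iso_csv))}
    return {code: iso_by_name.get(normalize(name)) for code, name in i94.items()}


combined = combine_country_sources(I94_COUNTRIES, ISO_CSV)
print(combined[101]["alpha-3"], combined[101]["region"])  # ALB Europe
print(combined[999])  # None
```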

### Step 2: Data Exploration

#### Data Quality Issues:
    
- US I94 Immigration data: 
    * Missing and null values
    * Mixed data types in some columns
    
- US I94 Airport data: 
    * Inconsistent data types in columns
    * Values contain punctuation marks and extra white space
    
- US I94 Country code data: 
    * Values contain punctuation marks and extra white space
    * Invalid and redundant values

- ISO-3166 Country and Dependent Territories Data:
    * Missing and null values
    * Country codes not consistent with the US I94 data values

#### Steps in Cleaning Data

- US I94 Immigration data: 
    * Replace missing and null values with 'NA' in string columns and 0.0 in double columns
    * Convert columns to suitable data types based on their values
    
- US I94 Airport data: 
    * Convert columns to suitable data types based on their values
    * Remove punctuation marks and extra white space
    
- US I94 Country code data: 
    * Convert columns to suitable data types based on their values
    * Remove invalid and redundant values

- ISO-3166 Country and Dependent Territories Data:
    * Replace missing and null values with 'others' (string) and 0.0 (double)
    * Extract the values consistent with the US I94 data
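
Deriving each fill value from the column's type keeps numeric columns from receiving a string placeholder: Spark casts the replacement value to the column's type, so an `'NA'` default for a double column such as `i94bir` ends up as null. A minimal sketch, assuming the input is the list of `(name, type)` pairs returned by pyspark's `DataFrame.dtypes`; `build_fill_defaults` is a hypothetical helper whose result could be passed to `df.na.fill(...)`:

```python
from typing import Dict, List, Tuple, Union

# Spark type names reported by DataFrame.dtypes that should receive a numeric default
NUMERIC_TYPES = {"double", "float", "int", "bigint", "smallint", "tinyint"}


def build_fill_defaults(dtypes: List[Tuple[str, str]],
                        string_default: str = "NA",
                        numeric_default: float = 0.0) -> Dict[str, Union[str, float]]:
    """Map each column to a fill value that matches its type.

    `dtypes` has the same shape as pyspark's DataFrame.dtypes, e.g. [("i94bir", "double")].
    Columns of other types (timestamps, dates, ...) are left out so they stay null.
    """
    defaults: Dict[str, Union[str, float]] = {}
    for name, dtype in dtypes:
        if dtype == "string":
            defaults[name] = string_default
        elif dtype in NUMERIC_TYPES:
            defaults[name] = numeric_default
    return defaults


sample_dtypes = [("i94bir", "double"), ("gender", "string"), ("arrival", "timestamp")]
print(build_fill_defaults(sample_dtypes))  # {'i94bir': 0.0, 'gender': 'NA'}
```

In practice the result would probably be restricted to the columns listed in `default_missing_values`, since identifier columns such as `cicid` should not be filled.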

In [5]:
# Cleaning US I94 Immigration data

# create the function

def clean_staging_immigration_data(spark, file_path, target):
    """
    Description: This function reads the immigration files from the source file_path, fills missing values,
    and writes the result to the staging directory as parquet files

    Arguments:
        spark: spark session
        file_path: input immigration data file path.
        target: destination directory

    Returns:
        data: spark df
    """

    default_missing_values = {
        'i94mode': 0.0,
        'i94addr': 'NA',
        'depdate': 0.0,
        'i94bir': 0.0,  # age is a double column; an 'NA' string would be cast to null
        'i94visa': 0.0,
        'count': 0.0,
        'dtadfile': 'NA',
        'visapost': 'NA',
        'occup': 'NA',
        'entdepa': 'NA',
        'entdepd': 'NA',
        'entdepu': 'NA',
        'matflag': 'NA',
        'biryear': 0.0,
        'dtaddto': 'NA',
        'gender': 'NA',
        'insnum': 'NA',
        'airline': 'NA',
        'admnum': 0.0,
        'fltno': 'NA',
        'visatype': 'NA'
    }

    df = spark.read.format('com.github.saurfang.sas.spark').load(file_path)
    df = df.na.fill(default_missing_values)
    df.write.mode("overwrite").parquet(target)
    df = spark.read.parquet(target)
    print(f"Cleaning and Staging Completed")
    df.show(5)
    return df

In [55]:
# Clean and stage
directory = get_files(root)[0]
clean_staging_immigration_data(spark, directory, staging_immigration_path)

Cleaning and Staging Completed
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD|   NA|      G|      O|     NA|      M| 1976.0|10292016|     F|    NA|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|  

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]
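
The cell above stages only the first file returned by `get_files`. A hedged sketch of enumerating all monthly files with the standard library; `list_monthly_sas_files` is a hypothetical stand-in for the unshown `get_files` helper and assumes the files use the `.sas7bdat` extension read by the spark-sas7bdat package:

```python
import glob
import os
from typing import List


def list_monthly_sas_files(directory: str) -> List[str]:
    """Return every .sas7bdat file in `directory`, sorted by file name (not by month)."""
    return sorted(glob.glob(os.path.join(directory, "*.sas7bdat")))


# Hypothetical loop that would stage every month instead of only the first file.
# Each month gets its own sub-directory, because writing them all to the same
# path with mode("overwrite") would keep only the last month.
# for path in list_monthly_sas_files(source_immigration_data):
#     target = os.path.join(staging_immigration_path, os.path.basename(path))
#     clean_staging_immigration_data(spark, path, target)
```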

In [58]:
# cleaning US I94 Airport data 
def clean_staging_airport_data(spark, file_path, target):
    """
    Description: This function reads and cleans the airport data at file_path and
    loads it into the staging directory

    Arguments:
        spark: spark session
        file_path: input data file path.
        target: destination directory

    Returns:
        data: spark df
    """
    print("Staging Started")
    name_arr, code_arr, location_arr = list(), list(), list()
    
    # read airport data
    with open(file_path) as f:
        for line in f:
            # skip blank or malformed lines; split only on the first '='
            if '=' not in line:
                continue
            code, name = line.split('=', 1)

            # remove quotes, punctuation and extra white space
            code = code.strip().replace("'", "")
            name = " ".join(name.strip().replace("'", "").split()).split(',')
            code_arr.append(code)

            # 'NAME, LOCATION' -> name and location; otherwise fall back to the code
            if len(name) == 2:
                name1 = name[0]
                loc = name[1].strip()
            else:
                loc = code
                name1 = code

            name_arr.append(name1)
            location_arr.append(loc)

    data = list(zip(code_arr, name_arr, location_arr))

    df_airport = pd.DataFrame(data, columns=['airport_id', 'airport_name', 'airport_location'])
    # strip leftover brackets and quotes; regex=True is needed because pandas >= 2.0 matches literally by default
    for column in ['airport_id', 'airport_name', 'airport_location']:
        df_airport[column] = df_airport[column].astype(str).str.replace(r"\[|\]|'", "", regex=True)

    # save csv
    #df_airport.to_csv(target + 'airport.csv', index=False)

    output = target + 'parquet/'
    print("Spark Processing")
    schema = StructType([
        StructField("airport_id", StringType(), False),
        StructField("airport_name", StringType(), False),
        StructField("airport_location", StringType(), False)
    ])
    df_spark = spark.createDataFrame(df_airport, schema=schema)

    df_spark.write.mode("overwrite").parquet(output)
    df_spark = spark.read.parquet(output)
    df_spark.show(5)
    print(f"Staging Completed ")
    return df_spark

In [59]:
clean_staging_airport_data(spark, source_airport, staging_airport_path)

Staging Started
Spark Processing
+----------+--------------------+----------------+
|airport_id|        airport_name|airport_location|
+----------+--------------------+----------------+
|       ALC|               ALCAN|              AK|
|       ANC|           ANCHORAGE|              AK|
|       BAR|BAKER AAF - BAKER...|              AK|
|       DAC|       DALTONS CACHE|              AK|
|       PIZ|DEW STATION PT LA...|              AK|
+----------+--------------------+----------------+
only showing top 5 rows

Staging Completed 


DataFrame[airport_id: string, airport_name: string, airport_location: string]
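
The per-line parsing inside `clean_staging_airport_data` can be isolated into a small, testable function. The sketch below is hypothetical and makes one deliberate change: it splits on the *last* comma, so a name that itself contains commas is kept whole instead of falling back to the code.

```python
from typing import Tuple


def parse_airport_line(line: str) -> Tuple[str, str, str]:
    """Split one `'CODE' = 'NAME, LOCATION'` label line into (code, name, location).

    Splits on the first '=' and on the last comma; lines without a comma fall
    back to the code for both name and location, as clean_staging_airport_data does.
    """
    raw_code, raw_name = line.split("=", 1)
    code = raw_code.strip().replace("'", "")
    # drop quotes and collapse repeated white space
    name = " ".join(raw_name.replace("'", "").split())
    if "," not in name:
        return code, code, code
    airport_name, location = name.rsplit(",", 1)
    return code, airport_name.strip(), location.strip()


print(parse_airport_line("\t'ALC'\t=\t'ALCAN, AK'\n"))  # ('ALC', 'ALCAN', 'AK')
```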

In [60]:
# Clean, merge and stage ISO-3166 and US I94 country data
def clean_staging_country_data(spark, path1, path2, target):
    """
    Description: This function reads, cleans and merges the country data files and
    loads the result into the staging directory

    Arguments:
        spark: spark session
        path1: file path to countries.csv file downloaded from source https://github.com/lukes/ISO-3166-Countries-with-Regional-Codes
        path2: file path to country-mapping.txt file extracted from I94_SAS_Labels_Descriptions
        target: destination directory

    Returns:
        data: country spark df
    """
    print("Staging Started")
    
    # get ISO-3166 data
    df1 = pd.read_csv(path1)
    
    # get US I94 country data
    countries = map_country(path2)
    
    df2 = pd.DataFrame(list(countries.items()), columns=['code', 'name'])
    df2 = df2[df2.name != 'Kosovo']
    df2 = df2[df2.name != 'Zaire']

    name_1, alpha_2, alpha_3, official_name, official_code = list(), list(), list(), list(), list()
    
    # Extract alpha_2, alpha_3, official_name, official_code from ISO-3166 data
    for _, row in df2.iterrows():
        a2, a3, off_name, off_code = get_name(row['name'])
        alpha_2.append(a2)
        alpha_3.append(a3)
        official_name.append(off_name)
        official_code.append(off_code)
    
    # Add extracted column and row data to US I94 data
    df2['alpha_2'] = alpha_2
    df2['alpha_3'] = alpha_3
    df2['official_name'] = official_name
    df2['official_code'] = official_code

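    # look up the UN region for each alpha_3 code in the ISO-3166 data (returns an array per row)
    df2['region'] = df2['alpha_3'].apply(lambda x: df1[df1['alpha-3'] == x].region.values)
    # strip the array brackets and quotes; regex=True is needed because pandas >= 2.0 matches literally by default
    df2['region'] = df2['region'].astype(str).str.replace(r"\[|\]|'", "", regex=True)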
    
    # save csv
    #df2.to_csv(target + 'csv/country_processed.csv', index=False)

    output = target + 'parquet/'

    print("Spark Processing")
    
    # set data types
    schema = StructType([
        StructField("code", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("alpha_2", StringType(), True),
        StructField("alpha_3", StringType(), True),
        StructField("official_name", StringType(), True),
        StructField("official_code", StringType(), True),
        StructField("region", StringType(), True)

    ])
    df_spark = spark.createDataFrame(df2, schema=schema)
    df_spark.write.mode("overwrite").parquet(output)
    df_spark = spark.read.parquet(output)
    df_spark.show(5)
    print(f"Staging Completed ")
    return df_spark

In [61]:
clean_staging_country_data(spark, source_country, source_country_mapping, staging_country_path)

Staging Started
Spark Processing
+----+-----------+-------+-------+--------------------+-------------+------+
|code|       name|alpha_2|alpha_3|       official_name|official_code|region|
+----+-----------+-------+-------+--------------------+-------------+------+
| 236|Afghanistan|     AF|    AFG|Islamic Republic ...|          004|  Asia|
| 101|    Albania|     AL|    ALB| Republic of Albania|          008|Europe|
| 316|    Algeria|     DZ|    DZA|People's Democrat...|          012|Africa|
| 102|    Andorra|     AD|    AND|Principality of A...|          020|Europe|
| 324|     Angola|     AO|    AGO|  Republic of Angola|          024|Africa|
+----+-----------+-------+-------+--------------------+-------------+------+
only showing top 5 rows

Staging Completed 


DataFrame[code: int, name: string, alpha_2: string, alpha_3: string, official_name: string, official_code: string, region: string]
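
Step 2 lists invalid and redundant values in the I94 country data, but the function above only drops `Kosovo` and `Zaire` explicitly; any other filtering would live in the unshown `map_country` helper. A hypothetical rule-based filter, assuming placeholder entries start with markers such as `INVALID`, `No Country Code` or `Collapsed` (check these against the actual labels file), could look like this:

```python
import re
from typing import Dict

# Assumed markers of placeholder entries in the I94 country list
INVALID_PATTERNS = re.compile(r"^(INVALID|NO COUNTRY CODE|COLLAPSED)", re.IGNORECASE)


def drop_invalid_countries(countries: Dict[int, str]) -> Dict[int, str]:
    """Keep only code -> name pairs whose name does not look like a placeholder."""
    return {code: name for code, name in countries.items()
            if not INVALID_PATTERNS.match(name.strip())}


# Hypothetical sample entries
sample = {101: "ALBANIA", 403: "INVALID: AMERICAN SAMOA", 400: "No Country Code (400)"}
print(drop_invalid_countries(sample))  # {101: 'ALBANIA'}
```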

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

I chose a star schema model for the US I94 immigration data because:

- It is easy to design

- It suits the analytics use case defined earlier

- The star schema model contains the following tables:

    - Fact table:
        - entry table
        
    - Dimension tables:
        - airport table
        - country table
        - passenger table
        - time table
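
To illustrate why the star schema suits analytics, the sketch below uses Python's built-in `sqlite3` module with toy, made-up rows: an analytical question becomes one join from the fact table to a dimension plus an aggregation. It assumes the fact table's `country_id` holds the integer I94 code that keys the country dimension; the same query shape would apply with `spark.sql` over the Parquet tables.

```python
import sqlite3

# In-memory database with toy rows shaped like the country dimension and entry fact table
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE country (country_id INTEGER PRIMARY KEY, country_name TEXT, region TEXT);
    CREATE TABLE entry (entry_id INTEGER PRIMARY KEY, country_id INTEGER, airport_id TEXT);
    INSERT INTO country VALUES (101, 'Albania', 'Europe'), (324, 'Angola', 'Africa');
    INSERT INTO entry VALUES (1, 101, 'NYC'), (2, 101, 'LOS'), (3, 324, 'NYC');
""")

# Typical star-schema query: join the fact table to a dimension and aggregate
rows = conn.execute("""
    SELECT c.region, COUNT(*) AS entries
    FROM entry AS e
    JOIN country AS c ON e.country_id = c.country_id
    GROUP BY c.region
    ORDER BY entries DESC
""").fetchall()
print(rows)  # [('Europe', 2), ('Africa', 1)]
```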

#### 3.2 Mapping Out Data Pipelines

- The ETL pipeline script **etl.py** reads and extracts data from the source data paths.

- The extracted data is cleaned and written as Parquet files to the staging paths.

- Spark processes and transforms the staged data into one fact table (entry, partitioned by year and month) and four dimension tables (country, time partitioned by year and month, passenger and airport), each with the appropriate columns and data types.

- The processed tables are written as Parquet files to the output directories, where the analytics team can query them for further insights.

- Data quality checks are run at the end to validate each table.
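
Partitioning by year and month means Spark writes one sub-directory per (year, month) pair, so queries that filter on those columns can skip the other directories. The notebook cells in Step 4 write the tables without `partitionBy`; in PySpark the partitioned write would be `entry_df.write.partitionBy("year", "month").mode("overwrite").parquet(entry_output_path)`. The plain-Python sketch below only illustrates the resulting Hive-style directory layout; `partition_paths` is a hypothetical helper.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List


def partition_paths(base: str, arrivals: Iterable[datetime]) -> Dict[str, List[datetime]]:
    """Group arrival timestamps into Hive-style year=/month= directories.

    This mirrors the directory names Spark produces for
    df.write.partitionBy("year", "month").parquet(base).
    """
    groups: Dict[str, List[datetime]] = defaultdict(list)
    for ts in arrivals:
        groups[f"{base.rstrip('/')}/year={ts.year}/month={ts.month}"].append(ts)
    return dict(groups)


layout = partition_paths("tables/entry/", [datetime(2016, 4, 29), datetime(2016, 4, 30)])
print(list(layout))  # ['tables/entry/year=2016/month=4']
```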

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
# Airport Data Model
df = spark.read.parquet(staging_airport_path+'parquet/')
df.createOrReplaceTempView("airport")
query = """

    SELECT DISTINCT airport_id, 
                    airport_name,  
                    airport_location as airport_region
    FROM airport
    """
airport_df = spark.sql(query)
airport_df.printSchema()
airport_df.write.mode("overwrite").parquet(airport_output_path)
airport_df = spark.read.parquet(airport_output_path)
airport_df.show(3)

root
 |-- airport_id: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- airport_region: string (nullable = true)

+----------+------------------+--------------+
|airport_id|      airport_name|airport_region|
+----------+------------------+--------------+
|       EPT|EASTPORT MUNICIPAL|            ME|
|       SPA|        ST PAMPILE|            ME|
|       GTF|Collapsed into INT|            MN|
+----------+------------------+--------------+
only showing top 3 rows



In [7]:
# Country Data Model
df = spark.read.parquet(staging_country_path+'parquet/')
df.createOrReplaceTempView("country")
query = """

SELECT DISTINCT code as country_id, 
                name as country_name,  
                alpha_2 , alpha_3, official_name,
                official_code as iso_code, 
                region
FROM country
ORDER BY country_name
"""
country_df = spark.sql(query)
country_df.printSchema()
country_df.write.mode("overwrite").parquet(country_output_path)
country_df = spark.read.parquet(country_output_path)
country_df.show(3)

root
 |-- country_id: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- alpha_2: string (nullable = true)
 |-- alpha_3: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- region: string (nullable = true)

+----------+--------------------+-------+-------+--------------------+--------+--------+
|country_id|        country_name|alpha_2|alpha_3|       official_name|iso_code|  region|
+----------+--------------------+-------+-------+--------------------+--------+--------+
|       717|Bonaire, sint eus...|     BQ|    BES|Bonaire, Sint Eus...|     535|Americas|
|       330|Sao tome and prin...|     ST|    STP|Democratic Republ...|     678|  Africa|
|       296|United arab emirates|   null|   null|                null|    null|        |
+----------+--------------------+-------+-------+--------------------+--------+--------+
only showing top 3 rows



In [8]:
spark.udf.register("get_country", get_country)
spark.udf.register("get_timestamp", get_timestamp)

<function __main__.get_timestamp(arrdate)>

In [9]:
# Time Data Model

df = spark.read.parquet(staging_immigration_path).withColumn("time_id", get_timestamp("arrdate"))
df.createOrReplaceTempView("time")
query = """

SELECT DISTINCT  time_id, hour(time_id) as hour, day(time_id) as day,
                 weekofyear(time_id) as week, month(time_id) as month, year(time_id) as year,
                 dayofweek(time_id) as weekday

FROM time
ORDER BY time_id
"""
time_df = spark.sql(query)
time_df.printSchema()
time_df.write.mode("overwrite").parquet(time_output_path)
time_df = spark.read.parquet(time_output_path)
time_df.show(3)

root
 |-- time_id: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|            time_id|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2016-04-10 00:00:00|   0| 10|  14|    4|2016|      1|
|2016-04-16 00:00:00|   0| 16|  15|    4|2016|      7|
|2016-04-30 00:00:00|   0| 30|  17|    4|2016|      7|
+-------------------+----+---+----+-----+----+-------+
only showing top 3 rows
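
The semantics of the Spark SQL date functions used above can be checked without a cluster. The sketch below is a hypothetical plain-Python equivalent (`time_attributes` is not part of the project) that reproduces the rows shown in the output: Spark's `weekofyear` follows ISO-8601 weeks, and `dayofweek` counts from 1 (Sunday) to 7 (Saturday), unlike Python's `isoweekday` (1 = Monday).

```python
from datetime import datetime
from typing import Dict


def time_attributes(ts: datetime) -> Dict[str, int]:
    """Derive the time-table columns in plain Python, matching Spark SQL semantics."""
    return {
        "hour": ts.hour,
        "day": ts.day,
        "week": ts.isocalendar()[1],          # ISO week, like Spark's weekofyear
        "month": ts.month,
        "year": ts.year,
        "weekday": ts.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday, like dayofweek
    }


# Matches the first row of the time table output: week 14, weekday 1 (Sunday)
print(time_attributes(datetime(2016, 4, 10)))
```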



In [10]:
# Passenger Data Model

# df (staged immigration data with the time_id column) is reused from the Time Data Model cell
df.createOrReplaceTempView("passenger")

query = """

SELECT DISTINCT admnum as passenger_id, string(fltno) as flight_no, airline, i94port as airport, time_id as time_of_arrival,
                get_country(i94cit) as departure_country, gender, i94visa as purpose_of_travel, i94bir as age
FROM passenger
WHERE get_country(i94cit) != 'Others'
ORDER BY passenger_id
"""
passenger_df = spark.sql(query)
passenger_df.printSchema()
passenger_df.write.mode("overwrite").parquet(passenger_output_path)
passenger_df = spark.read.parquet(passenger_output_path)
passenger_df.show(3)

root
 |-- passenger_id: double (nullable = true)
 |-- flight_no: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- time_of_arrival: timestamp (nullable = true)
 |-- departure_country: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- purpose_of_travel: double (nullable = true)
 |-- age: double (nullable = true)

+---------------+---------+-------+-------+-------------------+-----------------+------+-----------------+----+
|   passenger_id|flight_no|airline|airport|    time_of_arrival|departure_country|gender|purpose_of_travel| age|
+---------------+---------+-------+-------+-------------------+-----------------+------+-----------------+----+
|5.9564187033E10|    00657|     FI|    SPM|2016-04-30 00:00:00|          Finland|     F|              1.0|55.0|
|5.9564188933E10|    02083|     AA|    CLT|2016-04-30 00:00:00|      Netherlands|     M|              2.0| 5.0|
|5.9564189833E10|    00615|     FI|    NYC|2016-04
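
`purpose_of_travel` keeps the raw `i94visa` code. For dashboards, a lookup to readable labels can be convenient. The sketch below assumes the three categories defined for I94VISA in the labels file (1 = Business, 2 = Pleasure, 3 = Student) and maps nulls and the 0.0 fill value to a hypothetical `Unknown` label; in Spark it could be wrapped as a `StringType` UDF in the same way as `get_country`.

```python
from typing import Optional

# I94VISA categories as listed in I94_SAS_Labels_Descriptions.SAS
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode_purpose(i94visa: Optional[float]) -> str:
    """Translate the numeric purpose_of_travel code (stored as a double) into a label."""
    if i94visa is None:
        return "Unknown"
    return VISA_CATEGORIES.get(int(i94visa), "Unknown")


print(decode_purpose(2.0))  # Pleasure
```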

In [11]:
# Entry Data Model
# df (staged immigration data with the time_id column) is reused from the Time Data Model cell
df.createOrReplaceTempView("entry")
entry_df = spark.sql(
    """
    select distinct int(cicid) as entry_id,
                    airline as airline_id,
                    string(fltno) as flight_id,
                    i94port as airport_id, 
                    admnum as passenger_id, 
                    get_country(i94cit) as country_id,
                    time_id, 
                    hour(time_id) as hour, 
                    day(time_id) as day,
                    weekofyear(time_id) as week,
                    month(time_id) as month, 
                    year(time_id) as year,
                    dayofweek(time_id) as weekday
    from entry
    ORDER BY time_id ASC
    """
)
entry_df.printSchema()
entry_df.write.mode("overwrite").parquet(entry_output_path)
entry_df = spark.read.parquet(entry_output_path)
entry_df.show(3)

root
 |-- entry_id: integer (nullable = true)
 |-- airline_id: string (nullable = true)
 |-- flight_id: string (nullable = true)
 |-- airport_id: string (nullable = true)
 |-- passenger_id: double (nullable = true)
 |-- country_id: string (nullable = true)
 |-- time_id: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------+----------+---------+----------+---------------+----------+-------------------+----+---+----+-----+----+-------+
|entry_id|airline_id|flight_id|airport_id|   passenger_id|country_id|            time_id|hour|day|week|month|year|weekday|
+--------+----------+---------+----------+---------------+----------+-------------------+----+---+----+-----+----+-------+
| 5417904|        OS|    00089|       NEW|5.9451939333E10|   Austria|2016-04-29 00:00:00|   0| 29|  17|   

#### 4.2 Data Quality Checks
Data quality checks:

 * Check that the fact and dimension tables are not empty
 * Check that the fact and dimension tables have no null values
 * Check that two key columns in each table have the expected data types
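
The checks rely on `metrics`, `is_empty`, `has_null` and `data_type` from the `utility` module, which is not shown. A hypothetical sketch of how the metric dictionary and the type check could work, assuming `has_null` runs the query against a registered temp view and `data_type` compares against the type names reported by `DataFrame.dtypes` (e.g. `int`, `string`, `timestamp`):

```python
from typing import Dict, List, Tuple


def build_metric(table: str, names: List[str], dtypes: List[str]) -> Dict[str, str]:
    """Hypothetical builder for the dict consumed by data_quality_check.

    Produces the keys data_quality_check reads (query, name1/name2, dtype1/dtype2);
    the query counts rows where either checked column is null.
    """
    name1, name2 = names
    return {
        "query": f"SELECT COUNT(*) AS nulls FROM {table} WHERE {name1} IS NULL OR {name2} IS NULL",
        "name1": name1, "dtype1": dtypes[0],
        "name2": name2, "dtype2": dtypes[1],
    }


def has_expected_type(df_dtypes: List[Tuple[str, str]], column: str, expected: str) -> bool:
    """Check a column's type using the (name, type) pairs from DataFrame.dtypes."""
    return dict(df_dtypes).get(column) == expected


metric = build_metric("airport", ["airport_id", "airport_name"], ["string", "string"])
print(metric["query"])
print(has_expected_type([("airport_id", "string"), ("airport_name", "string")], "airport_id", "string"))  # True
```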
 

In [23]:
def data_quality_check(df, metric: dict, table):
    """
    Description: This function checks whether a spark df is empty, contains null values or has wrong data types

    Arguments:
        df: spark data frame
        metric: dict containing:
                query: sql query used to check df for null values
                dtype1, dtype2: expected data types of columns name1 and name2
                name1, name2: column names
        table: table name used in the printed messages

    Returns:
         status: dict => msg: "Passed" or "Failed", Code: 0 or 1
    """
    # Each utility helper returns a truthy value (1) when its check fails
    table_is_empty = is_empty(df)
    table_has_null = has_null(df, metric["query"])
    col1_has_wrong_data_type = data_type(df, metric["dtype1"], metric["name1"])
    col2_has_wrong_data_type = data_type(df, metric["dtype2"], metric["name2"])

    # Report the outcome of every check instead of printing "Passed" unconditionally
    if table_is_empty:
        print(f"Err: {table} Table is Empty")
    else:
        print(f"Passed: {table} Table is Not Empty")
    if table_has_null:
        print(f"Err: {table} Table has Null Values")
    else:
        print(f"Passed: {table} Table has No Null Values")
    for name, dtype, wrong in [(metric["name1"], metric["dtype1"], col1_has_wrong_data_type),
                               (metric["name2"], metric["dtype2"], col2_has_wrong_data_type)]:
        if wrong:
            print(f"Err: {name} should be {dtype} type")
        else:
            print(f"Passed: Data type for {name} column is correct")

    failed = any([table_is_empty, table_has_null, col1_has_wrong_data_type, col2_has_wrong_data_type])
    return {"msg": "Failed", "Code": 1} if failed else {"msg": "Passed", "Code": 0}

In [24]:
# Perform quality checks here
# Airport Table check
try:
    airport_table_check = data_quality_check(airport_df,metrics(["airport_id", "airport_name"], ["string", "string"]), table="Airport")
    print(f"Airport table check {airport_table_check['msg']}")
except Exception as e:
    print("Airport table check failed")
    print(e)


Passed: Airport Table is Not Empty
Passed: Airport Table has No Null Values
Passed: Data type for airport_id column is correct
Passed: Data type for airport_name column is correct
Airport table check Passed


In [25]:
# Country Table check
try:
    country_table_check = data_quality_check(country_df, metrics(["country_id", "country_name"], ["int", "string"]), table="Country")
    print(f"Country table check {country_table_check['msg']}")
except Exception as e:
    print("Country table check failed")
    print(e)


Passed: Country Table is Not Empty
Passed: Country Table has No Null Values
Passed: Data type for country_id column is correct
Passed: Data type for country_name column is correct
Country table check Passed


In [26]:
# Passenger Table check
try:
    passenger_table_check = data_quality_check(passenger_df,metrics(["passenger_id", "flight_no"], ["double", "string"]), table="Passenger")
    print(f"Passenger table check {passenger_table_check['msg']}")
except Exception as e:
    print("Passenger table check failed")
    print(e)


Passed: Passenger Table is Not Empty
Passed: Passenger Table has No Null Values
Passed: Data type for passenger_id column is correct
Passed: Data type for flight_no column is correct
Passenger table check Passed


In [27]:
# Time Table check
try:
    time_table_check = data_quality_check(time_df, metrics(["time_id", "hour"], ["timestamp", "int"]), table="Time")
    print(f"Time table check {time_table_check['msg']}")
except Exception as e:
    print("Time table check failed")
    print(e)


Passed: Time Table is Not Empty
Passed: Time Table has No Null Values
Passed: Data type for time_id column is correct
Passed: Data type for hour column is correct
Time table check Passed


In [28]:
# Entry Table check
try:
    entry_table_check = data_quality_check(entry_df, metrics(["time_id", "entry_id"], ["timestamp", "int"]), table="Entry")
    print(f"Entry table check {entry_table_check['msg']}")
except Exception as e:
    print("Entry table check failed")
    print(e)

Passed: Entry Table is Not Empty
Passed: Entry Table has No Null Values
Passed: Data type for time_id column is correct
Passed: Data type for entry_id column is correct
Entry table check Passed


In [29]:

all_checks = {airport_table_check['Code'], country_table_check['Code'], passenger_table_check['Code'],
                  time_table_check['Code'], entry_table_check['Code']}

if 1 in all_checks:
    print("Data Quality Failed")
else:
    print("Data Quality check on all tables was successful")


Data Quality check on all tables was successful


#### 4.3 Data dictionary 
The data dictionary can be found in the **data_dictionary.txt** file


#### Step 5: Rationale for Tools Chosen
- Python: flexible and easy to use
- Pandas: I am comfortable with Python, and Pandas is a Python library optimized for data analysis and manipulation
- Spark: Has a Python API (PySpark), offers a DataFrame abstraction similar to Pandas, and supports SQL queries. It is suitable for data with over 1 million rows, such as the roughly 3M rows in each monthly I94 file
- Local Storage: Ideally, AWS S3 and Redshift would be the preferred choices for file and data storage, but I used local storage because of the limited amount of data.


##### How often should the ETL scripts be run?
- Based on the use cases, and assuming the US I94 immigration data is made available monthly, I would propose running the ETL script monthly.
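
With a monthly schedule, each run only needs the newly published file. A minimal sketch, assuming the monthly files follow an `i94_<mon><yy>_sub.sas7bdat` naming pattern (an assumption about the dataset, not something stated above); `monthly_input_path` is hypothetical, and a scheduler would call it with the month being processed:

```python
import os

# Month abbreviations as they are assumed to appear in the I94 file names (e.g. i94_apr16_sub)
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_input_path(directory: str, year: int, month: int) -> str:
    """Build the path of the I94 file for one month, assuming the i94_<mon><yy>_sub naming."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return os.path.join(directory, f"i94_{MONTHS[month - 1]}{year % 100:02d}_sub.sas7bdat")


print(monthly_input_path("../../data/18-83510-I94-Data-2016/", 2016, 4))
```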
   

##### Possible scenarios
- Data is 100x:
    - Source data files should be stored in AWS S3 or other cloud storage
    - A multi-node Spark cluster with data stored in a distributed file system like HDFS should be used to enable faster, parallel data processing.
    - Output data (dimension and fact tables) should be stored as Parquet files in AWS S3 or another low-cost cloud storage system for easy access

##### Data populates a dashboard that must be updated every day at 07:00 AM:
- The ETL script should only process changes detected in the input data and update the dashboard database, instead of reprocessing all the data.
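
One simple way to detect changes is to compare a snapshot of input file modification times with the snapshot saved by the previous run. A hypothetical sketch (`changed_files` and the snapshot format are assumptions, not part of the project):

```python
from typing import Dict, List


def changed_files(previous: Dict[str, float], current: Dict[str, float]) -> List[str]:
    """Return paths that are new or whose modification time changed since the last run.

    Both arguments map file path -> modification time (e.g. from os.path.getmtime);
    the previous snapshot would be persisted between runs, for example as JSON.
    """
    return sorted(path for path, mtime in current.items() if previous.get(path) != mtime)


last_run = {"i94_apr16_sub.sas7bdat": 100.0}
this_run = {"i94_apr16_sub.sas7bdat": 100.0, "i94_may16_sub.sas7bdat": 200.0}
print(changed_files(last_run, this_run))  # ['i94_may16_sub.sas7bdat']
```

Only the returned files would then be staged and merged into the existing tables.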

##### DB is accessed by 100+ people:
- Add more dimension tables to cater for different users' use cases
- Use a cloud data warehouse so that the data is always easily available to users.

##### Future Work / Improvements
- Apache Airflow could be used to automatically schedule and monitor the pipelines
- The US cities demographics data could be added to show the demography of the cities visited by travelers from different countries. For example, are flights from Africa directed more towards US cities with large African American populations?

