# Proposal

## Use Case

#### Big Question
- Which shippers are most/least reliable (arrival time delta between estimated and actual)?

#### Sub Questions
- Which are the most reliable shippers per country/region/subregion?
- Which carrier companies are the most reliable?
- What, if any, were the reliability changes over the years?
    - How did COVID-19 affect the reliability of shipment times?
- Which consignees chose their shippers most wisely?

# ETL Pipeline

```python
import pandas as pd

from lib.utils import get_id_nums, clean_row, remove_incorrect_codes, data_fetch_url, data_send_url
```

## Set up countries table

- Load a table of countries with their alpha-2 codes, regions, and sub-regions, sourced from:
    - https://github.com/lukes/ISO-3166-Countries-with-Regional-Codes

```python
countries_cols = ['name', 'alpha-2', 'region', 'sub-region']
countries = pd.read_csv(data_send_url('bronze', 'country_data'), usecols=countries_cols, keep_default_na=False)

# Legacy or non-ISO codes that are missing from the repository table
extra_codes = pd.DataFrame({
    'name': ['Czechia', 'Netherland Antilles', 'Germany', 'European Union'],
    'alpha-2': ['XC', 'AN', 'DD', 'EU'],
    'region': ['Europe', 'Americas', 'Europe', 'Europe'],
    'sub-region': ['Eastern Europe', 'Latin America and the Caribbean', 'Western Europe', 'Western Europe'],
})

countries = pd.concat([countries, extra_codes], ignore_index=True)

# Use the alpha-2 codes as the index and rename it to id
countries.set_index('alpha-2', inplace=True)
countries.index.name = 'id'
countries.sort_index(inplace=True)

# Set of country codes with O(1) lookup for table cleaning
alpha_2_set = set(countries.index)

# Upper-case country name -> alpha-2 code, with O(1) lookup for table cleaning
country_dict = {name.upper(): code for code, name in countries['name'].items()}

# Add frequent name variants from the data that do not match an ISO name, including common 'typos'
country_dict.update({
    'TAIWAN': 'TW',
    'SOUTH KOREA': 'KR',
    'SHANGHAI CN': 'CN',
    'SHANGHAI': 'CN',
    'SHANGHAI .': 'CN',
    'HONG KONG .': 'CN',
    'TAIPEI .': 'TW',
    'USA': 'US',
    'U.S.A.': 'US',
})
```
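`keep_default_na=False` matters for this data: Namibia's alpha-2 code is the string `NA`, which pandas parses as a missing value by default. A small check using an inline two-row CSV (not the real repository file):

```python
import io

import pandas as pd

# Two rows shaped like the ISO 3166 country CSV; 'NA' is Namibia's alpha-2 code
csv_text = "name,alpha-2,region,sub-region\nNamibia,NA,Africa,Sub-Saharan Africa\nAntarctica,AQ,,\n"

# Default parsing: 'NA' is one of pandas' default missing-value markers
default = pd.read_csv(io.StringIO(csv_text))

# keep_default_na=False: 'NA' stays a string and empty fields become ''
kept = pd.read_csv(io.StringIO(csv_text), keep_default_na=False)

print(default['alpha-2'].tolist())  # first entry is NaN
print(kept['alpha-2'].tolist())     # ['NA', 'AQ']
```

The trade-off is that genuinely empty fields (such as Antarctica's region) come back as empty strings rather than NaN.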

## Shipper

- Read the six bronze shipper CSV files into DataFrames

```python
shipper_0 = pd.read_csv(data_fetch_url('bronze', 'shipper', '18', '0'))
shipper_1 = pd.read_csv(data_fetch_url('bronze', 'shipper', '18', '1'))
shipper_2 = pd.read_csv(data_fetch_url('bronze', 'shipper', '19', '0'))
shipper_3 = pd.read_csv(data_fetch_url('bronze', 'shipper', '19', '1'))
shipper_4 = pd.read_csv(data_fetch_url('bronze', 'shipper', '20', '0'))
shipper_5 = pd.read_csv(data_fetch_url('bronze', 'shipper', '20', '1'))
```

- Concatenate the shipper DataFrames into a single DataFrame

```python
shippers = pd.concat([shipper_0, shipper_1, shipper_2, shipper_3, shipper_4, shipper_5], ignore_index=True)
```
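The six `read_csv` calls and the `concat` can be collapsed into one expression over a list of sources, which avoids one variable per file. A minimal sketch that uses temporary local files as stand-ins for the bronze paths (the file names and values are invented):

```python
import tempfile
from pathlib import Path

import pandas as pd

# Two toy part files standing in for the bronze shipper CSVs
part_dir = Path(tempfile.mkdtemp())
for i, name in enumerate(['ACME', 'GLOBEX']):
    part = pd.DataFrame({'identifier': [i], 'shipper_party_name': [name]})
    part.to_csv(part_dir / f'part_{i}.csv', index=False)

# Read each part lazily with a generator and concatenate once
part_paths = sorted(part_dir.glob('part_*.csv'))
demo_shippers = pd.concat((pd.read_csv(path) for path in part_paths), ignore_index=True)
print(demo_shippers)
```

In the notebook the generator would iterate over the `data_fetch_url(...)` sources instead of local paths.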

```python
# Replace missing shipper names with 'N/A' (assigned back rather than chained inplace fillna)
shippers['shipper_party_name'] = shippers['shipper_party_name'].fillna('N/A')
```

- Clean each shipper row and drop the remaining unnecessary columns (see `clean_row` in `utils.py`)

```python
shipper_clean = shippers.apply(lambda row: clean_row(row, 'shipper_party', alpha_2_set, country_dict), axis=1)
```
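`clean_row` is defined in `utils.py` and is not shown here. A hypothetical sketch of how the two lookups built earlier (a code set and a name dictionary) could resolve a single raw country value; `normalize_country` and the demo data are illustrative, and the real helper may behave differently:

```python
def normalize_country(value, codes, names):
    """Return an alpha-2 code for a raw country value, or None if it cannot be resolved."""
    if not isinstance(value, str):
        return None  # NaN or other non-string input
    key = value.strip().upper()
    if key in codes:  # already a valid two-letter code
        return key
    return names.get(key)  # known name or variant -> code, else None


demo_codes = {'US', 'CN', 'TW'}
demo_names = {'UNITED STATES OF AMERICA': 'US', 'USA': 'US', 'SHANGHAI .': 'CN', 'TAIWAN': 'TW'}

raw_values = ['us', 'Shanghai .', 'Taiwan', 'Atlantis', None]
print([normalize_country(v, demo_codes, demo_names) for v in raw_values])
# ['US', 'CN', 'TW', None, None]
```

Both checks are set/dict membership tests, which is why the earlier cell builds `alpha_2_set` and `country_dict` up front.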

- **Result:** country-code coverage out of 40,240,366 values

| Metric | Before cleaning | After cleaning |
| --- | ---: | ---: |
| Country codes (count) | 9,911,774 | 13,100,153 |
| Country codes (% of values) | 24.63% | 32.55% |
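Assuming the counts above are non-null country codes, each share is the count divided by the 40,240,366 total. A sketch of that calculation (`code_coverage_pct` is a hypothetical helper):

```python
import pandas as pd


def code_coverage_pct(codes: pd.Series) -> float:
    """Percentage of entries holding a non-null country code."""
    return 100 * codes.notna().sum() / len(codes)


print(code_coverage_pct(pd.Series(['US', None, 'CN', None])))  # 50.0

# The shares in the table, recomputed from its counts
total_values = 40_240_366
print(round(100 * 9_911_774 / total_values, 2))   # 24.63
print(round(100 * 13_100_153 / total_values, 2))  # 32.55
```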

- Create the shipper ID column by mapping each name to an ID (see `get_id_nums` in `utils.py`)

```python
shipper_id_dict = get_id_nums(shipper_clean['shipper_party_name'])
shipper_clean['shipper_id'] = shipper_clean['shipper_party_name'].map(shipper_id_dict)
```
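`get_id_nums` is not shown here. As a hypothetical stand-in, `pandas.factorize` yields the same kind of name-to-integer mapping; this sketch assumes IDs start at 0 and follow sorted name order, which may not match the real helper:

```python
import pandas as pd


def id_map_sketch(names: pd.Series) -> dict:
    """Hypothetical stand-in for get_id_nums: map each distinct name to an integer ID."""
    _, uniques = pd.factorize(names, sort=True)  # uniques are the distinct names, sorted
    return {name: i for i, name in enumerate(uniques)}


sample_names = pd.Series(['GLOBEX', 'ACME', 'GLOBEX', 'N/A'])
sample_ids = sample_names.map(id_map_sketch(sample_names))
print(sample_ids.tolist())  # [1, 0, 1, 2]
```

Returning a dictionary keeps the same `map` call used in the cell above.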

- Write the cleaned data to a fail-safe CSV, since cleaning is time-consuming and should not have to be rerun after a later mistake

```python
shipper_fail_safe_path = data_send_url('fail_safe', 'shipper_clean')
shipper_clean.to_csv(shipper_fail_safe_path, mode='w')
```
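The fail-safe CSV only saves time if a rerun reads it back instead of recomputing. A minimal load-or-compute sketch, assuming the fail-safe location is a local file path (`load_or_compute` and `expensive_clean` are hypothetical names):

```python
import tempfile
from pathlib import Path

import pandas as pd


def load_or_compute(path, compute):
    """Return the cached CSV at `path` if it exists; otherwise run compute(), save, and return it."""
    path = Path(path)
    if path.exists():
        return pd.read_csv(path, index_col=0)  # to_csv wrote the index as the first column
    df = compute()
    df.to_csv(path)
    return df


calls = []


def expensive_clean():
    calls.append(1)  # record each time the expensive step actually runs
    return pd.DataFrame({'shipper_party_name': ['ACME']})


cache_path = Path(tempfile.mkdtemp()) / 'shipper_clean.csv'
first = load_or_compute(cache_path, expensive_clean)
second = load_or_compute(cache_path, expensive_clean)  # served from the CSV
print(len(calls))  # 1
```

Column dtypes can change on a CSV round trip, so the reloaded frame may need a `dtype=` argument in practice.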

## Consignee

- Read the six bronze consignee CSV files into DataFrames

```python
consignee_0 = pd.read_csv(data_fetch_url('bronze', 'consignee', '18', '0'))
consignee_1 = pd.read_csv(data_fetch_url('bronze', 'consignee', '18', '1'))
consignee_2 = pd.read_csv(data_fetch_url('bronze', 'consignee', '19', '0'))
consignee_3 = pd.read_csv(data_fetch_url('bronze', 'consignee', '19', '1'))
consignee_4 = pd.read_csv(data_fetch_url('bronze', 'consignee', '20', '0'))
consignee_5 = pd.read_csv(data_fetch_url('bronze', 'consignee', '20', '1'))
```

- Concatenate them into a single DataFrame

```python
consignees = pd.concat([consignee_0, consignee_1, consignee_2, consignee_3, consignee_4, consignee_5], ignore_index=True)
```

- Fill NaN names with N/A

```python
# Assign back rather than chained inplace fillna
consignees['consignee_name'] = consignees['consignee_name'].fillna('N/A')
```

- Clean names and country codes, and remove unnecessary columns

```python
consignee_clean = consignees.apply(lambda row: clean_row(row, 'consignee', alpha_2_set, country_dict), axis=1)
```

- Map IDs to consignees by name

```python
consignee_id_dict = get_id_nums(consignee_clean['consignee_name'])
consignee_clean['consignee_id'] = consignee_clean['consignee_name'].map(consignee_id_dict)
```

- Save the result of this expensive step to CSV in case a later step fails

```python
consignee_fail_safe_path = data_send_url('fail_safe', 'consignee_clean')
consignee_clean.to_csv(consignee_fail_safe_path, mode='w')
```

## Header

- Read header CSV files

```python
header_0 = pd.read_csv(data_fetch_url('bronze', 'header', '18', '0'))
header_1 = pd.read_csv(data_fetch_url('bronze', 'header', '18', '1'))
header_2 = pd.read_csv(data_fetch_url('bronze', 'header', '18', '2'))
header_3 = pd.read_csv(data_fetch_url('bronze', 'header', '18', '3'))
header_4 = pd.read_csv(data_fetch_url('bronze', 'header', '19', '0'))
header_5 = pd.read_csv(data_fetch_url('bronze', 'header', '19', '1'))
header_6 = pd.read_csv(data_fetch_url('bronze', 'header', '19', '2'))
header_7 = pd.read_csv(data_fetch_url('bronze', 'header', '19', '3'))
header_8 = pd.read_csv(data_fetch_url('bronze', 'header', '20', '0'))
header_9 = pd.read_csv(data_fetch_url('bronze', 'header', '20', '1'))
header_10 = pd.read_csv(data_fetch_url('bronze', 'header', '20', '2'))
```

- Concatenate them into a single DataFrame

```python
headers = pd.concat([header_0, header_1, header_2, header_3, header_4, header_5, header_6, header_7, header_8, header_9, header_10], ignore_index=True)
```

### Vessel Table

- Split the vessel columns out into a separate table

```python
vessel_silver_cols = ['vessel_name', 'vessel_country_code', 'carrier_code', 'conveyance_id_qualifier', 'conveyance_id']
# .copy() makes vessels independent of headers, avoiding SettingWithCopyWarning on later assignments
vessels = headers[['identifier'] + vessel_silver_cols].copy()
vessels['vessel_name'] = vessels['vessel_name'].fillna('N/A')
vessels['vessel_country_code'] = vessels['vessel_country_code'].fillna('N/A')
```

- Identify unique vessels by name plus country code and assign IDs

```python
# Composite key of name and country code
vessels['temp'] = vessels['vessel_name'] + ' ' + vessels['vessel_country_code']
vessel_id_dict = get_id_nums(vessels['temp'])
vessels['vessel_id'] = vessels['temp'].map(vessel_id_dict)
```
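The `temp` column builds a string key from name and country code. A sketch of an alternative that numbers each distinct `(vessel_name, vessel_country_code)` pair directly with `GroupBy.ngroup`, on toy rows; the numbering is not guaranteed to match `get_id_nums`:

```python
import pandas as pd

demo_vessels = pd.DataFrame({
    'vessel_name': ['EVER GIVEN', 'MSC ANNA', 'EVER GIVEN', 'EVER GIVEN'],
    'vessel_country_code': ['PA', 'PA', 'PA', 'N/A'],
})

# One ID per distinct (name, country) pair; with sort=True, groups are numbered in key order
demo_vessels['vessel_id'] = (
    demo_vessels.groupby(['vessel_name', 'vessel_country_code'], sort=True).ngroup()
)
print(demo_vessels['vessel_id'].tolist())  # [1, 2, 1, 0]
```

This relies on missing names and codes having been filled with `'N/A'` first, as the cell above does, because rows with a missing group key are not assigned a group number.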

- Merge with header on identifier to assign vessel_id as foreign key

```python
headers = headers.merge(vessels[['identifier', 'vessel_id']], on=['identifier'], how='inner')
```
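Because `vessels` was sliced row-for-row from `headers`, this merge should be one-to-one on `identifier`. pandas can enforce that with the `validate` argument of `merge`. A toy sketch, assuming `identifier` is unique per header row:

```python
import pandas as pd

demo_headers = pd.DataFrame({'identifier': [101, 102, 103], 'port': ['A', 'B', 'A']})
demo_vessel_keys = pd.DataFrame({'identifier': [101, 102, 103], 'vessel_id': [0, 1, 0]})

# validate='one_to_one' raises MergeError if 'identifier' repeats on either side
merged = demo_headers.merge(demo_vessel_keys, on='identifier', how='inner', validate='one_to_one')
print(merged)

# Duplicated keys on the right would silently multiply rows without validation
try:
    demo_headers.merge(pd.concat([demo_vessel_keys, demo_vessel_keys]),
                       on='identifier', validate='one_to_one')
    duplicate_caught = False
except pd.errors.MergeError as err:
    duplicate_caught = True
    print('Duplicate keys detected:', err)
```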

- Drop duplicate vessels and the `identifier` and `temp` columns, then set `vessel_id` as the index

```python
vessels.drop_duplicates(subset=['temp'], inplace=True)
vessels.drop(labels=['identifier', 'temp'], axis=1, inplace=True)
vessels.set_index(keys='vessel_id', inplace=True)
```

- Remove erroneous or unknown country codes

```python
# Applied per value, so the lambda argument is a single country code
vessels['vessel_country_code'] = vessels['vessel_country_code'].apply(lambda code: remove_incorrect_codes(code, alpha_2_set))
```
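`remove_incorrect_codes` runs once per value. Assuming it keeps a code found in `alpha_2_set` and nulls out anything else (an assumption, since the helper is not shown), the same result can be sketched with the vectorized `isin` and `where`:

```python
import pandas as pd

valid_demo_codes = {'US', 'CN', 'PA'}
raw_codes = pd.Series(['US', 'N/A', 'XX', 'PA'])

# Keep values found in the set; everything else becomes missing
checked_codes = raw_codes.where(raw_codes.isin(valid_demo_codes))
print(checked_codes.tolist())
```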

- Drop the vessel columns (now in their own table) and the columns that are at most 10% non-null

```python
# Sparse columns (<= 10% non-null), including secondary notify parties 2 through 10
insig_cols = [
    'port_of_destination',
    'foreign_port_of_destination',
    'foreign_port_of_destination_qualifier',
    'in_bond_entry_type',
] + [f'secondary_notify_party_{i}' for i in range(2, 11)]

headers.drop(columns=vessel_silver_cols + insig_cols, inplace=True)
```
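The `insig_cols` list is hard-coded. A sketch of how columns at or below the 10% non-null threshold could be found programmatically, shown on a toy frame (`sparse_columns` is a hypothetical helper):

```python
import pandas as pd


def sparse_columns(df: pd.DataFrame, max_share: float = 0.10) -> list:
    """Columns whose share of non-null values is at most max_share."""
    share = df.notna().mean()  # fraction of non-null values per column
    return share[share <= max_share].index.tolist()


demo_sparse = pd.DataFrame({
    'identifier': range(10),
    'in_bond_entry_type': [None] * 9 + ['63'],  # 10% non-null
    'secondary_notify_party_2': [None] * 10,    # 0% non-null
    'carrier_code': ['ABCD'] * 10,              # 100% non-null
})
print(sparse_columns(demo_sparse))  # ['in_bond_entry_type', 'secondary_notify_party_2']
```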

## Junction Tables

```python
# Both junction tables rename identifier to shipment_id
identifier_rename = {'identifier': 'shipment_id'}
```

### Shipper_Shipment

```python
shipper_shipment = shipper_clean[['identifier', 'shipper_id']].copy()
shipper_shipment.index.name = 'shipper_shipment_id'
shipper_shipment.rename(columns=identifier_rename, inplace=True)
```

### Consignee_Shipment

```python
consignee_shipment = consignee_clean[['identifier', 'consignee_id']].copy()
consignee_shipment.index.name = 'consignee_shipment_id'
consignee_shipment.rename(columns=identifier_rename, inplace=True)
```

## Silver Layer Creation

![Silver layer ERD](/Users/jesseputnam/cs-learning/skillstorm/project01/silver_erd.png)

- Drop `identifier` from the shipper and consignee tables, remove duplicate IDs, and set each table's index to its ID

```python
shipper_clean.drop(['identifier'], axis=1, inplace=True)
shipper_clean.drop_duplicates(subset=['shipper_id'], inplace=True)
shipper_clean.set_index(['shipper_id'], inplace=True)
```

```python
consignee_clean.drop(['identifier'], axis=1, inplace=True)
consignee_clean.drop_duplicates(subset=['consignee_id'], inplace=True)
consignee_clean.set_index(['consignee_id'], inplace=True)
```

- Set `identifier` as the shipment index and rename it `shipment_id`

```python
headers.set_index(['identifier'], inplace=True)
headers.index.name = 'shipment_id'
```

- Write the shipment, vessel, shipper, consignee, shipper_shipment, consignee_shipment, and country tables to CSV for SQL bulk loading

```python
headers.to_csv(data_send_url('silver', 'shipment'), mode='w')
vessels.to_csv(data_send_url('silver', 'vessel'), mode='w')
shipper_clean.to_csv(data_send_url('silver', 'shipper'), mode='w')
consignee_clean.to_csv(data_send_url('silver', 'consignee'), mode='w')
shipper_shipment.to_csv(data_send_url('silver', 'shipper_shipment'), mode='w')
consignee_shipment.to_csv(data_send_url('silver', 'consignee_shipment'), mode='w')
countries.to_csv(data_send_url('silver', 'country'), mode='w')
```

## Gold Layer Creation

![ERD](/Users/jesseputnam/cs-learning/skillstorm/project01/erd.png)

- Prepare the tables for the gold layer

```python
shipper_clean.rename(columns={"shipper_party_name": "shipper_name"}, inplace=True)
```

```python
# Choose columns to keep (shipper_party_name was renamed to shipper_name above)
shipper_cols = ['shipper_name', 'country_code']
consignee_cols = ['consignee_name', 'country_code']
header_cols = ['vessel_id', 'estimated_arrival_date', 'actual_arrival_date']
vessel_cols = ['vessel_name', 'vessel_country_code', 'carrier_code']
```

- Write the 'denormalized' gold layer tables to the folder used for the virtual mount

```python
countries.to_csv(data_send_url('gold', 'country'), mode='w')
vessels[vessel_cols].to_csv(data_send_url('gold', 'vessel'), mode='w')
headers[header_cols].to_csv(data_send_url('gold', 'shipment'), mode='w')
shipper_clean[shipper_cols].to_csv(data_send_url('gold', 'shipper'), mode='w')
consignee_clean[consignee_cols].to_csv(data_send_url('gold', 'consignee'), mode='w')
shipper_shipment.to_csv(data_send_url('gold', 'shipper_shipment'), mode='w')
consignee_shipment.to_csv(data_send_url('gold', 'consignee_shipment'), mode='w')
```
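The gold tables carry what the big question needs: estimated and actual arrival dates on `shipment`, linked to shippers through `shipper_shipment`. A minimal sketch of a per-shipper reliability summary on invented rows; it assumes the date columns parse with `pd.to_datetime` and uses mean absolute delay in days as one possible reliability metric:

```python
import pandas as pd

# Invented rows shaped like the gold shipment and shipper_shipment tables
g_shipment = pd.DataFrame({
    'shipment_id': ['S1', 'S2', 'S3', 'S4'],
    'estimated_arrival_date': ['2020-03-01', '2020-03-05', '2020-03-02', '2020-03-10'],
    'actual_arrival_date': ['2020-03-03', '2020-03-05', '2020-03-01', '2020-03-20'],
})
g_link = pd.DataFrame({'shipment_id': ['S1', 'S2', 'S3', 'S4'], 'shipper_id': [0, 0, 1, 1]})


def arrival_delay_by_shipper(shipment: pd.DataFrame, link: pd.DataFrame) -> pd.DataFrame:
    """Per-shipper arrival delay in days (actual - estimated), most reliable first."""
    df = shipment.merge(link, on='shipment_id', how='inner')
    delay = (pd.to_datetime(df['actual_arrival_date'])
             - pd.to_datetime(df['estimated_arrival_date'])).dt.days
    df = df.assign(delay_days=delay, abs_delay_days=delay.abs())
    return (df.groupby('shipper_id')
              .agg(mean_delay=('delay_days', 'mean'),
                   mean_abs_delay=('abs_delay_days', 'mean'),
                   shipments=('shipment_id', 'count'))
              .sort_values('mean_abs_delay'))


print(arrival_delay_by_shipper(g_shipment, g_link))
```

Keeping both the signed and the absolute mean separates shippers that are consistently late from those that are erratic in both directions.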

### SQL Database 

```python
import pyodbc
import os
```

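```python
# Connection settings come from environment variables.
# Note: on macOS and Linux the shell usually sets USER to the OS login name.
server = os.getenv('SERVER')
username = os.getenv('USER')
password = os.getenv('PASSWORD')
driver = os.getenv('DRIVER')
```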
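`os.getenv` returns `None` for unset variables, so a missing setting only surfaces later, at connection time. A small fail-fast check (`REQUIRED_VARS` and `missing_env_vars` are hypothetical names):

```python
import os

REQUIRED_VARS = ('SERVER', 'USER', 'PASSWORD', 'DRIVER')


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the required variables that are unset or empty (env defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print('Missing connection settings:', ', '.join(missing))
```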

#### SQL Table Creation

```python
create_tables_file = '/Users/jesseputnam/cs-learning/skillstorm/project01/sql/create_db_tables.sql'

conn = None  # defined up front so the finally block works even if connect() fails
try:
    conn = pyodbc.connect(driver=driver, server=server, uid=username, pwd=password, autocommit=True)
    cursor = conn.cursor()
    print("Connected Successfully!\n")

    with open(create_tables_file, 'r') as f:
        # Flatten the script and split it into statements, skipping blank pieces
        sql_statements = f.read().replace('\n', '').strip().split(";")
        sql_statements = [x + ';' for x in sql_statements if x.strip()]

        for statement in sql_statements:
            cursor.execute(statement)

            # Logging label: 'USE', or the first two words after the first four-space indent
            split = statement.split('    ')
            if split[0].split()[0] == 'USE':
                cmd_run = split[0].split()[0]
            else:
                cmd_run = f"{split[1].split()[0]} {split[1].split()[1]}"

            print(f"{cmd_run} statement executed")

except pyodbc.Error as e:
    print(f"Error: {e}")
finally:
    if conn is not None:
        conn.close()
        print('Connection closed\n')
    else:
        print('No connection was established')
```

```
Connected Successfully!

USE statement executed
CREATE DATABASE statement executed
USE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
CREATE TABLE statement executed
Connection closed
```
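Both SQL cells read a script, split it on `;`, and must close the connection however execution ends. A sketch of that pattern with `contextlib.closing`, which always calls `close()`; a pyodbc connection used directly in a `with` block commits on exit but does not close. Here `sqlite3` stands in for SQL Server so the sketch runs anywhere, and the splitter is naive (it does not handle `;` inside string literals or comments):

```python
import contextlib
import os
import sqlite3
import tempfile


def split_sql_script(text: str) -> list:
    """Split a SQL script on ';' and drop blank pieces (naive: ignores ';' in strings/comments)."""
    return [piece.strip() for piece in text.split(';') if piece.strip()]


def run_script(connect, text: str) -> int:
    """Run each statement on a new connection that is always closed; returns the statement count."""
    statements = split_sql_script(text)
    with contextlib.closing(connect()) as conn:
        cursor = conn.cursor()
        for statement in statements:
            cursor.execute(statement)
        conn.commit()
    return len(statements)


# Usage with sqlite3 as a stand-in database
db_path = os.path.join(tempfile.mkdtemp(), 'demo.db')
script = """
CREATE TABLE country (id TEXT PRIMARY KEY, name TEXT);
INSERT INTO country VALUES ('NA', 'Namibia');
"""
executed = run_script(lambda: sqlite3.connect(db_path), script)

with contextlib.closing(sqlite3.connect(db_path)) as check:
    stored = check.execute('SELECT id, name FROM country').fetchall()
print(executed, stored)  # 2 [('NA', 'Namibia')]
```

Against SQL Server, `connect` could be `lambda: pyodbc.connect(driver=driver, server=server, uid=username, pwd=password, autocommit=True)`, as in the cell above; with autocommit on, the final `commit()` should have nothing left to do.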



#### Bulk Insert from Gold Layer

```python
bulk_load_file = '/Users/jesseputnam/cs-learning/skillstorm/project01/sql/load_db_tables.sql'

conn = None  # defined up front so the finally block works even if connect() fails
try:
    conn = pyodbc.connect(driver=driver, server=server, uid=username, pwd=password, autocommit=True)
    cursor = conn.cursor()
    print("Connected Successfully!\n")

    with open(bulk_load_file, 'r') as f:
        # Flatten the script and split it into statements, skipping blank pieces
        sql_statements = f.read().replace('\n', '').split(";")
        sql_statements = [x for x in sql_statements if x.strip()]

        for statement in sql_statements:
            cursor.execute(statement)
            # Log the first (up to) three words, e.g. "BULK INSERT dbo.country"
            words = statement.split()
            print(f"{words[0]} {words[1]} {words[2] if len(words) > 2 else ''} statement executed")

except pyodbc.Error as e:
    print(f"Error: {e}")
finally:
    if conn is not None:
        conn.close()
        print('Connection closed\n')
    else:
        print('No connection was established\n')
```

```
Connected Successfully!

USE ams  statement executed
BULK INSERT dbo.country statement executed
BULK INSERT dbo.vessel statement executed
BULK INSERT dbo.shipment statement executed
BULK INSERT dbo.shipper statement executed
BULK INSERT dbo.consignee statement executed
BULK INSERT dbo.shipper_shipment statement executed
BULK INSERT dbo.consignee_shipment statement executed
Connection closed
```



## Analysis

```python
conn = None  # defined up front so the finally block works even if connect() fails
rows = []    # stays empty if the query fails, so the loop below still runs
try:
    conn = pyodbc.connect(driver=driver, server=server, uid=username, pwd=password, database='ams', autocommit=True)
    cursor = conn.cursor()
    print("Connected Successfully!\n")

    cursor.execute('SELECT * FROM country')
    rows = cursor.fetchall()

except pyodbc.Error as e:
    print(f"Error: {e}")
finally:
    if conn is not None:
        conn.close()
        print('Connection closed\n')
    else:
        print('No connection was established\n')

for row in rows:
    print(row)
```

Connected Successfully!

Connection closed

('AD', 'Andorra', 'Europe', 'Southern Europe')
('AE', 'United Arab Emirates', 'Asia', 'Western Asia')
('AF', 'Afghanistan', 'Asia', 'Southern Asia')
('AG', 'Antigua and Barbuda', 'Americas', 'Latin America and the Caribbean')
('AI', 'Anguilla', 'Americas', 'Latin America and the Caribbean')
('AL', 'Albania', 'Europe', 'Southern Europe')
('AM', 'Armenia', 'Asia', 'Western Asia')
('AN', 'Netherland Antilles', 'Americas', 'Latin America and the Caribbean')
('AO', 'Angola', 'Africa', 'Sub-Saharan Africa')
('AQ', 'Antarctica', None, None)
('AR', 'Argentina', 'Americas', 'Latin America and the Caribbean')
('AS', 'American Samoa', 'Oceania', 'Polynesia')
('AT', 'Austria', 'Europe', 'Western Europe')
('AU', 'Australia', 'Oceania', 'Australia and New Zealand')
('AW', 'Aruba', 'Americas', 'Latin America and the Caribbean')
('AX', 'Åland Islands', 'Europe', 'Northern Europe')
('AZ', 'Azerbaijan', 'Asia', 'Western Asia')
('BA', 'Bosnia and Herzegovina',