## Sales Pipeline ETL & Tableau Dashboard: GearQuest Analysis

The objective of this mini-project is to demonstrate an extract, transform, load (ETL) process: raw data is read from Excel files, cleaned and joined with pandas, and loaded into a Tableau `.hyper` extract file. The same cleaned tables are also synced to a PostgreSQL database. Loading the data into a `.hyper` extract lets Tableau consume it directly, which supports the business-intelligence visualizations and dashboard built on top of it.

## Import Libraries

In [1]:
from decimal import Decimal
import pandas as pd
import numpy as np
from tableauhyperapi import HyperProcess, Connection, Nullability, TableDefinition, TableName, SqlType, Telemetry, Inserter, CreateMode
import psycopg2
from dotenv import load_dotenv
import os

## Load Data

In [2]:
# Read each raw Excel export into its own DataFrame (one file per source table).
orders_df, order_status_df, products_df, geo_lookup_df, customers_df = (
    pd.read_excel(f'data/raw/{file_stem}.xlsx')
    for file_stem in ('Orders', 'Order_Status', 'Products', 'Geo_lookup', 'Customers')
)

## View Tables

In [3]:
# Print the first rows of every raw table for a quick visual check.
tables = [
    orders_df,
    order_status_df,
    products_df,
    geo_lookup_df,
    customers_df,
]
for raw_frame in tables:
    preview = raw_frame.head()
    print(preview)

          id customer_id          purchase_ts product_id currency  \
0  ORD089732  CUST003092  2021-01-01 00:06:26    PROD012      USD   
1  ORD019074  CUST043370  2021-01-01 00:08:39    PROD010      USD   
2  ORD019074  CUST043370  2021-01-01 00:08:39    PROD020      USD   
3  ORD019074  CUST043370  2021-01-01 00:08:39    PROD020      USD   
4  ORD237204  CUST025382  2021-01-01 00:14:06    PROD035      EUR   

   local_price  usd_price purchase_platform  
0       909.40       3250        Mobile App  
1       643.01       1430        Mobile App  
2       846.20       3900        Mobile App  
3       120.41       3900           Website  
4      3531.00       3640        Mobile App  
    order_id          purchase_ts              ship_ts          delivery_ts  \
0  ORD013125  2021-02-19 07:22:33  2021-02-20 09:41:13  2021-02-27 10:31:18   
1  ORD013126  2022-04-15 07:57:18  2022-04-20 08:26:16  2022-04-30 13:41:33   
2  ORD013127  2021-01-26 11:58:09  2021-01-27 01:19:50  2021-02-03 19:43

## Drop Duplicates

In [4]:
# Remove exact duplicate rows from each raw table. The frames are mutated in
# place, so every existing reference to them sees the de-duplicated data.
for raw_frame in (orders_df, order_status_df, products_df, geo_lookup_df, customers_df):
    raw_frame.drop_duplicates(inplace=True)

## Combine Tables

### Orders + Order Status Table

In [5]:
# Attach status timestamps to each order line. purchase_ts is taken from the
# status table, so the copy on the orders table is dropped first. The merged
# key is exposed as 'order_id'.
orders_df_combined = (
    orders_df
    .drop(columns='purchase_ts')
    .merge(order_status_df, how='left', left_on='id', right_on='order_id')
    .drop(columns='order_id')
    .rename(columns={'id': 'order_id'})
)

In [6]:
orders_df_combined.head(5)  # first five merged order rows

Unnamed: 0,order_id,customer_id,product_id,currency,local_price,usd_price,purchase_platform,purchase_ts,ship_ts,delivery_ts,refund_ts
0,ORD089732,CUST003092,PROD012,USD,909.4,3250,Mobile App,2021-01-01 00:06:26,2021-01-05 12:26:36,2021-01-09 09:07:20,
1,ORD019074,CUST043370,PROD010,USD,643.01,1430,Mobile App,2021-01-01 00:08:39,2021-01-03 13:23:10,2021-01-09 10:03:21,
2,ORD019074,CUST043370,PROD020,USD,846.2,3900,Mobile App,2021-01-01 00:08:39,2021-01-03 13:23:10,2021-01-09 10:03:21,
3,ORD019074,CUST043370,PROD020,USD,120.41,3900,Website,2021-01-01 00:08:39,2021-01-03 13:23:10,2021-01-09 10:03:21,
4,ORD237204,CUST025382,PROD035,EUR,3531.0,3640,Mobile App,2021-01-01 00:14:06,2021-01-05 21:16:52,2021-01-09 10:11:47,


### Customers + Geo Lookup Tables

In [7]:
# Enrich customers with region/currency from the geo lookup (joined on the
# country code) and expose the customer key as 'customer_id'.
customers_df_combined = (
    customers_df
    .merge(geo_lookup_df, how='left', left_on='country_code', right_on='country')
    .drop(columns='country_code')
    .rename(columns={'id': 'customer_id'})
)

In [8]:
customers_df_combined.head(5)  # first five enriched customer rows

Unnamed: 0,customer_id,marketing_channel,account_creation_method,created_on,country,region,currency
0,CUST000001,Email Marketing,Website,2024-09-24 22:19:54,JP,Asia,JPY
1,CUST000002,Instagram Ad,Mobile App,2023-10-24 13:13:21,BR,South America,BRL
2,CUST000003,Search Engine Ads,Website,2023-03-28 08:44:33,KR,Asia,KRW
3,CUST000004,Twitch Ads,Website,2020-04-08 14:22:06,JP,Asia,JPY
4,CUST000005,Search Engine Ads,Website,2021-04-02 09:02:15,DE,Europe,EUR


In [9]:
# Show the column dtypes of each table that will be exported.
tables_dict = dict(
    orders=orders_df_combined,
    customers=customers_df_combined,
    products=products_df,
)

for label, frame in tables_dict.items():
    print(f'{label}\n{frame.dtypes}\n\n')

orders
order_id              object
customer_id           object
product_id            object
currency              object
local_price          float64
usd_price              int64
purchase_platform     object
purchase_ts           object
ship_ts               object
delivery_ts           object
refund_ts             object
dtype: object


customers
customer_id                object
marketing_channel          object
account_creation_method    object
created_on                 object
country                    object
region                     object
currency                   object
dtype: object


products
product_id      object
product_name    object
category        object
specs           object
price_USD        int64
dtype: object




### Replace `np.nan` with `None` (written as SQL `NULL`)

In [10]:
# Replace NaN with None so that missing values are inserted as NULL by the
# database writers further down (Hyper inserter and psycopg2).
# The dict form {np.nan: None} is used on purpose. In older pandas releases,
# replace(np.nan, None) treats value=None as "no value given" and falls back
# to forward-filling (method='pad') instead of inserting None.
for table in [orders_df_combined, customers_df_combined, products_df]:
    table.replace({np.nan: None}, inplace=True)

## Export to .hyper

In [69]:
# Work on copies so the Hyper-specific type conversions below do not alter
# the frames that are later synced to PostgreSQL.
hyperapi_orders_df, hyperapi_customers_df, hyperapi_products_df = (
    frame.copy() for frame in (orders_df_combined, customers_df_combined, products_df)
)

In [70]:
# The Hyper table declares the price columns as NUMERIC(10, 2), so they are
# converted to decimal.Decimal before insertion.
# Each Decimal is built from str(value) rather than the float itself:
# Decimal(0.1) keeps the float's binary rounding error
# (0.1000000000000000055511...), while Decimal('0.1') is the exact amount.
# Missing values (None or NaN) stay None so that they are written as NULL.
def _to_decimal(value):
    """Convert a numeric value to Decimal via its string form; missing -> None."""
    if pd.isna(value):
        return None
    return Decimal(str(value))


hyperapi_orders_df['local_price'] = hyperapi_orders_df['local_price'].apply(_to_decimal)
hyperapi_orders_df['usd_price'] = hyperapi_orders_df['usd_price'].apply(_to_decimal)

In [71]:
def create_and_insert_table(conn: Connection, table_def: TableDefinition, df: pd.DataFrame):
    """
    Creates a table in Hyper and inserts data from a Pandas DataFrame.

    The DataFrame columns are selected and ordered by the column names in
    ``table_def``, so extra DataFrame columns (for example an ``index`` column
    left by ``reset_index``) are ignored instead of shifting every value.

    Args:
        conn: Hyper Connection object.
        table_def: TableDefinition object defining the table schema.
        df: Pandas DataFrame containing at least every column named in
            ``table_def``.

    Raises:
        KeyError: if ``df`` lacks a column declared in ``table_def``. This is
            checked before the table is created.
    """
    column_names = [column.name.unescaped for column in table_def.columns]
    # Select up front so a missing column fails before an empty table is created.
    ordered_df = df[column_names]

    conn.catalog.create_table(table_def)
    with Inserter(conn, table_def) as inserter:
        # itertuples(name=None) streams plain tuples and keeps each column's own
        # value type. iterrows builds a Series per row and upcasts ints to float
        # when every column is numeric.
        inserter.add_rows(ordered_df.itertuples(index=False, name=None))
        inserter.execute()


# Target extract file. CreateMode.CREATE_AND_REPLACE below replaces any
# existing file at this path on each run.
HYPER_OUTPUT_PATH = 'data/output/gearquest.hyper'

# Make sure the output directory exists so this cell also runs on a fresh checkout.
os.makedirs(os.path.dirname(HYPER_OUTPUT_PATH), exist_ok=True)

with HyperProcess(Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(hyper.endpoint, HYPER_OUTPUT_PATH, CreateMode.CREATE_AND_REPLACE) as conn:
        # All three tables live under the 'gearquest' schema of the extract.
        conn.catalog.create_schema('gearquest')

        # NOTE(review): timestamp columns are stored as text, matching the
        # object-dtype columns in the source frames; consider SqlType.timestamp()
        # after parsing them with pd.to_datetime.
        orders_table_def = TableDefinition(
            TableName('gearquest', 'orders'),
            [
                TableDefinition.Column('order_id', SqlType.varchar(255), Nullability.NOT_NULLABLE),
                TableDefinition.Column('customer_id', SqlType.varchar(255), Nullability.NOT_NULLABLE),
                TableDefinition.Column('product_id', SqlType.varchar(255), Nullability.NOT_NULLABLE),
                TableDefinition.Column('currency', SqlType.varchar(255)),
                TableDefinition.Column('local_price', SqlType.numeric(10, 2)),
                TableDefinition.Column('usd_price', SqlType.numeric(10, 2)),
                TableDefinition.Column('purchase_platform', SqlType.varchar(255)),
                TableDefinition.Column('purchase_ts', SqlType.varchar(255)),
                TableDefinition.Column('ship_ts', SqlType.varchar(255)),
                TableDefinition.Column('delivery_ts', SqlType.varchar(255)),
                TableDefinition.Column('refund_ts', SqlType.varchar(255)),
            ],
        )
        customers_table_def = TableDefinition(
            TableName('gearquest', 'customers'),
            [
                TableDefinition.Column('customer_id', SqlType.varchar(255), Nullability.NOT_NULLABLE),
                TableDefinition.Column('marketing_channel', SqlType.varchar(255)),
                TableDefinition.Column('account_creation_method', SqlType.varchar(255)),
                TableDefinition.Column('created_on', SqlType.varchar(255)),
                TableDefinition.Column('country', SqlType.varchar(255)),
                TableDefinition.Column('region', SqlType.varchar(255)),
                TableDefinition.Column('currency', SqlType.varchar(255)),
            ],
        )
        products_table_def = TableDefinition(
            TableName('gearquest', 'products'),
            [
                TableDefinition.Column('product_id', SqlType.varchar(255), Nullability.NOT_NULLABLE),
                TableDefinition.Column('product_name', SqlType.varchar(255)),
                TableDefinition.Column('category', SqlType.varchar(255)),
                TableDefinition.Column('specs', SqlType.varchar(255)),
                TableDefinition.Column('price_USD', SqlType.int()),
            ],
        )

        # Orders Table Insertion
        create_and_insert_table(conn, orders_table_def, hyperapi_orders_df)

        # Customers Table Insertion
        create_and_insert_table(conn, customers_table_def, hyperapi_customers_df)

        # Products Table Insertion
        create_and_insert_table(conn, products_table_def, hyperapi_products_df)


## Export to PostgreSQL

In [11]:
# order_id repeats across rows (one row per order line item), so the positional
# row index is exposed as an 'index' column. It is used as the orders PRIMARY
# KEY in PostgreSQL. The guard keeps the cell idempotent: re-running
# reset_index() would otherwise add a 'level_0' column.
if 'index' not in orders_df_combined.columns:
    orders_df_combined = orders_df_combined.reset_index()

In [None]:
# Read PostgreSQL connection settings from a local .env file / the environment.
load_dotenv()
pg_host, pg_user, pg_password, pg_database = (
    os.getenv(env_var) for env_var in ('PG_HOST', 'PG_USER', 'PG_PASSWORD', 'PG_DATABASE')
)

# Open the connection and the cursor that the sync loop further down uses.
conn = psycopg2.connect(
    host=pg_host,
    user=pg_user,
    password=pg_password,
    database=pg_database,
)
cursor = conn.cursor()


def sync_dataframe_to_table(cursor, table_name, columns, df):
    """
    Synchronizes a pandas DataFrame with a PostgreSQL table, performing upsert operations.

    Args:
        cursor: A PostgreSQL (psycopg2) cursor. This function does not commit;
            the caller owns the transaction.
        table_name (str): Name of the target database table. It is interpolated
            into the SQL text (double-quoted), so it must come from trusted code.
        columns (list): ``(column_name, postgres_type)`` tuples in table order.
            Every column whose type string contains ``'PRIMARY KEY'`` is part of
            the key used for deletes and ``ON CONFLICT``.
        df (pandas.DataFrame): DataFrame containing the data to sync. It must
            contain every column named in ``columns``; extra columns are ignored.

    The function performs the following operations:
    1. Creates the table if it doesn't exist
    2. Reads the primary keys currently stored in the table
    3. Deletes records that exist in the database but not in the DataFrame
    4. Upserts (INSERT ... ON CONFLICT DO UPDATE) records from the DataFrame.
       If every column is part of the key, conflicting rows are left
       unchanged (DO NOTHING).

    Errors while reading the existing keys, or while upserting, are printed
    rather than raised; an upsert error makes the function return early.

    Raises:
        ValueError: if ``columns`` declares no PRIMARY KEY column (both the
            delete and the ON CONFLICT clause need one).
    """
    pk_names = [name for name, dtype in columns if 'PRIMARY KEY' in dtype]
    if not pk_names:
        raise ValueError(f'Table {table_name} declares no PRIMARY KEY column; cannot sync it.')
    pk_cols = ', '.join(f'"{name}"' for name in pk_names)
    column_names = [name for name, _ in columns]

    # 1. Ensure table exists
    column_definitions = ', '.join(f'"{name}" {dtype}' for name, dtype in columns)
    cursor.execute(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({column_definitions})')

    # 2. Fetch existing primary keys from the database (one tuple per row)
    db_existing_pks = set()
    try:
        cursor.execute(f'SELECT {pk_cols} FROM "{table_name}"')
        db_existing_pks = set(cursor.fetchall())
    except psycopg2.errors.UndefinedTable:  # catch exception for when a table does not exist
        print(f'Table {table_name} likely just created, or does not exist.')
    except Exception as e:
        print(f'An unexpected error occurred: {e}')

    # 3. Delete records that no longer exist in the DataFrame.
    # itertuples(name=None) yields plain tuples, comparable with the tuples
    # returned by fetchall(), without building a Series per row.
    df_pk_tuples = set(df[pk_names].itertuples(index=False, name=None))
    to_delete = [pk for pk in db_existing_pks if pk not in df_pk_tuples]
    if to_delete:
        # executemany runs the statement once per key tuple, so the number of
        # placeholders must equal the number of PK columns (not len(to_delete)).
        pk_placeholders = ', '.join(['%s'] * len(pk_names))
        delete_query = f'DELETE FROM "{table_name}" WHERE ({pk_cols}) = ({pk_placeholders})'
        cursor.executemany(delete_query, to_delete)
        print(f'Deleted {len(to_delete)} records from {table_name}.')

    # 4. Prepare the insert query with upsert (ON CONFLICT DO UPDATE)
    placeholders = ', '.join(['%s'] * len(columns))
    quoted_columns = ', '.join(f'"{name}"' for name in column_names)
    updates = ', '.join(f'"{name}" = EXCLUDED."{name}"' for name, dtype in columns if 'PRIMARY KEY' not in dtype)
    # An empty SET list is invalid SQL, so a key-only table falls back to DO NOTHING.
    conflict_action = f'DO UPDATE SET {updates}' if updates else 'DO NOTHING'

    insert_query = f'''
        INSERT INTO "{table_name}" ({quoted_columns}) VALUES ({placeholders})
        ON CONFLICT ({pk_cols}) {conflict_action}
    '''

    # Rows in the same column order as the INSERT column list
    rows = list(df[column_names].itertuples(index=False, name=None))

    # Bulk insert/upsert data
    try:
        cursor.executemany(insert_query, rows)
        print(f'Upserted {len(rows)} records into {table_name}.')
    except Exception as e:
        print(f'Error upserting records into {table_name}: {e}')
        print(f'Problematic query: {insert_query}')
        print(f'First few rows of problematic data: {rows[:5]}')
        return


# Define table schemas and primary keys.
# Each entry maps a PostgreSQL table name to its (column, type) list and the
# DataFrame it is synced from. Columns whose type contains 'PRIMARY KEY' form
# the key used for deletes and upserts.
table_schemas = {
    'orders': {
        'columns': [
            # 'index' exists only after the reset_index cell has run
            ('index', 'INTEGER NOT NULL PRIMARY KEY'),  # PRIMARY KEY
            ('order_id', 'TEXT NOT NULL'),
            ('customer_id', 'TEXT NOT NULL'),
            ('product_id', 'TEXT NOT NULL'),
            ('currency', 'TEXT'),
            ('local_price', 'NUMERIC(10, 2)'),
            ('usd_price', 'NUMERIC(10, 2)'),
            ('purchase_platform', 'TEXT'),
            ('purchase_ts', 'TEXT'),
            ('ship_ts', 'TEXT'),
            ('delivery_ts', 'TEXT'),
            ('refund_ts', 'TEXT'),
        ],
        'dataframe': orders_df_combined,
    },
    'customers': {
        'columns': [
            ('customer_id', 'TEXT NOT NULL PRIMARY KEY'),  # PRIMARY KEY
            ('marketing_channel', 'TEXT'),
            ('account_creation_method', 'TEXT'),
            ('created_on', 'TEXT'),
            ('country', 'TEXT'),
            ('region', 'TEXT'),
            ('currency', 'TEXT'),
        ],
        'dataframe': customers_df_combined,
    },
    'products': {
        # NOTE(review): 'specs' is exported to the .hyper extract but not to
        # PostgreSQL — confirm this omission is intentional.
        'columns': [
            ('product_id', 'TEXT NOT NULL PRIMARY KEY'),  # PRIMARY KEY
            ('product_name', 'TEXT'),
            ('category', 'TEXT'),
            ('price_USD', 'INTEGER'),
        ],
        'dataframe': products_df,
    },
}

# Sync every table in one transaction. Commit only if all syncs finish; on an
# exception, roll back and re-raise. Always release the cursor and connection.
try:
    for table_name, schema in table_schemas.items():
        sync_dataframe_to_table(cursor=cursor, table_name=table_name, columns=schema['columns'], df=schema['dataframe'])
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    cursor.close()
    conn.close()


Upserted 1038215 records into orders.
Upserted 50000 records into customers.
Upserted 60 records into products.


In [96]:
tuple(orders_df_combined.shape)  # (rows, columns) of the merged orders table

(1038215, 11)

In [97]:
tuple(customers_df_combined.shape)  # (rows, columns) of the enriched customers table

(50000, 7)

In [98]:
tuple(products_df.shape)  # (rows, columns) of the products table

(60, 5)