## The Data Governance Co-Pilot Dataset

- A large dataset of 100,000 orders from 2016-2018 on the Olist marketplace in Brazil.
- `olist_customers_dataset.csv` customer master
- `olist_orders_dataset.csv` The main order log
- `olist_order_items_dataset.csv` Links orders to products
- `olist_order_payments_dataset.csv` The transaction value

In [None]:
import pandas as pd
from typing import Tuple, List
from io import StringIO
from psycopg2.extras import execute_batch
from psycopg2 import sql
import psycopg2

### Database Connection

In [None]:
def create_db_connection() -> psycopg2.extensions.connection:
    """Initialize the database with required extensions and tables."""
    conn = psycopg2.connect(
        dbname="admin",
        user="admin",
        password="password",
        host="localhost",
        port="5432",
    )
    return conn

### Dataset Imports and Exploration

In [2]:
customer_master = pd.read_csv("./dataset/olist_customers_dataset.csv")

In [3]:
# Check columns and shape
print("Columns:", customer_master.columns.tolist())
print("Shape:", customer_master.shape)
print("Sample row:")
print(customer_master.iloc[0].to_dict())

Columns: ['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state']
Shape: (99441, 5)
Sample row:
{'customer_id': '06b8999e2fba1a1fbc88172c00ba8bc7', 'customer_unique_id': '861eff4711a542e4b93843c6dd7febb0', 'customer_zip_code_prefix': 14409, 'customer_city': 'franca', 'customer_state': 'SP'}


In [4]:
customer_master.head()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP


In [5]:
customer_master.describe()

Unnamed: 0,customer_zip_code_prefix
count,99441.0
mean,35137.474583
std,29797.938996
min,1003.0
25%,11347.0
50%,24416.0
75%,58900.0
max,99990.0


In [6]:
main_order_log = pd.read_csv("./dataset/olist_orders_dataset.csv")
main_order_log.head()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


In [7]:
order_to_product = pd.read_csv("./dataset/olist_order_items_dataset.csv")
order_to_product.head()

Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14


In [8]:
order_to_product = pd.read_csv("./dataset/olist_order_items_dataset.csv")
order_to_product.head()
# Load order payments dataset
order_payments = pd.read_csv("./dataset/olist_order_payments_dataset.csv")
print("Payments columns:", order_payments.columns.tolist())
order_payments.head()

Payments columns: ['order_id', 'payment_sequential', 'payment_type', 'payment_installments', 'payment_value']


Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45


## Queries for Tables and Views

In [None]:
"""CREATE TABLE dim_customer (
    customer_id VARCHAR(50) PRIMARY KEY,
    customer_unique_id VARCHAR(50),
    customer_city VARCHAR(100),
    customer_state CHAR(2)
);"""

"""COMMENT ON TABLE dim_customer IS 'Core customer table. CONTAINS PII (PCI, address). DO NOT USE FOR general BI. Only for auth_service.';"""

In [10]:
def _populate_dim_customer(conn: psycopg2.extensions.connection, df: pd.DataFrame) -> None:
    # Create a string buffer
    # Read the train.csv file into a pandas dataframe, skipping bad lines
    df_copy = df.copy()
    
    # Select only the 4 columns we want
    cols_list: List[str] = ["customer_id", "customer_unique_id", "customer_city", "customer_state"]
    df_copy = df_copy[cols_list]
    
    # Drop rows where any column value is empty
    df_copy = df_copy.dropna()
    df_copy = df_copy.replace({pd.NA: None, pd.NaT: None})
    df_copy = df_copy.where(pd.notnull(df_copy), None)
    print("Starting to populate dim_customer table")
    # Convert DataFrame to csv format in memory
    tuples: List[Tuple] = [tuple(x) for x in df_copy.to_numpy()]
    cols: str = ",".join(cols_list)
    placeholders: str = ",".join(
        ["%s"] * len(cols_list)
    )  # Create the correct number of placeholders
    # Create a parameterized query
    query: str = f"INSERT INTO dim_customer ({cols}) VALUES ({placeholders})"
    cursor: psycopg2.extensions.cursor = conn.cursor()
    try:
        execute_batch(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.Error) as error:
        print(f"Error while inserting data into PostgreSQL: {error}")
        conn.rollback()

    # Commit and close
    conn.commit()
    print("Finished populating dim_customer table")

In [14]:
conn = create_db_connection()
_populate_dim_customer(conn, customer_master)

Starting to populate dim_customer table
Finished populating dim_customer table
Finished populating dim_customer table


In [None]:
"""-- Example for the "clean" sales table
CREATE TABLE fact_orders (
    order_id VARCHAR(50) PRIMARY KEY,
    customer_id VARCHAR(50),
    order_purchase_timestamp TIMESTAMP,
    order_status VARCHAR(20)
);"""

In [15]:
def _populate_fact_orders(conn: psycopg2.extensions.connection, df: pd.DataFrame) -> None:
    # Create a string buffer
    # Read the train.csv file into a pandas dataframe, skipping bad lines
    df_copy = df.copy()
    
    # Select only the 4 columns we want
    cols_list: List[str] = ["order_id", "customer_id", "order_purchase_timestamp", "order_status"]
    df_copy = df_copy[cols_list]
    
    # Drop rows where any column value is empty
    df_copy = df_copy.dropna()
    df_copy = df_copy.replace({pd.NA: None, pd.NaT: None})
    df_copy = df_copy.where(pd.notnull(df_copy), None)
    print("Starting to populate fact_orders table")
    # Convert DataFrame to csv format in memory
    tuples: List[Tuple] = [tuple(x) for x in df_copy.to_numpy()]
    cols: str = ",".join(cols_list)
    placeholders: str = ",".join(
        ["%s"] * len(cols_list)
    )  # Create the correct number of placeholders
    # Create a parameterized query
    query: str = f"INSERT INTO fact_orders ({cols}) VALUES ({placeholders})"
    cursor: psycopg2.extensions.cursor = conn.cursor()
    try:
        execute_batch(cursor, query, tuples)
        conn.commit()
        print("Finished populating fact_orders table")
    except (Exception, psycopg2.Error) as error:
        print(f"Error while inserting data into PostgreSQL: {error}")
        conn.rollback()

In [16]:
_populate_fact_orders(conn, main_order_log)

Starting to populate fact_orders table
Finished populating fact_orders table
Finished populating fact_orders table


In [None]:
"""CREATE TABLE fact_order_payments (
    order_id VARCHAR(50) NOT NULL,
    payment_sequential SMALLINT NOT NULL,
    payment_type VARCHAR(50),
    payment_installments SMALLINT,
    payment_value NUMERIC(10, 2) NOT NULL,

    -- A single order can have multiple payment methods (e.g., part voucher, part credit card)
    -- This composite key ensures each payment line is unique.
    CONSTRAINT pk_fact_order_payments PRIMARY KEY (order_id, payment_sequential),

    -- Links this table back to the main orders table
    CONSTRAINT fk_order
        FOREIGN KEY (order_id)
        REFERENCES fact_orders (order_id),

    -- Ensure payment value isn't negative
    CHECK (payment_value >= 0),
    
    -- Installments should be 1 or more (or NULL if not applicable, like 'boleto')
    CHECK (payment_installments IS NULL OR payment_installments >= 0)
); """

'fact_order_payments'

In [17]:
def _populate_fact_order_payments(conn: psycopg2.extensions.connection, df: pd.DataFrame) -> None:
    # Create a string buffer
    # Read the train.csv file into a pandas dataframe, skipping bad lines
    df_copy = df.copy()
    
    # Select only the 4 columns we want
    cols_list: List[str] = ["order_id", "payment_sequential", "payment_type", "payment_installments", "payment_value"]
    df_copy = df_copy[cols_list]
    
    # Drop rows where any column value is empty
    df_copy = df_copy.dropna()
    df_copy = df_copy.replace({pd.NA: None, pd.NaT: None})
    df_copy = df_copy.where(pd.notnull(df_copy), None)
    print("Starting to populate fact_order_payments table")
    # Convert DataFrame to csv format in memory
    tuples: List[Tuple] = [tuple(x) for x in df_copy.to_numpy()]
    cols: str = ",".join(cols_list)
    placeholders: str = ",".join(
        ["%s"] * len(cols_list)
    )  # Create the correct number of placeholders
    # Create a parameterized query
    query: str = f"INSERT INTO fact_order_payments ({cols}) VALUES ({placeholders})"
    cursor: psycopg2.extensions.cursor = conn.cursor()
    try:
        execute_batch(cursor, query, tuples)
        conn.commit()
        print("Finished populating fact_order_payments table")
    except (Exception, psycopg2.Error) as error:
        print(f"Error while inserting data into PostgreSQL: {error}")
        conn.rollback()

In [18]:
_populate_fact_order_payments(conn, order_payments)

Starting to populate fact_order_payments table
Finished populating fact_order_payments table
Finished populating fact_order_payments table


In [None]:
"""
-- Create the deprecated table (maybe with old data)
CREATE TABLE customer_master_DEPRECATED AS
SELECT * FROM dim_customer TABLESAMPLE BERNOULLI (50); -- Just get 50% of rows

-- Create a confusingly named view
CREATE OR REPLACE VIEW sales_rpt_v2 AS
SELECT * FROM fact_orders WHERE order_purchase_timestamp < '2018-01-01';

-- Create the "deprecated" view that has old data
CREATE OR REPLACE VIEW v_cust_ltv_agg_DEPRECATED AS
SELECT
    t1.customer_unique_id,
    SUM(t3.payment_value) as total_revenue,
    TO_CHAR(MIN(t2.order_purchase_timestamp), 'YYYY-MM-DD') AS first_purchase_date,
    TO_CHAR(MAX(t2.order_purchase_timestamp), 'YYYY-MM-DD') AS last_purchase_date
FROM
    dim_customer AS t1
JOIN
    fact_orders AS t2 ON t1.customer_id = t2.customer_id
JOIN
    fact_order_payments AS t3 ON t2.order_id = t3.order_id
WHERE
    t2.order_status = 'delivered'
    -- deprecation is coming from this line
    AND t2.order_purchase_timestamp < '2018-01-01'
GROUP BY
    t1.customer_unique_id;
"""

"""COMMENT ON VIEW v_cust_ltv_agg_DEPRECATED IS 
'[DEPRECATED] Old LTV calculation. Only includes data before 2018. DEPRECATED as of Q3 2024. Do not use for new reporting. Use v_rpt_customer_ltv_certified instead.';"""

In [None]:
"""CREATE OR REPLACE VIEW v_rpt_customer_ltv_certified AS
SELECT
    c.customer_unique_id,
    c.customer_city,
    c.customer_state,

    -- Core LTV Metrics
    SUM(p.payment_value) AS total_revenue_ltv,
    COUNT(DISTINCT o.order_id) AS total_orders,
    AVG(p.payment_value) AS average_order_value,

    -- Customer Lifecycle Metrics (Converted to YYYY-MM-DD strings)
    TO_CHAR(MIN(o.order_purchase_timestamp), 'YYYY-MM-DD') AS first_purchase_date,
    TO_CHAR(MAX(o.order_purchase_timestamp), 'YYYY-MM-DD') AS last_purchase_date

FROM
    dim_customer AS c
JOIN
    fact_orders AS o ON c.customer_id = o.customer_id
JOIN
    fact_order_payments AS p ON o.order_id = p.order_id

WHERE
    o.order_status = 'delivered' -- Finance standard: only count completed, paid-for orders

GROUP BY
    c.customer_unique_id,
    c.customer_city,
    c.customer_state;"""

"""COMMENT ON VIEW v_rpt_customer_ltv_certified IS 
'[CERTIFIED] Gold-standard, PII-scrubbed view for all customer LTV reporting. Aggregated daily. Maintained by: Finance BI Team';"""

In [None]:
"""COMMENT ON VIEW public.v_cust_ltv_agg_DEPRECATED IS 'DEPRECATED as of Q3 2024. Inaccurate. Use v_rpt_customer_ltv_certified.'
"""