In [61]:
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import os
from dotenv import load_dotenv

In [70]:
load_dotenv(dotenv_path="/Users/mayahbosworth/ae_takehome/.env")

DATABASE = os.getenv("DB_NAME")
USER = os.getenv("DB_USER")
PASSWORD = os.getenv("DB_PASSWORD")
HOST = os.getenv("DB_HOST")
PORT = os.getenv("DB_PORT")
DATABASE_URL = os.getenv("DATABASE_URL")

try:
    with psycopg2.connect(
        database=DATABASE,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT
    ) as conn:
        print("Database connection successful.")
except Exception as e:
    print(f"Error connecting to the database: {e}")

raw_customers_path = '/Users/mayahbosworth/Desktop/ae_takehome/csv/customers.csv'
raw_orders_path = '/Users/mayahbosworth/Desktop/ae_takehome/csv/orders.csv'
raw_products_path = '/Users/mayahbosworth/Desktop/ae_takehome/csv/products.csv'


customers = pd.read_csv(raw_customers_path, encoding='utf-8')
orders = pd.read_csv(raw_orders_path, encoding='utf-16le')
products = pd.read_csv(raw_products_path, encoding='utf-16le')

Database connection successful.


In [48]:
query = """
CREATE TABLE IF NOT EXISTS raw_customers (
    customer_id TEXT,
    customer_name TEXT,
    region TEXT,
    segment TEXT,
    irrelevant_col_1 TEXT,
    irrelevant_col_2 TEXT,
    irrelevant_col_3 TEXT
);

CREATE TABLE IF NOT EXISTS raw_orders (
    order_id TEXT,
    customer_id TEXT,
    order_date TEXT,
    total_amount TEXT,
    product_id TEXT,
    irrelevant_col_1 TEXT,
    irrelevant_col_2 TEXT,
    irrelevant_col_3 TEXT
);

CREATE TABLE IF NOT EXISTS raw_products (
    product_id TEXT,
    product_name TEXT,
    category TEXT,
    irrelevant_col_1 TEXT,
    irrelevant_col_2 TEXT,
    irrelevant_col_3 TEXT
);
"""

In [None]:
try:
    # Connect to the database using SQLAlchemy for easier bulk insert
    engine = create_engine(DATABASE)

    # Load raw_customers
    customers = pd.read_csv(raw_customers_path, encoding='utf-8')
    customers.to_sql('raw_customers', engine, if_exists='append', index=False)

    # Load raw_orders
    orders = pd.read_csv(raw_orders_path, encoding='utf-16le')
    orders.to_sql('raw_orders', engine, if_exists='append', index=False)

    # Load raw_products
    products = pd.read_csv(raw_products_path, encoding='utf-16le')
    products.to_sql('raw_products', engine, if_exists='append', index=False)

    print("Data loaded successfully into raw tables.")
except Exception as e:
    print(f"An error occurred while loading data: {e}")

Data loaded successfully into raw tables.


In [50]:
query = r"""

-- Understand raw_customers ----------------------------------------------------------

-- 1,200 TOTAL RECORDS
SELECT COUNT(*) FROM raw_customers;

-- 700 CLEAN RECORDS
SELECT DISTINCT COUNT(*) FROM raw_customers WHERE customer_id IS NOT NULL;

-- 643 RECORDS WITH NAMES
SELECT COUNT(*) FROM raw_customers WHERE customer_name IS NOT NULL AND customer_name <> '';

-- MISSING CUSTOMER_ID 999 AND 998
SELECT COUNT(*) FROM raw_customers WHERE customer_id = '999' OR customer_id = '998';

SELECT COUNT(*) FROM raw_customers WHERE customer_name IS NULL OR customer_name = '';

-- CUSTOMER_ID 4 IS MISSING ONLY NAME
SELECT * FROM raw_customers WHERE customer_id = '4';

SELECT * FROM raw_customers WHERE segment IS NULL;

-- Understand raw_orders ----------------------------------------------------------

-- 12,000 TOTAL RECORDS
SELECT COUNT(*) FROM raw_orders;

-- 10,000 CLEAN RECORDS
SELECT DISTINCT COUNT(order_id) FROM raw_orders WHERE order_id IS NOT NULL;

-- CUSTOMER_ID 999 AND 998 IN RAW_ORDERS
SELECT * FROM raw_orders WHERE customer_id = '4';

-- Understand raw_products ----------------------------------------------------------

-- 1,950 TOTAL RECORDS
SELECT COUNT(*) FROM raw_products;

-- 1,150 CLEAN RECORDS
SELECT DISTINCT COUNT(*) FROM raw_products WHERE product_id IS NOT NULL;

-- CREATE DIM TABLES ----------------------------------------------------------

-- Create dim_customer table
CREATE TABLE IF NOT EXISTS dim_customer (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(255),
    region VARCHAR(100),
    segment VARCHAR(100)
);

INSERT INTO dim_customer (customer_id, customer_name, region, segment)
SELECT
    CAST(REGEXP_REPLACE(customer_id::TEXT, '\.0$', '') AS INT) AS customer_id,
    customer_name,
    LOWER(region) AS region,
    segment
FROM raw_customers
WHERE customer_id IS NOT NULL
  AND CAST(customer_id AS TEXT) ~ '^[0-9]+(\.0)?$';

-- 700 TOTAL RECORDS
SELECT DISTINCT COUNT(*) FROM dim_customer;

SELECT * FROM dim_customer;

-- 643 RECORDS WITH NAMES
SELECT DISTINCT COUNT(*) FROM dim_customer WHERE customer_name <> '';

-- CUSTOMER_ID 999 AND 998 NOT IN CUSTOMER TABLE
SELECT COUNT(*) FROM dim_customer WHERE customer_id = 999 OR customer_id = 998;

-- INSERT CUSTOMER_ID 999 AND 998
INSERT INTO dim_customer (customer_id, customer_name, region, segment)
SELECT DISTINCT CAST(customer_id AS INT), 'Unknown', 'Unknown', 'Unknown'
FROM raw_orders
WHERE CAST(customer_id AS INT) NOT IN (SELECT customer_id FROM dim_customer);

SELECT COUNT(*) FROM dim_customer WHERE customer_id IN (998, 999);

-- INSERT JOHN DOE AS CUSTOMER_NAME FOR CUSTOMER_ID 4
UPDATE dim_customer SET customer_name = 'John Doe' WHERE customer_id = 4;

SELECT DISTINCT COUNT(segment) FROM dim_customer;

-- Create dim_product table
CREATE TABLE IF NOT EXISTS dim_product (
    product_id INT PRIMARY KEY,
    category VARCHAR(100)
);

INSERT INTO dim_product (product_id, category)
SELECT
    CAST(REGEXP_REPLACE(product_id::TEXT, '\.0$', '') AS INT) AS product_id,
    category
FROM raw_products
WHERE product_id IS NOT NULL;

-- 1,150 TOTAL RECORDS
SELECT DISTINCT COUNT(*) FROM dim_product;

CREATE TABLE IF NOT EXISTS fact_sales (
    order_id INT PRIMARY KEY,
    customer_id INT NOT NULL,
    product_id INT NOT NULL,
    customer_name VARCHAR(255),
    order_date DATE,
    total_amount NUMERIC(10, 2),
    total_revenue NUMERIC(10, 2),
    FOREIGN KEY (customer_id) REFERENCES dim_customer(customer_id),
    FOREIGN KEY (product_id) REFERENCES dim_product(product_id)
);

WITH cleaned_orders AS (
    SELECT DISTINCT ON (order_id)
        CAST(REGEXP_REPLACE(order_id::TEXT, '\.0$', '') AS INT) AS order_id,
        CAST(REGEXP_REPLACE(customer_id::TEXT, '\.0$', '') AS INT) AS customer_id,
        CAST(REGEXP_REPLACE(product_id::TEXT, '\.0$', '') AS INT) AS product_id,
        CASE 
            WHEN order_date = '2024-02-30' THEN '2024-02-29'
            WHEN order_date = '2023-02-29' THEN '2023-02-28'
            ELSE order_date
        END::DATE AS order_date,
        CAST(total_amount AS NUMERIC(10, 2)) AS total_amount,
        CAST(total_amount AS NUMERIC(10, 2)) AS total_revenue
    FROM raw_orders
    WHERE customer_id IS NOT NULL
      AND product_id IS NOT NULL
    ORDER BY order_id, order_date
)
INSERT INTO fact_sales (order_id, customer_id, product_id, customer_name, order_date, total_amount, total_revenue)
SELECT
    cte.order_id,
    cte.customer_id,
    cte.product_id,
    dc.customer_name,
    cte.order_date,
    cte.total_amount,
    cte.total_revenue
FROM cleaned_orders cte
JOIN dim_customer dc ON cte.customer_id = dc.customer_id
WHERE cte.customer_id IN (SELECT customer_id FROM dim_customer)
  AND cte.product_id IN (SELECT product_id FROM dim_product);

-- 10,000 TOTAL RECORDS
SELECT COUNT(*) FROM fact_sales;

-- 607 RECORDS FOR 999 AND 998
SELECT COUNT(*) FROM fact_sales WHERE customer_id = 999 OR customer_id = 998;

-- 377 RECORDS FOR JOHN DOE
SELECT * FROM fact_sales WHERE customer_id = 4;

"""

In [71]:
try:
    with psycopg2.connect(
        database=DATABASE,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query)
            conn.commit()
            print("SQL transformations executed successfully.")
except Exception as e:
    print(f"An error occurred during transformations: {e}")

An error occurred during transformations: duplicate key value violates unique constraint "dim_customer_pkey"
DETAIL:  Key (customer_id)=(20) already exists.

