# `Extract Transform Load`

Now we will extract, transform and load the information into a suitable database.

In [1]:
# CONST
available_data = {
    "purchases": "PurchasesFINAL12312016csv.zip",
    "sales": "SalesFINAL12312016csv.zip"
}

First lets declare our functions that will allow us to extract and load the data

In [2]:
from sqlalchemy import create_engine
from zipfile import ZipFile
from io import BytesIO

def get_first_csv_buffer_from_zip(file_path: str) -> BytesIO:
    """
    Extracts the first CSV file from a ZIP archive at the given 
    file path and returns it as a BytesIO buffer.

    Args:
        file_path (str): Path to the ZIP file on disk.

    Returns:
        BytesIO: Buffer containing the CSV file's bytes, 
        or None if extraction failed.
    """
    try:
        with ZipFile(file_path, 'r') as zip_file:
            # Find the first CSV file in the ZIP
            for filename in zip_file.namelist():
                if filename.lower().endswith('.csv'):
                    with zip_file.open(filename) as csv_file:
                        csv_bytes = csv_file.read()
                        return BytesIO(csv_bytes)
            print("No CSV file found in ZIP archive.", flush=True)
            return None
    except Exception as e:
        print(f"Error extracting CSV from ZIP: {e}", flush=True)
        return None

def get_database_connection():
    return create_engine("postgresql://annie:annieMagicWord@localhost:5432/liquor")

In [3]:
import pandas as pd

csv_buffer = get_first_csv_buffer_from_zip(
    file_path=f"data/{available_data['purchases']}"
)
df_purchases = pd.read_csv(csv_buffer)

In [4]:
df_purchases

Unnamed: 0,InventoryId,Store,Brand,Description,Size,VendorNumber,VendorName,PONumber,PODate,ReceivingDate,InvoiceDate,PayDate,PurchasePrice,Quantity,Dollars,Classification
0,69_MOUNTMEND_8412,69,8412,Tequila Ocho Plata Fresno,750mL,105,ALTAMAR BRANDS LLC,8124,2015-12-21,2016-01-02,2016-01-04,2016-02-16,35.71,6,214.26,1
1,30_CULCHETH_5255,30,5255,TGI Fridays Ultimte Mudslide,1.75L,4466,AMERICAN VINTAGE BEVERAGE,8137,2015-12-22,2016-01-01,2016-01-07,2016-02-21,9.35,4,37.40,1
2,34_PITMERDEN_5215,34,5215,TGI Fridays Long Island Iced,1.75L,4466,AMERICAN VINTAGE BEVERAGE,8137,2015-12-22,2016-01-02,2016-01-07,2016-02-21,9.41,5,47.05,1
3,1_HARDERSFIELD_5255,1,5255,TGI Fridays Ultimte Mudslide,1.75L,4466,AMERICAN VINTAGE BEVERAGE,8137,2015-12-22,2016-01-01,2016-01-07,2016-02-21,9.35,6,56.10,1
4,76_DONCASTER_2034,76,2034,Glendalough Double Barrel,750mL,388,ATLANTIC IMPORTING COMPANY,8169,2015-12-24,2016-01-02,2016-01-09,2016-02-16,21.32,5,106.60,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2372469,49_GARIGILL_22298,49,22298,Zorvino Vyds Sangiovese,750mL,90058,ZORVINO VINEYARDS,13593,2016-12-19,2016-12-28,2017-01-09,2017-02-06,8.06,12,96.72,2
2372470,1_HARDERSFIELD_19556,1,19556,Zorvino Bacca Z Blackberry,750mL,90058,ZORVINO VINEYARDS,13593,2016-12-19,2016-12-27,2017-01-09,2017-02-06,9.39,12,112.68,2
2372471,66_EANVERNESS_22297,66,22297,Zorvino Vyds Pearz,750mL,90058,ZORVINO VINEYARDS,13593,2016-12-19,2016-12-26,2017-01-09,2017-02-06,6.75,12,81.00,2
2372472,69_MOUNTMEND_19557,69,19557,Zorvino Fragole Z Strawberry,750mL,90058,ZORVINO VINEYARDS,13593,2016-12-19,2016-12-26,2017-01-09,2017-02-06,9.39,12,112.68,2


Before storing it into the database, lets drop the ones with InventoryId duplicated, as for this table we just want to know the purchase price

In [5]:
print(df_purchases.shape[0], ": Lenght before droping duplicates")
df_purchases = (df_purchases[["InventoryId", "PurchasePrice"]].drop_duplicates(subset="InventoryId")).rename(columns={
    "InventoryId": "inventory_id",
    "PurchasePrice": "purchase_price"
})
print(df_purchases.shape[0], ": Lenght after droping duplicates")

2372474 : Lenght before droping duplicates
245907 : Lenght after droping duplicates


In [96]:
from sqlalchemy import text
engine = get_database_connection()
with engine.connect() as connection:
    connection.execute(text(
        "DROP MATERIALIZED VIEW IF EXISTS sales_with_costs;"
    ))
    connection.commit()

df_purchases.to_sql(
    name="purchases",
    con=get_database_connection(),
    if_exists="replace",
    index=False
)

907

Now lets load sales data

In [53]:
import pandas as pd

csv_buffer = get_first_csv_buffer_from_zip(
    file_path=f"data/{available_data['sales']}"
)
df_sales = pd.read_csv(csv_buffer)

In [54]:
df_sales.shape[0]

12825363

In [55]:
df_sales.columns

Index(['InventoryId', 'Store', 'Brand', 'Description', 'Size', 'SalesQuantity',
       'SalesDollars', 'SalesPrice', 'SalesDate', 'Volume', 'Classification',
       'ExciseTax', 'VendorNo', 'VendorName'],
      dtype='object')

In [56]:
# As we already dropped duplicated ones, lets use the column
unique_inventory_ids = df_purchases["inventory_id"]
df_sales = df_sales[df_sales["InventoryId"].isin(unique_inventory_ids)]
df_sales_sample = df_sales.sample(1_000_000)

In [57]:
df_sales_sample = df_sales_sample.drop(columns=["Description", "Size", "Classification", "VendorNo"])

In [58]:
df_sales_sample["SalesDate"] = pd.to_datetime(df_sales_sample["SalesDate"], format="ISO8601")

Sales data has over a billion entries, so, for computational limitations, lets take a sample of 1 million sales.

In [59]:
df_sales_sample.rename(
    columns={
        "InventoryId": "inventory_id", 
        "Store": "store", 
        "Brand": "brand", 
        "SalesDate": "sale_date", 
        "SalesPrice": "sale_price",
        "SalesDollars": "sale_amount",
        "Volume": "product_volume", 
        "ExciseTax": "excise_tax", 
        "VendorName": "vendor_name",
        "SalesQuantity": "sale_quantity"
    },
    inplace=True
)

In [61]:
df_sales_sample.shape[0]

1000000

In [66]:
df_sales_sample.to_sql(
    name="sales",
    con=get_database_connection(),
    if_exists="replace",
    index=False
)

1000

In [102]:
from sqlalchemy import text
engine = get_database_connection()
with engine.connect() as connection:
    connection.execute(text("""
DROP MATERIALIZED VIEW IF EXISTS sales_with_costs;
CREATE MATERIALIZED VIEW sales_with_costs AS
SELECT
    s.*,
    p.purchase_price,
    ROUND((s.sale_quantity * p.purchase_price)::numeric, 2) as total_cost,
    ROUND((s.sale_amount - (s.sale_quantity * p.purchase_price))::numeric, 2) as gross_profit,
    ROUND(((s.sale_amount - (s.sale_quantity * p.purchase_price)) - s.excise_tax)::numeric, 2) as net_profit,
    CASE
        WHEN s.sale_amount > 0 
            THEN ROUND(((s.sale_amount - (s.sale_quantity * p.purchase_price)) / s.sale_amount)::numeric, 2)
            ELSE 0
    END AS margin,
    CASE
        WHEN s.sale_amount > 0 
            THEN ROUND((((s.sale_amount - (s.sale_quantity * p.purchase_price)) - s.excise_tax) / s.sale_amount)::numeric, 2)
            ELSE 0
    END AS net_margin
FROM
    sales s
JOIN purchases p
ON s.inventory_id = p.inventory_id;
"""))
    connection.execute(text("""
DROP MATERIALIZED VIEW IF EXISTS product_margins;
CREATE MATERIALIZED VIEW product_margins AS
SELECT
    s.brand,
    s.vendor_name,
    s.sale_price,
    s.excise_tax,
    p.purchase_price,
    ROUND((s.sale_price - p.purchase_price)::numeric, 2) as gross_profit,
    ROUND(((s.sale_price - p.purchase_price) - (s.excise_tax / s.sale_quantity))::numeric, 2) as net_profit,
    CASE
        WHEN s.sale_price > 0 
            THEN ROUND(((s.sale_price - p.purchase_price) / s.sale_price)::numeric, 2)
        ELSE 0
    END AS margin,
    CASE
        WHEN s.sale_price > 0 
            THEN ROUND((((s.sale_price - p.purchase_price) - (s.excise_tax / s.sale_quantity)) / s.sale_price)::numeric, 2)
        ELSE 0
    END AS net_margin
FROM
    sales s
JOIN purchases p
ON s.inventory_id = p.inventory_id;
"""))
    connection.commit()

Now that we have our data stored in the database. Lets find some truth and insights from the data!

This was the manual process, for deployment this will be automated on src/main.py as a ETL process