### Data Cleaning & Pipeline Setup (ELT Approach)

**Objective:**
Establish an efficient pipeline to move raw transactional data from Excel into a PostgreSQL data warehouse for analysis.

**Key Steps:**
1.  **Date Normalization (Python):** Converted the `InvoiceDate` timestamp to a standard date format (`YYYY-MM-DD`), dropping the time of day, to ensure consistency for daily sales reporting.
2.  **Database Connection:** Established a connection to the local PostgreSQL instance using **SQLAlchemy**.
3.  **Data Ingestion:** Loaded each worksheet of the raw dataset into a staging table (`raw_retail_transactions`), writing in batches of 10,000 rows (`chunksize=10000`) to keep each insert small during the upload of 500k+ rows per sheet (about 1.07M rows in total). Note that each sheet is still read into memory in full; `chunksize` only batches the writes.

*Note: Null values and negative quantities were intentionally preserved during ingestion to allow for a "Data Quality Audit" and cleaning directly within SQL.*

In [80]:
import os
import pandas as pd
import gc # for garbage collection

from sqlalchemy import create_engine
from dotenv import load_dotenv; load_dotenv() #noqa

# Read the runtime configuration loaded from the .env file.
# os.environ[...] is used instead of os.getenv(...) so that a missing entry
# fails immediately with a KeyError naming the variable, rather than passing
# None along and failing later with an unrelated-looking error.
excel_file_path = os.environ["REVENUE_DRIVER_DATA"]
db_connection_str = os.environ["DB_CONNECTION_STR"]

# SQLAlchemy engine used by the loader cell to write to the database.
db_connection = create_engine(db_connection_str)

In [None]:
# Load the SQL magic extension and connect it to the database.
# (The table itself is created by the separate %%sql cell that follows.)
%load_ext sql
%config SqlMagic.displaycon = False  # disables the conn string display
%config SqlMagic.feedback = False    # disables the "Done" and row count
# Connect with the same connection string the SQLAlchemy engine was built from.
%sql {db_connection_str}

In [77]:
%%sql
DROP TABLE IF EXISTS raw_retail_transactions;

-- The DROP above lets this cell be re-run to rebuild the table from scratch.
-- Staging table for the raw Excel rows. Column names match the names the
-- Python loader renames the Excel headers to. No NOT NULL or CHECK constraints
-- are declared, so nulls and negative quantities are accepted unchanged.
CREATE TABLE raw_retail_transactions (
    invoice VARCHAR(20),
    stock_code VARCHAR(20),    -- text, sample rows include codes such as 79323P
    description TEXT,
    quantity INTEGER,
    invoice_date DATE,         -- date only, the loader drops the time of day before insert
    price NUMERIC(10, 2),
    customer_id INTEGER,
    country VARCHAR(50)
);

[]

Use Python to load the Excel data into the PostgreSQL staging table.

In [6]:
# Discover the worksheets in the workbook; the loader cell iterates over them.
xls = pd.ExcelFile(excel_file_path)
sheet_names = xls.sheet_names
print(f"Sheet names: {sheet_names}")

Sheet names: ['Year 2009-2010', 'Year 2010-2011']


In [None]:
# Load every worksheet into the staging table, one sheet at a time.

# Excel header -> staging-table column. Defined once, outside the loop, so
# every sheet is renamed against the same schema.
column_map = {
    'Invoice': 'invoice',
    'StockCode': 'stock_code',
    'Description': 'description',
    'Quantity': 'quantity',
    'InvoiceDate': 'invoice_date',
    'Price': 'price',
    'Customer ID': 'customer_id',
    'Country': 'country'
}

# The workbook is opened once and reused for all sheets; calling
# pd.read_excel(path, ...) per sheet re-opens and re-parses the file each time.
# A single transaction makes the load all-or-nothing: if a later sheet fails,
# rows from earlier sheets are rolled back instead of being left in the table
# and duplicated by the next run of this cell.
with pd.ExcelFile(excel_file_path) as workbook, db_connection.begin() as conn:
    for sheet in sheet_names:
        print(f"Processing sheet: {sheet}...")

        # The whole sheet is read into memory; `chunksize` below only batches
        # the INSERT statements, it does not stream the Excel read.
        sheet_df = pd.read_excel(workbook, sheet_name=sheet)

        # errors='raise' fails fast if a sheet is missing an expected header,
        # instead of silently skipping the rename and failing during the insert.
        sheet_df = sheet_df.rename(columns=column_map, errors='raise')

        # Keep only the calendar date; the target column is DATE.
        sheet_df['invoice_date'] = pd.to_datetime(sheet_df['invoice_date']).dt.date

        # 'append' adds to the table created by the DDL cell (and to the rows
        # of any sheet loaded before this one).
        sheet_df.to_sql(name='raw_retail_transactions', con=conn, if_exists='append', index=False, chunksize=10000)

        print(f"Sheet {sheet} uploaded ({len(sheet_df)} rows).")

        # Release the sheet before reading the next one.
        del sheet_df
        gc.collect()

print("All sheets processed successfully.")

Processing sheet: Year 2009-2010...
Sheet Year 2009-2010 uploaded (525461 rows).
Processing sheet: Year 2010-2011...
Sheet Year 2010-2011 uploaded (541910 rows).
All sheets processed successfully.


In [83]:
# Spot-check the load: preview the first five rows of the staging table.
%sql select * from raw_retail_transactions limit 5;

invoice,stock_code,description,quantity,invoice_date,price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01,6.75,13085,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01,1.25,13085,United Kingdom
