In [36]:
%%writefile daily_loader.py

import os
import re
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy import text

# === CONFIGURATION ===
DATA_LAKE_PATH = "data_lake"
DB_URL = "sqlite:///my_database.db"
engine = create_engine(DB_URL)

# === DATE EXTRACTORS ===
def extract_date_from_filename(filename):
    match = re.search(r'(\d{8})', filename)
    if match:
        return pd.to_datetime(match.group(1), format='%Y%m%d').strftime("%Y-%m-%d")
    return None

def extract_datekey_from_filename(filename):
    match = re.search(r'(\d{8})', filename)
    if match:
        return match.group(1)
    return None

# === TRUNCATE TABLE ===
def truncate_table(table_name):
    try:
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table_name}"))  # ✅ wrap with text()
            print(f"✅ Truncated {table_name}")
    except Exception as e:
        print(f"⚠️ Could not truncate {table_name}: {e}")


# === CATEGORIZE FILES ===
cust_mstr_files = []
master_child_files = []
ecom_order_files = []

os.makedirs(DATA_LAKE_PATH, exist_ok=True)

for file in os.listdir(DATA_LAKE_PATH):
    if file.startswith("CUST_MSTR_"):
        cust_mstr_files.append(file)
    elif file.startswith("master_child_export-"):
        master_child_files.append(file)
    elif file.startswith("H_ECOM_ORDER"):
        ecom_order_files.append(file)

# === LOAD CUST_MSTR FILES ===
if cust_mstr_files:
    try:
        truncate_table("CUST_MSTR")
    except Exception as e:
        print("⚠️ Could not truncate CUST_MSTR:", e)

    for file in cust_mstr_files:
        date_val = extract_date_from_filename(file)
        df = pd.read_csv(os.path.join(DATA_LAKE_PATH, file))
        df["Date"] = date_val
        df.to_sql("CUST_MSTR", engine, index=False, if_exists="append")
        print(f"✅ Loaded {file} into CUST_MSTR")

# === LOAD master_child_export FILES ===
if master_child_files:
    try:
        truncate_table("master_child")
    except Exception as e:
        print("⚠️ Could not truncate master_child:", e)

    for file in master_child_files:
        date_val = extract_date_from_filename(file)
        date_key = extract_datekey_from_filename(file)
        df = pd.read_csv(os.path.join(DATA_LAKE_PATH, file))
        df["Date"] = date_val
        df["DateKey"] = date_key
        df.to_sql("master_child", engine, index=False, if_exists="append")
        print(f"✅ Loaded {file} into master_child")

# === LOAD H_ECOM_ORDER FILES ===
if ecom_order_files:
    try:
        truncate_table("H_ECOM_Orders")
    except Exception as e:
        print("⚠️ Could not truncate H_ECOM_Orders:", e)

    for file in ecom_order_files:
        df = pd.read_csv(os.path.join(DATA_LAKE_PATH, file))
        df.to_sql("H_ECOM_Orders", engine, index=False, if_exists="append")
        print(f"✅ Loaded {file} into H_ECOM_Orders")


Overwriting daily_loader.py


In [37]:
os.makedirs("data_lake", exist_ok=True)

with open("data_lake/CUST_MSTR_20191112.csv", "w") as f:
    f.write("CustomerID,Name\n1,Alice\n2,Bob")

with open("data_lake/master_child_export-20191112.csv", "w") as f:
    f.write("ParentID,ChildID\n100,200\n101,201")

with open("data_lake/H_ECOM_ORDER.csv", "w") as f:
    f.write("OrderID,Amount\n5001,250.75\n5002,199.99")


In [38]:

!pip install sqlalchemy




In [39]:
!python daily_loader.py



✅ Truncated CUST_MSTR
✅ Loaded CUST_MSTR_20191112.csv into CUST_MSTR
✅ Truncated master_child
✅ Loaded master_child_export-20191112.csv into master_child
✅ Truncated H_ECOM_Orders
✅ Loaded H_ECOM_ORDER.csv into H_ECOM_Orders
