# ✅ ETL: Load CSV Into PostgreSQL (Clean, Table-by-Table Validation)
This notebook loads each normalized table separately with row‑count validation.

In [16]:
import psycopg2
import pandas as pd

# ✅ PostgreSQL configuration
PG_HOST="localhost"
PG_PORT="5433"
PG_USER="postgres"
PG_PASSWORD="admin"
PG_DB="robot_vacuum"

# ✅ Load CSV
df = pd.read_csv("../data/RobotVacuumDepot_MasterData.csv")
df.head()

Unnamed: 0,OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,CustomerZipCode,CustomerAddress,BillingZipCode,BillingAddress,DeliveryStatus,...,ExpectedDeliveryDate,ActualDeliveryDate,PaymentMethod,CardNumber,CardBrand,ReviewID,ReviewRating,ReviewText,ReviewDate,ReviewSentiment
0,1-17000001,10/21/2025 18:32,CUST6048,Zane Anderson,liam.johnson160@inbox.com,60617,"4297 W Belmont Ave, Chicago, IL 60613",60617,"4297 W Belmont Ave, Chicago, IL 60613",Delivered,...,10/26/2025 17:32,10/26/2025 20:32,Credit,6192-7888-5489-8568,MasterCard,,,,10/29/2025 21:32,
1,1-17000002,10/22/2025 9:26,CUST8667,Frank Walker,frank.walker825@example.com,60659,"7592 N Lincoln Ave, Chicago, IL 60659",60659,"7592 N Lincoln Ave, Chicago, IL 60659",Delayed,...,10/28/2025 0:26,10/29/2025 19:26,Debit,3533-1750-9961-2477,MasterCard,REV002450,3.0,Review for ILIFE V3s Pro (V3s Pro): overall de...,11/6/2025 7:26,Neutral
2,1-17000003,10/22/2025 15:15,CUST9761,Sara Scott,sara.scott987@mail.com,60659,"6176 N Clark St, Chicago, IL 60659",60659,"6176 N Clark St, Chicago, IL 60659",Delivered,...,10/27/2025 11:15,10/27/2025 12:15,Credit,9513-9561-6068-8247,MasterCard,REV000680,5.0,Review for Ecovacs Deebot N10 Plus (N10 Plus):...,10/30/2025 19:15,Positive
3,1-17000004,10/22/2025 10:40,CUST9270,Frank Wright,frank.wright250@mail.com,60613,"1183 W Irving Park Rd, Chicago, IL 60613",60613,"1183 W Irving Park Rd, Chicago, IL 60613",Delayed,...,10/27/2025 2:40,10/29/2025 8:40,Debit,3240-2362-6128-5343,MasterCard,REV001929,3.0,Review for Ecovacs Deebot N10 (N10): overall d...,11/2/2025 21:40,Neutral
4,1-17000005,10/23/2025 6:04,CUST8253,Qi Johnson,qi.johnson114@mail.com,60616,"5501 W 79th St, Chicago, IL 60616",60616,"5501 W 79th St, Chicago, IL 60616",Delayed,...,10/29/2025 10:04,10/30/2025 9:04,Credit,1427-5110-7866-3118,Visa,REV000137,1.0,Review for Eufy RoboVac X8 (X8): did not meet ...,11/6/2025 14:04,Negative


In [17]:
def get_conn():
    return psycopg2.connect(
        host=PG_HOST, port=PG_PORT,
        user=PG_USER, password=PG_PASSWORD, dbname=PG_DB
    )

In [18]:
# ✅ Columns that must be timestamps
date_cols = [
    'OrderDate',
    'ExpectedDeliveryDate',
    'ActualDeliveryDate'
]

# ✅ Force to string first so floats can't sneak in
for col in date_cols:
    if col in df.columns:
        df[col] = df[col].astype(str)

# ✅ Convert to datetime
for col in date_cols:
    if col in df.columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')

# ✅ Convert NaT → None
for col in date_cols:
    if col in df.columns:
        df[col] = df[col].apply(lambda x: x if pd.notnull(x) else None)


In [19]:
conn = get_conn()
cur = conn.cursor()

print("✅ Connecting to PostgreSQL...")

cur.execute("""
    TRUNCATE TABLE
        robot_vacuum.Review,
        robot_vacuum.WarehouseDistributionCenter,
        robot_vacuum.WarehouseProductStock,
        robot_vacuum."Order",
        robot_vacuum.DistributionCenter,
        robot_vacuum.Warehouse,
        robot_vacuum.Product,
        robot_vacuum.Customer,
        robot_vacuum.Manufacturer
    CASCADE;
""")

conn.commit()
cur.close()
conn.close()

print("✅ All tables truncated successfully!")


✅ Connecting to PostgreSQL...
✅ All tables truncated successfully!


## ✅ Load Manufacturer Table

In [20]:
manufacturers = df[['ManufacturerID','ManufacturerName','LeadTimeDays','ReliabilityScore']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in manufacturers.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.Manufacturer
        (ManufacturerID, ManufacturerName, Country, LeadTimeDays, ReliabilityScore)
        VALUES (%s,%s,NULL,%s,%s)
        ON CONFLICT DO NOTHING""",

        (r['ManufacturerID'], r['ManufacturerName'], r['LeadTimeDays'], r['ReliabilityScore'])
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Manufacturer rows inserted: {count}")

✅ Manufacturer rows inserted: 8


## ✅ Load Customer Table

In [21]:
customers = df[['CustomerID','CustomerName','CustomerEmail','CustomerAddress',
                 'CustomerZipCode','BillingZipCode','Segment']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in customers.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.Customer
        (CustomerID, CustomerName, CustomerEmail, CustomerStreetAddress,
         CustomerZipCode, BillingZipCode, Segment)
        VALUES (%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        (r['CustomerID'], r['CustomerName'], r['CustomerEmail'], r['CustomerAddress'],
         r['CustomerZipCode'], r['BillingZipCode'], r['Segment'])
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Customer rows inserted: {count}")

✅ Customer rows inserted: 2568


## ✅ Load Product Table

In [22]:
products = df[['ProductID','ProductName','ModelNumber','ManufacturerID',
                'UnitPrice','ProductDescription']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in products.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.Product
        (ProductID, ProductName, ModelNumber, ManufacturerID, UnitPrice, ProductDescription)
        VALUES (%s,%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Product rows inserted: {count}")

✅ Product rows inserted: 100


## ✅ Load Warehouse Table

In [23]:
warehouses = df[['WarehouseID','WarehouseStreetAddress','WarehouseZipCode','WarehouseCapacity']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in warehouses.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.Warehouse
        (WarehouseID, WarehouseStreetAddress, WarehouseZipCode, WarehouseCapacity)
        VALUES (%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Warehouse rows inserted: {count}")

✅ Warehouse rows inserted: 4


## ✅ Load Distribution Center Table

In [24]:
dcs = df[['DistributionCenterID','Region','DistributionCenterStreetAddress',
           'DistributionCenterZipCode','FleetSize']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in dcs.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.DistributionCenter
        (DistributionCenterID, Region, DistributionCenterStreetAddress,
         DistributionCenterZipCode, FleetSize)
        VALUES (%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Distribution Center rows inserted: {count}")

✅ Distribution Center rows inserted: 17


## ✅ Load Order Table

In [29]:
conn = get_conn()
cur = conn.cursor()
count = 0

def clean_ts(value):
    """Convert any invalid timestamp to None."""
    if value in [None, "NaT", "nan", "None", "", float('nan')]:
        return None
    return value

for _, r in orders.iterrows():

    # ✅ Clean datetime fields AFTER slicing (this is the missing step)
    order_date = clean_ts(r['OrderDate'])
    expected_date = clean_ts(r['ExpectedDeliveryDate'])
    actual_date = clean_ts(r['ActualDeliveryDate'])

    cur.execute(
        """
        INSERT INTO robot_vacuum."Order"
        (OrderID, CustomerID, ProductID, WarehouseID, DistributionCenterID, Quantity,
         UnitPrice, DiscountAmount, PromoCode, TaxAmount, ShippingCost, CostOfGoods,
         TotalAmount, OrderDate, ExpectedDeliveryDate, ActualDeliveryDate, DeliveryStatus,
         PaymentMethod, CardNumber, CardBrand, BillingZipCode, DeliveryStreetAddress,
         DeliveryZipCode, ShippingCarrier)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING
        """,
        (
            r['OrderID'], r['CustomerID'], r['ProductID'], r['WarehouseID'], r['DistributionCenterID'],
            r['Quantity'], r['UnitPrice'], r['DiscountAmount'], r['PromoCode'], r['TaxAmount'],
            r['ShippingCost'], r['CostOfGoods'], r['TotalAmount'],

            # ✅ CLEANED VALUES
            order_date,
            expected_date,
            actual_date,

            r['DeliveryStatus'],
            r['PaymentMethod'], r['CardNumber'], r['CardBrand'], r['BillingZipCode'],
            r['DeliveryAddress'], r['DeliveryZipCode'], r['ShippingCarrier']
        )
    )
    count += cur.rowcount

conn.commit()
cur.close()
conn.close()

print(f"✅ Order rows inserted: {count}")

InvalidDatetimeFormat: invalid input syntax for type timestamp: "NaT"
LINE 8: ....99,NULL,2974.35,'2025-10-22T18:45:00'::timestamp,'NaT'::tim...
                                                             ^


## ✅ Load WarehouseProductStock

In [None]:
wps = df[['WarehouseID','ProductID','StockLevel','RestockThreshold','LastRestockDate','LastUpdated']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in wps.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.WarehouseProductStock
        VALUES (%s,%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ WarehouseProductStock rows inserted: {count}")

## ✅ Load WarehouseDistributionCenter Mapping

In [None]:
wdc = df[['WarehouseID','DistributionCenterID']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in wdc.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.WarehouseDistributionCenter
        VALUES (%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ WarehouseDistributionCenter rows inserted: {count}")

## ✅ Load Review Table

In [None]:
reviews = df[['ReviewID','OrderID','ProductID','ReviewRating','ReviewDate','ReviewText']].drop_duplicates()

conn = get_conn(); cur = conn.cursor()
count = 0

for _, r in reviews.iterrows():
    cur.execute(
        """INSERT INTO robot_vacuum.Review
        VALUES (%s,%s,%s,%s,%s,%s)
        ON CONFLICT DO NOTHING""",

        tuple(r)
    )
    count += cur.rowcount

conn.commit(); cur.close(); conn.close()
print(f"✅ Review rows inserted: {count}")