In [1]:
import pandas as pd
from sqlalchemy import create_engine, text

# DB CONNECTION 

DB_URL = "postgresql+psycopg2://postgres:Random2397!@localhost:5432/ecommerce"
engine = create_engine(DB_URL)

In [2]:
# LOAD CSV FILES WITH PANDAS

#CUSTOMER DATASET
customers_df = pd.read_csv("olist_customers_dataset.csv")

#GEOLOCATION DATASET
geolocations_df = pd.read_csv("olist_geolocation_dataset.csv")
geolocations_df = geolocations_df.drop_duplicates(
    subset=["geolocation_zip_code_prefix", "geolocation_lat", "geolocation_lng"],
    keep="first"
)

#ORDER ITEMS DATASET
order_items_df = pd.read_csv(
    "olist_order_items_dataset.csv",
    parse_dates=["shipping_limit_date"]
)
#Fix NaT in shipping_limit_date
order_items_df["shipping_limit_date"] = pd.to_datetime(
    order_items_df["shipping_limit_date"], errors="coerce"
)
order_items_df["shipping_limit_date"] = order_items_df["shipping_limit_date"].where(
    order_items_df["shipping_limit_date"].notna(), None
)


#ORDERPAYMENT DATASET
order_payments_df = pd.read_csv("olist_order_payments_dataset.csv")
order_reviews_df = pd.read_csv(
    "olist_order_reviews_dataset.csv",
    parse_dates=["review_creation_date", "review_answer_timestamp"]
)
order_reviews_df = order_reviews_df.drop_duplicates(subset=["review_id"], keep="first")
#Fix NaT in review datetime columns
review_datetime_cols = ["review_creation_date", "review_answer_timestamp"]
#Removing NA
for col in review_datetime_cols:
    order_reviews_df[col] = pd.to_datetime(order_reviews_df[col], errors="coerce")
    order_reviews_df[col] = order_reviews_df[col].where(
        order_reviews_df[col].notna(), None
)


#PRODUCT DATASET
products_df = pd.read_csv("olist_products_dataset.csv")

products_df = products_df.rename(columns={
    "product_name_lenght": "product_name_length",
    "product_description_lenght": "product_description_length"
})

#SELLER DATASET
sellers_df = pd.read_csv("olist_sellers_dataset.csv")

#PRODUCT CATEGORY TRANSLATION DATASET
category_translation_df = pd.read_csv("product_category_name_translation.csv")

#ORDERS DATASET
orders_df = pd.read_csv(
    "olist_orders_dataset.csv",
    parse_dates=[
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ]
)
#FIX NaT
order_datetime_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
for col in order_datetime_cols:
    orders_df[col] = pd.to_datetime(orders_df[col], errors="coerce")
    # convert to object so None is allowed
    orders_df[col] = orders_df[col].astype("object")
    orders_df.loc[orders_df[col].isna(), col] = None


print("✅ CSVs loaded")


✅ CSVs loaded


In [3]:
# INSERT DATA USING RAW SQL

with engine.begin() as conn:
    
    # ---------- customers ----------
    conn.execute(
        text("""
            INSERT INTO customers (
                customer_id, customer_unique_id, customer_zip_code_prefix,
                customer_city, customer_state
            )
            VALUES (
                :customer_id, :customer_unique_id, :customer_zip_code_prefix,
                :customer_city, :customer_state
            );
        """),
        customers_df.to_dict(orient="records")
    )

    # ---------- geolocations ----------
    conn.execute(
        text("""
            INSERT INTO geolocations (
                geolocation_zip_code_prefix,
                geolocation_lat,
                geolocation_lng,
                geolocation_city,
                geolocation_state
            )
            VALUES (
                :geolocation_zip_code_prefix,
                :geolocation_lat,
                :geolocation_lng,
                :geolocation_city,
                :geolocation_state
            );
        """),
        geolocations_df.to_dict(orient="records")
    )

    # ---------- sellers ----------
    conn.execute(
        text("""
            INSERT INTO sellers (
                seller_id, seller_zip_code_prefix, seller_city, seller_state
            )
            VALUES (
                :seller_id, :seller_zip_code_prefix, :seller_city, :seller_state
            );
        """),
        sellers_df.to_dict(orient="records")
    )

    # ---------- product_categories ----------
    conn.execute(
        text("""
            INSERT INTO product_categories (
                product_category_name, product_category_name_english
            )
            VALUES (
                :product_category_name, :product_category_name_english
            );
        """),
        category_translation_df.to_dict(orient="records")
    )

    # ---------- build category_id map for products ----------
    result = conn.execute(
        text("SELECT category_id, product_category_name FROM product_categories")
    )
    # build category_id map
    cat_map = {
        row._mapping["product_category_name"]: row._mapping["category_id"]
        for row in result
    }
    products_df["category_id"] = (
        products_df["product_category_name"]
          .map(cat_map)          
          .astype("Int64")
    )
    products_df["category_id"] = products_df["category_id"].where(
        products_df["category_id"].notnull(), None
    )

    # ---------- products ----------
    conn.execute(
        text("""
            INSERT INTO products (
                product_id, category_id,
                product_name_length, product_description_length,
                product_photos_qty, product_weight_g,
                product_length_cm, product_height_cm, product_width_cm
            )
            VALUES (
                :product_id, :category_id,
                :product_name_length, :product_description_length,
                :product_photos_qty, :product_weight_g,
                :product_length_cm, :product_height_cm, :product_width_cm
            );
        """),
        products_df[[
            "product_id", "category_id",
            "product_name_length", "product_description_length",
            "product_photos_qty", "product_weight_g",
            "product_length_cm", "product_height_cm", "product_width_cm"
        ]].to_dict(orient="records")
    )

    # ---------- orders ----------
    conn.execute(
        text("""
            INSERT INTO orders (
                order_id, customer_id, order_status,
                order_purchase_timestamp, order_approved_at,
                order_delivered_carrier_date, order_delivered_customer_date,
                order_estimated_delivery_date
            )
            VALUES (
                :order_id, :customer_id, :order_status,
                :order_purchase_timestamp, :order_approved_at,
                :order_delivered_carrier_date, :order_delivered_customer_date,
                :order_estimated_delivery_date
            );
        """),
        orders_df[[
            "order_id", "customer_id", "order_status",
            "order_purchase_timestamp", "order_approved_at",
            "order_delivered_carrier_date", "order_delivered_customer_date",
            "order_estimated_delivery_date"
        ]].to_dict(orient="records")
    )

    # ---------- order_items ----------
    conn.execute(
        text("""
            INSERT INTO order_items (
                order_id, order_item_id, product_id, seller_id,
                shipping_limit_date, price, freight_value
            )
            VALUES (
                :order_id, :order_item_id, :product_id, :seller_id,
                :shipping_limit_date, :price, :freight_value
            );
        """),
        order_items_df[[
            "order_id", "order_item_id", "product_id", "seller_id",
            "shipping_limit_date", "price", "freight_value"
        ]].to_dict(orient="records")
    )

    # ---------- order_payments ----------
    conn.execute(
        text("""
            INSERT INTO order_payments (
                order_id, payment_sequential, payment_type,
                payment_installments, payment_value
            )
            VALUES (
                :order_id, :payment_sequential, :payment_type,
                :payment_installments, :payment_value
            );
        """),
        order_payments_df[[
            "order_id", "payment_sequential", "payment_type",
            "payment_installments", "payment_value"
        ]].to_dict(orient="records")
    )

    # ---------- order_reviews ----------
    conn.execute(
        text("""
            INSERT INTO order_reviews (
                review_id, order_id, review_score,
                review_comment_title, review_comment_message,
                review_creation_date, review_answer_timestamp
            )
            VALUES (
                :review_id, :order_id, :review_score,
                :review_comment_title, :review_comment_message,
                :review_creation_date, :review_answer_timestamp
            );
        """),
        order_reviews_df[[
            "review_id", "order_id", "review_score",
            "review_comment_title", "review_comment_message",
            "review_creation_date", "review_answer_timestamp"
        ]].to_dict(orient="records")
    )

print("✅ All data inserted")


✅ All data inserted
