In [None]:
# ============ DROP ALL TABLES AND REDO ============
# import pymysql
# from pymongo import MongoClient
# 
# # Connect to MySQL
# conn = pymysql.connect(host="localhost", user="root", password="nazmul.2025", database="benchmark_db")
# cursor = conn.cursor()
# 
# # Drop tables
# cursor.execute("DROP TABLE IF EXISTS orders")
# cursor.execute("DROP TABLE IF EXISTS customers")
# 
# # Commit and close
# conn.commit()
# cursor.close()
# conn.close()
# 
# print("All MySQL tables dropped successfully.")
# 
# 
# # Drop Mongo
# # Connect to MongoDB
# client = MongoClient("mongodb://localhost:27017/")
# db = client["benchmark_db"]
# 
# # Drop the orders collection
# db["orders"].drop()
# 
# print("All MongoDB collections dropped successfully.")

In [4]:
# ============ LIBRARIES ============
import os
import pandas as pd
import pymysql
import pymongo
import time

# ============ SETTINGS ============
EXCEL_PATH = "data/online_retail_II.xlsx"
CACHE_PATH = "data/cleaned_data.csv"
MYSQL_DB = "benchmark_db"
MYSQL_TABLE = "orders"
MONGO_DB = "benchmark_db"
MONGO_COLLECTION = "orders"
# ==================================

In [5]:
# ============ LOAD / MERGE / CLEAN ============

if os.path.exists(CACHE_PATH):
    print(f"Cached cleaned dataset found: {CACHE_PATH}")
    df = pd.read_csv(CACHE_PATH, parse_dates=["InvoiceDate"])
else:
    print(f"Loading Excel file: {EXCEL_PATH}")
    df_2009 = pd.read_excel(EXCEL_PATH, sheet_name="Year 2009-2010")
    df_2010 = pd.read_excel(EXCEL_PATH, sheet_name="Year 2010-2011")
    df = pd.concat([df_2009, df_2010], ignore_index=True)
    # Replace empty strings with NaN before dropping
    df.replace("", pd.NA, inplace=True)
    df.dropna(inplace=True)

    # Clean and standardize
    df = df.astype({
        "Invoice": str,
        "StockCode": str,
        "Description": str,
        "Quantity": int,
        "InvoiceDate": "datetime64[ns]",
        "Price": float,
        "Customer ID": str,
        "Country": str
    })
    df.rename(columns={
        "Invoice": "InvoiceNo",
        "Price": "UnitPrice",
        "Customer ID": "CustomerID"
    }, inplace=True)
    df.reset_index(drop=True, inplace=True)

    os.makedirs("data", exist_ok=True)
    df.to_csv(CACHE_PATH, index=False)
    print(f"Cleaned data saved to: {CACHE_PATH}")

print(f"Total records in DataFrame: {len(df)}")

Cached cleaned dataset found: data/cleaned_data.csv
Total records in DataFrame: 824364


In [7]:
# ============ INSERT INTO MySQL ============
def is_mysql_data_loaded():
    try:
        conn = pymysql.connect(host="localhost", user="root", password="", database=MYSQL_DB)
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {MYSQL_TABLE}")
        count = cursor.fetchone()[0] # type: ignore
        cursor.close()
        conn.close()
        return count == len(df)  # Exact match
    except:
        return False

if is_mysql_data_loaded():
    print("MySQL already contains full dataset. Skipping insert.")
else:
    print("Inserting into MySQL...")
    conn_init = pymysql.connect(host="localhost", user="root", password="")
    cursor_init = conn_init.cursor()
    cursor_init.execute(f"CREATE DATABASE IF NOT EXISTS {MYSQL_DB}")
    cursor_init.close()
    conn_init.close()

    conn = pymysql.connect(host="localhost", user="root", password="", database=MYSQL_DB)
    cursor = conn.cursor()
    cursor.execute(f"DROP TABLE IF EXISTS {MYSQL_TABLE}")
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_TABLE} (
            InvoiceNo VARCHAR(20),
            StockCode VARCHAR(20),
            Description TEXT,
            Quantity INT,
            InvoiceDate DATETIME,
            UnitPrice FLOAT,
            CustomerID VARCHAR(20),
            Country VARCHAR(100)
        )
    """)
    insert_query = f"""
        INSERT INTO {MYSQL_TABLE}
        (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    batch_size = 500
    start_time = time.time()
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        values = [tuple(x) for x in batch.to_numpy()]
        cursor.executemany(insert_query, values)
        conn.commit()
        if i % (batch_size * 10) == 0:
            print(f"  Inserted {i} rows...")
    cursor.execute(f"SELECT COUNT(*) FROM {MYSQL_TABLE}")
    mysql_count = cursor.fetchone()[0] # type: ignore
    end_time = time.time()
    print(f"MySQL insert complete in {end_time - start_time:.2f} seconds. Rows inserted: {mysql_count}")
    if mysql_count != len(df):
        print(f"Warning: MySQL row count ({mysql_count}) does not match DataFrame ({len(df)})")

    cursor.close()
    conn.close()

Inserting into MySQL...


OperationalError: (2003, "Can't connect to MySQL server on 'localhost' ([WinError 10061] No connection could be made because the target machine actively refused it)")

In [None]:
# ============ INSERT INTO MongoDB ============
def is_mongo_data_loaded():
    client = pymongo.MongoClient("mongodb://localhost:27017/")
    collection = client[MONGO_DB][MONGO_COLLECTION]
    count = collection.count_documents({})
    return count == len(df)  # Exact match

if is_mongo_data_loaded():
    print("MongoDB already contains full dataset. Skipping insert.")
else:
    print("Inserting into MongoDB...")
    client = pymongo.MongoClient("mongodb://localhost:27017/")
    collection = client[MONGO_DB][MONGO_COLLECTION]
    collection.drop()
    batch_size = 500
    start_time = time.time()
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        collection.insert_many(batch.to_dict("records"))
        if i % (batch_size * 10) == 0:
            print(f"  Inserted {i} rows...")
    mongo_count = collection.count_documents({})
    end_time = time.time()
    print(f"MongoDB insert complete in {end_time - start_time:.2f} seconds. Rows inserted: {mongo_count}")
    if mongo_count != len(df):
        print(f"Warning: MongoDB row count ({mongo_count}) does not match DataFrame ({len(df)})")


Inserting into MongoDB...
  Inserted 0 rows...
  Inserted 5000 rows...
  Inserted 10000 rows...
  Inserted 15000 rows...
  Inserted 20000 rows...
  Inserted 25000 rows...
  Inserted 30000 rows...
  Inserted 35000 rows...
  Inserted 40000 rows...
  Inserted 45000 rows...
  Inserted 50000 rows...
  Inserted 55000 rows...
  Inserted 60000 rows...
  Inserted 65000 rows...
  Inserted 70000 rows...
  Inserted 75000 rows...
  Inserted 80000 rows...
  Inserted 85000 rows...
  Inserted 90000 rows...
  Inserted 95000 rows...
  Inserted 100000 rows...
  Inserted 105000 rows...
  Inserted 110000 rows...
  Inserted 115000 rows...
  Inserted 120000 rows...
  Inserted 125000 rows...
  Inserted 130000 rows...
  Inserted 135000 rows...
  Inserted 140000 rows...
  Inserted 145000 rows...
  Inserted 150000 rows...
  Inserted 155000 rows...
  Inserted 160000 rows...
  Inserted 165000 rows...
  Inserted 170000 rows...
  Inserted 175000 rows...
  Inserted 180000 rows...
  Inserted 185000 rows...
  Inserted 