# DATASETS CLEANING

In [2]:
import pandas as pd
import numpy as np

# Function to clean dataframe
def clean_dataframe(df, date_columns=[]):
    # Standardize column names: Replace spaces with underscores
    df.columns = df.columns.str.replace(" ", "_")

    # Handle date columns: Convert and format correctly
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce").dt.strftime("%Y-%m-%d")

    # Replace problematic values (NaN, NaT, empty strings) with None
    df.replace({np.nan: None, "nan": None, "NaT": None, "": None}, inplace=True)
    
    # Convert all columns to appropriate types
    for col in df.columns:
        if df[col].dtype == object:
            df[col] = df[col].apply(lambda x: None if pd.isna(x) or x == "nan" else x.strip() if isinstance(x, str) else x)
        else:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Convert NaN values in numeric columns to None (MySQL NULL)
    df = df.where(pd.notna(df), None)

    # Drop duplicates
    df.drop_duplicates(inplace=True)
    return df

# List of datasets with required cleaning steps
datasets = {
    "Customers": {"drop_cols": ["State_Code", "Zip_Code", "Continent"], "date_cols": ["Birthday"]},
    "Exchange_Rates": {"drop_cols": [], "date_cols": ["Date"]},
    "Products": {"drop_cols": ["SubcategoryKey"], "date_cols": []},
    "Sales": {"drop_cols": [], "date_cols": ["Order_Date", "Delivery_Date"]},
    "Stores": {"drop_cols": [], "date_cols": ["Open_Date"]}
}

# Process each dataset
for name, config in datasets.items():
    df = pd.read_csv(f"{name}.csv", encoding="ANSI")
    
    # Replace spaces in column names before dropping any columns
    df.columns = df.columns.str.replace(" ", "_")

    # Drop specified columns if they exist
    df.drop(columns=[col for col in config["drop_cols"] if col in df.columns], errors="ignore", inplace=True)

    # Clean the dataframe
    df = clean_dataframe(df, date_columns=config["date_cols"])

    # Additional cleaning for specific datasets
    if name == "Products":
        df["Unit_Cost_USD"] = df["Unit_Cost_USD"].str.replace("$", "").str.replace(",", "").str.strip().astype(float)
        df["Unit_Price_USD"] = df["Unit_Price_USD"].str.replace("$", "").str.replace(",", "").str.strip().astype(float)
    if name == "Stores":
        df["Square_Meters"] = df["Square_Meters"].fillna(0)
    if name == "Sales":
        df.loc[:, "Delivery_Date"] = df["Delivery_Date"].fillna(np.nan)  # Avoids chained assignment issue

    # Save cleaned CSV
    df.to_csv(f"Cleaned_{name}.csv", index=False)
    print(f"Cleaned_{name}.csv saved.")

print("All datasets cleaned successfully!")


Cleaned_Customers.csv saved.
Cleaned_Exchange_Rates.csv saved.
Cleaned_Products.csv saved.
Cleaned_Sales.csv saved.
Cleaned_Stores.csv saved.
All datasets cleaned successfully!


# MERGING ALL DATASETS

In [3]:
import pandas as pd

# Load cleaned datasets
customers = pd.read_csv("Cleaned_Customers.csv")
exchange_rates = pd.read_csv("Cleaned_Exchange_Rates.csv")
products = pd.read_csv("Cleaned_Products.csv")
sales = pd.read_csv("Cleaned_Sales.csv")
stores = pd.read_csv("Cleaned_Stores.csv")

# Convert necessary columns to datetime format and format them for MySQL (YYYY-MM-DD)
sales["Order_Date"] = pd.to_datetime(sales["Order_Date"], errors="coerce").dt.strftime("%Y-%m-%d")
exchange_rates["Date"] = pd.to_datetime(exchange_rates["Date"], errors="coerce").dt.strftime("%Y-%m-%d")

# Merge datasets
df = (
    sales
    .merge(customers, on="CustomerKey", how="inner")
    .merge(products, on="ProductKey", how="inner")
    .merge(stores, on="StoreKey", how="inner")
)

# Merge with exchange rates (left join to keep all sales records)
merge_keys = ["Order_Date"]
if "Currency" in df.columns and "Currency" in exchange_rates.columns:
    merge_keys.append("Currency")

df = df.merge(exchange_rates, left_on=merge_keys, right_on=["Date"] + merge_keys[1:], how="left")

# Clean up columns
df.drop(columns=["State_y", "Country_y", "Date"], inplace=True, errors="ignore")
df.rename(columns={"State_x": "State", "Country_x": "Country"}, inplace=True)

# **Replace problematic values with None (MySQL NULL)**
for col in df.columns:
    if df[col].dtype == object:
        df[col] = df[col].apply(lambda x: None if pd.isna(x) or x in ["nan", "NaT", ""] else x.strip() if isinstance(x, str) else x)
    else:
        df[col] = pd.to_numeric(df[col], errors="coerce")  # Convert invalid numeric values to NaN
df = df.where(pd.notna(df), None)  # Convert NaN to None

# **Rename columns to replace spaces with underscores**
df.columns = df.columns.str.replace(" ", "_")

# **Remove duplicates**
df.drop_duplicates(inplace=True)

# Save and display results
df.to_csv("Merged_Dataset.csv", index=False)
print("Merged dataset saved as 'Merged_Dataset.csv'")
print(df.head())
print("Columns:", df.columns.tolist())

# Display all columns for better readability
pd.set_option("display.max_columns", None)
print(df)


Merged dataset saved as 'Merged_Dataset.csv'
   Order_Number  Line_Item  Order_Date Delivery_Date  CustomerKey  StoreKey  \
0        366000          1  2016-01-01          None       265598        10   
1        366000          1  2016-01-01          None       265598        10   
2        366000          1  2016-01-01          None       265598        10   
3        366000          1  2016-01-01          None       265598        10   
4        366000          1  2016-01-01          None       265598        10   

   ProductKey  Quantity Currency_Code Gender  ...  Color Unit_Cost_USD  \
0        1304         1           CAD   Male  ...  White         31.27   
1        1304         1           CAD   Male  ...  White         31.27   
2        1304         1           CAD   Male  ...  White         31.27   
3        1304         1           CAD   Male  ...  White         31.27   
4        1304         1           CAD   Male  ...  White         31.27   

  Unit_Price_USD                   

# DATABASE AND TABLES CREATION AND INSERTION 

In [5]:
!pip install mysql-connector-python


Collecting mysql-connector-python
  Downloading mysql_connector_python-9.2.0-cp312-cp312-win_amd64.whl.metadata (6.2 kB)
Downloading mysql_connector_python-9.2.0-cp312-cp312-win_amd64.whl (16.1 MB)
   ---------------------------------------- 0.0/16.1 MB ? eta -:--:--
   ------- -------------------------------- 3.1/16.1 MB 20.6 MB/s eta 0:00:01
   -------------- ------------------------- 6.0/16.1 MB 16.8 MB/s eta 0:00:01
   --------------------- ------------------ 8.7/16.1 MB 15.4 MB/s eta 0:00:01
   ---------------------------- ----------- 11.3/16.1 MB 14.7 MB/s eta 0:00:01
   ---------------------------------- ----- 13.9/16.1 MB 14.3 MB/s eta 0:00:01
   ---------------------------------------  16.0/16.1 MB 14.2 MB/s eta 0:00:01
   ---------------------------------------- 16.1/16.1 MB 13.2 MB/s eta 0:00:00
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.2.0


In [8]:
import mysql.connector
import configparser
import pandas as pd

# Read database credentials from config.ini
config = configparser.ConfigParser()
config.read("config.ini")  # Ensure this file is present in the working directory

host = config["mysql"]["host"]
user = config["mysql"]["user"]
password = config["mysql"]["password"]
database = "Data_Spark"  # Fixed database name as required

# Connect to MySQL server (without specifying database to create it first)
connection = mysql.connector.connect(
    host=host,
    user=user,
    password=password
)
cursor = connection.cursor()

# Create Database
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database};")
cursor.execute(f"USE {database};")

# **Table Creation Queries**
tables = {
    "Customers": """
        CREATE TABLE IF NOT EXISTS Customers (
            CustomerKey INT PRIMARY KEY,
            Gender VARCHAR(10),
            Name VARCHAR(255),
            City VARCHAR(100),
            State VARCHAR(100),
            Country VARCHAR(100),
            Birthday DATE
        );
    """,
    "Exchange_Rates": """
        CREATE TABLE IF NOT EXISTS Exchange_Rates (
            Date DATE,
            Currency VARCHAR(10),
            Exchange DECIMAL(10,4),
            PRIMARY KEY (Date, Currency)
        );
    """,
    "Products": """
        CREATE TABLE IF NOT EXISTS Products (
            ProductKey INT PRIMARY KEY,
            Product_Name VARCHAR(255),
            Brand VARCHAR(100),
            Color VARCHAR(50),
            Unit_Cost_USD DECIMAL(10,2),
            Unit_Price_USD DECIMAL(10,2),
            Subcategory VARCHAR(100),
            CategoryKey INT,
            Category VARCHAR(100)
        );
    """,
    "Sales": """
        CREATE TABLE IF NOT EXISTS Sales (
            Order_Number INT,
            Line_Item INT,
            Order_Date DATE,
            Delivery_Date DATE,
            CustomerKey INT,
            StoreKey INT,
            ProductKey INT,
            Quantity INT,
            Currency_Code VARCHAR(10),
            PRIMARY KEY (Order_Number, Line_Item)
        );
    """,
    "Stores": """
        CREATE TABLE IF NOT EXISTS Stores (
            StoreKey INT PRIMARY KEY,
            Country VARCHAR(100),
            State VARCHAR(100),
            Square_Meters DECIMAL(10,2),
            Open_Date DATE
        );
    """,
    "Merged_Dataset": """
        CREATE TABLE IF NOT EXISTS Merged_Dataset (
            CustomerKey INT,
            Gender VARCHAR(10),
            Name VARCHAR(255),
            City VARCHAR(100),
            State VARCHAR(100),
            Country VARCHAR(100),
            Birthday DATE,
            Order_Number INT,
            Line_Item INT,
            Order_Date DATE,
            Delivery_Date DATE,
            StoreKey INT,
            ProductKey INT,
            Quantity INT,
            Currency_Code VARCHAR(10),
            Product_Name VARCHAR(255),
            Brand VARCHAR(100),
            Color VARCHAR(50),
            Unit_Cost_USD DECIMAL(10,2),
            Unit_Price_USD DECIMAL(10,2),
            Subcategory VARCHAR(100),
            CategoryKey INT,
            Category VARCHAR(100),
            Square_Meters DECIMAL(10,2),
            Open_Date DATE,
            Currency VARCHAR(10),
            Exchange DECIMAL(10,4)
        );
    """
}

# Execute table creation queries
for table_name, query in tables.items():
    cursor.execute(query)

connection.commit()
print("✅ Database and tables created successfully.")

# **Function to Load and Insert Data in Chunks**
def load_and_insert_data(csv_file, table_name, chunk_size=1000):
    print(f"Processing {csv_file} → {table_name}...")
    
    # Load CSV file in chunks
    chunk_iter = pd.read_csv(csv_file, chunksize=chunk_size)

    for i, chunk in enumerate(chunk_iter):
        print(f"Inserting chunk {i + 1} into {table_name}...")

        # **Fix column names (replace spaces with underscores)**
        chunk.columns = chunk.columns.str.replace(" ", "_")

        # **Convert NaN, NaT, empty strings to None (MySQL NULL)**
        for col in chunk.columns:
            if chunk[col].dtype == object:
                chunk[col] = chunk[col].apply(lambda x: None if pd.isna(x) or x in ["nan", "NaT", ""] else x.strip() if isinstance(x, str) else x)
            else:
                chunk[col] = pd.to_numeric(chunk[col], errors="coerce")
        chunk = chunk.where(pd.notna(chunk), None)

        # **Remove Duplicates**
        chunk.drop_duplicates(inplace=True)

        # **Convert DataFrame to Tuple Format**
        data_tuples = [tuple(x) for x in chunk.itertuples(index=False, name=None)]

        # **Create Insert Query**
        columns = ", ".join(chunk.columns)
        placeholders = ", ".join(["%s"] * len(chunk.columns))
        insert_query = f"INSERT IGNORE INTO {table_name} ({columns}) VALUES ({placeholders})"

        # **Insert Data in Chunks**
        cursor.executemany(insert_query, data_tuples)
        connection.commit()
    
    print(f"{table_name} inserted successfully.")

# **Load and Insert Data from CSV Files in Chunks**
csv_table_mapping = {
    "Cleaned_Customers.csv": "Customers",
    "Cleaned_Exchange_Rates.csv": "Exchange_Rates",
    "Cleaned_Products.csv": "Products",
    "Cleaned_Sales.csv": "Sales",
    "Cleaned_Stores.csv": "Stores",
    "Merged_Dataset.csv": "Merged_Dataset"
}

for csv_file, table_name in csv_table_mapping.items():
    load_and_insert_data(csv_file, table_name, chunk_size=500)  # Adjust chunk_size as needed


# **Verify Tables**
cursor.execute("SHOW TABLES;")
tables = cursor.fetchall()
print("\nTables in Data_Spark:", [t[0] for t in tables])

# **Close Connection**
cursor.close()
connection.close()
print("MySQL connection closed.")


✅ Database and tables created successfully.
Processing Cleaned_Customers.csv → Customers...
Inserting chunk 1 into Customers...
Inserting chunk 2 into Customers...
Inserting chunk 3 into Customers...
Inserting chunk 4 into Customers...
Inserting chunk 5 into Customers...
Inserting chunk 6 into Customers...
Inserting chunk 7 into Customers...
Inserting chunk 8 into Customers...
Inserting chunk 9 into Customers...
Inserting chunk 10 into Customers...
Inserting chunk 11 into Customers...
Inserting chunk 12 into Customers...
Inserting chunk 13 into Customers...
Inserting chunk 14 into Customers...
Inserting chunk 15 into Customers...
Inserting chunk 16 into Customers...
Inserting chunk 17 into Customers...
Inserting chunk 18 into Customers...
Inserting chunk 19 into Customers...
Inserting chunk 20 into Customers...
Inserting chunk 21 into Customers...
Inserting chunk 22 into Customers...
Inserting chunk 23 into Customers...
Inserting chunk 24 into Customers...
Inserting chunk 25 into Custo