In [0]:
import pandas as pd
import os
import logging


In [0]:
# Pipeline monitoring: root logger emits INFO and above as
# "<timestamp> - <level> - <message>".
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)


In [0]:
# Input: raw SuperStore CSV stored in a Unity Catalog Volume.
RAW_DATA_PATH = (
    "/Volumes/shopz_catalog/default/superstore_volume/"
    "Sample - Superstore.csv"
)

# Outputs: written to the same volume as the input.
PROCESSED_DATA_PATH = (
    "/Volumes/shopz_catalog/default/superstore_volume/"
    "superstore_cleaned.parquet"
)
PANDAS_SUMMARY_PATH = (
    "/Volumes/shopz_catalog/default/superstore_volume/"
    "pandas_sales_summary.csv"
)


In [0]:
def load_raw_data(file_path):
    """Read the raw CSV at ``file_path`` into a DataFrame.

    The file is decoded as latin1. Any failure is logged at ERROR level
    and then re-raised unchanged so the caller can decide how to react.
    """
    try:
        logging.info("Loading raw CSV data")
        raw_frame = pd.read_csv(file_path, encoding="latin1")
    except Exception as read_error:
        logging.error(f"Failed to load raw data: {read_error}")
        raise
    else:
        return raw_frame


In [0]:
def clean_data(df):
    """Clean and transform the raw dataset without modifying the input.

    Steps, in order:
      1. Parse "Order Date" and "Ship Date" to datetimes (unparseable values
         become NaT) and cast them to microsecond precision so the columns
         are Spark-compatible timestamps.
      2. Drop fully duplicated rows (the original index labels are kept).
      3. Replace missing "Postal Code", "Sales" and "Profit" values with 0.
      4. Title-case and strip "Region", "Category" and "Sub-Category".
      5. Add "Profit Margin" = Profit / Sales, with infinite or undefined
         results (e.g. Sales == 0) replaced by 0.

    Args:
        df: Raw DataFrame containing the columns named above.

    Returns:
        A new, cleaned DataFrame. ``df`` itself is left untouched, so the
        function can safely be re-run on the same raw frame.

    Raises:
        Exception: Any error raised during cleaning (for example a KeyError
            for a missing column) is logged at ERROR level and re-raised.
    """
    date_columns = ("Order Date", "Ship Date")
    zero_fill_columns = ("Postal Code", "Sales", "Profit")
    text_columns = ("Region", "Category", "Sub-Category")

    try:
        logging.info("Starting data cleaning process")

        # Work on a copy: assigning columns directly on `df` would silently
        # change the caller's raw frame (hidden state between notebook cells).
        cleaned = df.copy()

        # Convert date columns to Spark-compatible (microsecond) timestamps
        for column in date_columns:
            cleaned[column] = (
                pd.to_datetime(cleaned[column], errors="coerce")
                .astype("datetime64[us]")
            )

        # Remove duplicate rows. The extra copy detaches the result from the
        # pre-dedup frame; without it, pandas versions before 3.0 can emit
        # SettingWithCopyWarning on the column assignments below.
        cleaned = cleaned.drop_duplicates().copy()

        # Handle missing numeric values
        for column in zero_fill_columns:
            cleaned[column] = cleaned[column].fillna(0)

        # Standardize categorical text columns
        for column in text_columns:
            cleaned[column] = cleaned[column].str.title().str.strip()

        # Derived metric: Profit Margin. Division by a zero Sales value gives
        # +/-inf (or NaN for 0/0); both are mapped to 0.
        margin = cleaned["Profit"] / cleaned["Sales"]
        cleaned["Profit Margin"] = (
            margin
            .replace([float("inf"), -float("inf")], 0)
            .fillna(0)
        )

        logging.info("Data cleaning completed successfully")
        return cleaned

    except Exception as e:
        logging.error(f"Error during data cleaning: {e}")
        raise

# clean_data (above) parses the date columns, removes duplicates, fills missing data, standardizes text columns, and adds derived metrics.

In [0]:
def pandas_sales_summary(df):
    """Build a per-Category sales summary with Pandas.

    Only rows whose "Sales" value is greater than 0 are considered. The
    result has one row per Category with the columns ``Total_Sales`` (sum of
    Sales), ``Total_Profit`` (sum of Profit) and ``Avg_Profit_Margin`` (mean
    of Profit Margin). Errors are logged at ERROR level and re-raised.
    """
    try:
        logging.info("Generating Pandas sales summary")

        # Keep only records with positive sales
        has_sales = df["Sales"].gt(0)
        by_category = df.loc[has_sales].groupby("Category")

        # Aggregate per category, then turn the group key back into a column
        aggregated = by_category.agg(
            Total_Sales=pd.NamedAgg(column="Sales", aggfunc="sum"),
            Total_Profit=pd.NamedAgg(column="Profit", aggfunc="sum"),
            Avg_Profit_Margin=pd.NamedAgg(column="Profit Margin", aggfunc="mean"),
        )
        summary = aggregated.reset_index()

        logging.info("Pandas aggregation completed successfully")
        return summary

    except Exception as agg_error:
        logging.error(f"Error during Pandas aggregation: {agg_error}")
        raise

# pandas_sales_summary (above) explicitly demonstrates Pandas filtering, grouping, and aggregation.

In [0]:
# Run the pipeline end to end: load -> clean -> summarize -> persist -> preview.
try:
    # Load raw dataset
    df_raw = load_raw_data(RAW_DATA_PATH)

    # Clean and transform data
    df_cleaned = clean_data(df_raw)

    # Generate Pandas aggregation summary
    pandas_summary = pandas_sales_summary(df_cleaned)

    # Save cleaned dataset as parquet (Silver output)
    df_cleaned.to_parquet(PROCESSED_DATA_PATH, index=False)

    # Save Pandas aggregation output
    pandas_summary.to_csv(PANDAS_SUMMARY_PATH, index=False)

    logging.info("Data cleaning and transformation pipeline executed successfully")

    # Preview outputs
    display(df_cleaned.head())
    display(pandas_summary)

except Exception as e:
    # Log at CRITICAL for monitoring, then re-raise: swallowing the error here
    # would let the cell (and any scheduled run of this notebook) finish as if
    # it had succeeded even though outputs may be missing or incomplete.
    logging.critical(f"Pipeline execution failed: {e}")
    raise


2025-12-29 08:25:47,307 - INFO - Loading raw CSV data
2025-12-29 08:25:52,265 - INFO - Starting data cleaning process
2025-12-29 08:25:52,317 - INFO - Data cleaning completed successfully
2025-12-29 08:25:52,318 - INFO - Generating Pandas sales summary
2025-12-29 08:25:52,330 - INFO - Pandas aggregation completed successfully
2025-12-29 08:25:52,735 - INFO - Data cleaning and transformation pipeline executed successfully


Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,State,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit,Profit Margin
1,CA-2016-152156,2016-11-08T00:00:00.000Z,2016-11-11T00:00:00.000Z,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136,0.16
2,CA-2016-152156,2016-11-08T00:00:00.000Z,2016-11-11T00:00:00.000Z,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",731.94,3,0.0,219.582,0.3
3,CA-2016-138688,2016-06-12T00:00:00.000Z,2016-06-16T00:00:00.000Z,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal,14.62,2,0.0,6.8714,0.47
4,US-2015-108966,2015-10-11T00:00:00.000Z,2015-10-18T00:00:00.000Z,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775,5,0.45,-383.031,-0.4
5,US-2015-108966,2015-10-11T00:00:00.000Z,2015-10-18T00:00:00.000Z,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368,2,0.2,2.5164,0.1125


Category,Total_Sales,Total_Profit,Avg_Profit_Margin
Furniture,741999.7953,18451.2728,0.0387835332152663
Office Supplies,719047.032,122490.8008,0.1380302946491131
Technology,836154.033,145454.9481,0.1561380531277661
