# In [None]:
# data_cleaning_pipeline.py

import pandas as pd
import numpy as np
from scipy import stats

class DataCleaningPipeline:
    """
    A reusable data-cleaning pipeline that provides methods to:
      - Load data from CSV or Excel (or pass a DataFrame directly)
      - Remove duplicates
      - Handle missing values (drop or impute)
      - Standardize column names
      - Correct data types
      - Detect and handle outliers
      - Run all steps in sequence via run()

    The working frame lives in ``self.df``; every step updates it and also
    returns it. An untouched copy of the input is kept in ``self._original_df``.
    """

    # Suffixes of the helper columns generated by handle_missing() and
    # detect_outliers_zscore(). They are integer 0/1 flags, so they must be
    # kept out of automatic numeric-column selection: a rare "1" in a flag
    # column has a huge z-score and would otherwise be treated as an outlier.
    _MISSING_FLAG_SUFFIX = "_was_missing"
    _OUTLIER_FLAG_SUFFIX = "_outlier_flag"

    def __init__(self, df=None, csv_path=None, excel_path=None):
        """
        Initialize the pipeline. Provide one of (checked in this order):
          - df: a pandas DataFrame (copied, the caller's frame is not modified),
          - csv_path: path to a CSV file to load, or
          - excel_path: path to an Excel file to load.

        Raises ValueError if none of the three is given.
        """
        if df is not None:
            self.df = df.copy()
        elif csv_path is not None:
            self.df = pd.read_csv(csv_path)
        elif excel_path is not None:
            self.df = pd.read_excel(excel_path)
        else:
            raise ValueError("You must provide either a DataFrame (df), csv_path or excel_path.")

        # Keep a pristine copy of the input for later comparison.
        self._original_df = self.df.copy()

    @staticmethod
    def _dtype_label(dtype):
        """Return a string name for a dtype given as a string, Python type, or dtype object."""
        if isinstance(dtype, str):
            return dtype
        return str(getattr(dtype, "__name__", None) or getattr(dtype, "name", None) or dtype)

    @staticmethod
    def _is_numpy_numeric(dtype):
        """Return True for NumPy integer/unsigned/float dtypes of any width."""
        return isinstance(dtype, np.dtype) and dtype.kind in "iuf"

    def check_duplicates(self):
        """
        Returns the number of fully duplicated rows in the DataFrame.
        """
        return int(self.df.duplicated().sum())

    def remove_duplicates(self, subset=None, keep="first"):
        """
        Remove duplicate rows from the DataFrame.
        - subset: column label or list of labels to compare; None (default)
          compares full rows.
        - keep: 'first' (default), 'last', or False, passed through to
          DataFrame.drop_duplicates.
        The index is not reset.
        """
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        return self.df

    def standardize_column_names(self, lowercase=True, strip_whitespace=True, replace_spaces="_"):
        """
        Normalize column names for consistency:
          - lowercase: convert all to lowercase
          - strip_whitespace: remove leading/trailing whitespace
          - replace_spaces: character to replace spaces with (e.g. '_');
            a falsy value disables the replacement
        Non-string column labels (e.g. integers) are left unchanged.
        """
        new_cols = []
        for col in self.df.columns:
            if not isinstance(col, str):
                # str methods do not apply; keep the label as it is.
                new_cols.append(col)
                continue
            new_col = col
            if strip_whitespace:
                new_col = new_col.strip()
            if lowercase:
                new_col = new_col.lower()
            if replace_spaces:
                new_col = new_col.replace(" ", replace_spaces)
            new_cols.append(new_col)
        self.df.columns = new_cols
        return self.df

    def convert_dtypes(self, dtype_map):
        """
        Convert columns to specified dtypes.
        - dtype_map: dictionary {col_name: target_dtype}, e.g.
          {'amount': 'float', 'order_date': 'datetime64'}. Targets may be
          strings, Python types (int, float) or dtype objects.

        Rules:
          - names starting with 'datetime' use pd.to_datetime (bad values -> NaT)
          - 'numeric' uses pd.to_numeric (bad values -> NaN)
          - 'int' / 'float' try a strict cast first and fall back to
            pd.to_numeric with coercion when the cast fails
          - anything else is a best-effort astype: the column is left
            unchanged when the cast fails
        Raises KeyError if a column in dtype_map does not exist.
        """
        for col, dtype in dtype_map.items():
            label = self._dtype_label(dtype)
            column = self.df[col]
            if label.startswith("datetime"):
                self.df[col] = pd.to_datetime(column, errors="coerce")
            elif label == "numeric":
                self.df[col] = pd.to_numeric(column, errors="coerce")
            elif label in ("int", "float"):
                try:
                    self.df[col] = column.astype(dtype)
                except (ValueError, TypeError, OverflowError):
                    # Strict cast failed (unparseable text, NaN into int, ...):
                    # coerce what can be parsed and mark the rest as NaN.
                    self.df[col] = pd.to_numeric(column, errors="coerce")
            else:
                self.df[col] = column.astype(dtype, errors="ignore")
        return self.df

    def handle_missing(
        self,
        drop_threshold=0.5,
        impute_strategy_numeric="median",
        impute_strategy_categorical="mode",
        missing_indicator=False
    ):
        """
        Handle missing values in the DataFrame.
        - drop_threshold: proportion (0-1) of allowed missing in a column; if missing% > drop_threshold, drop the column.
        - impute_strategy_numeric: 'mean' for the mean; any other value uses the median.
        - impute_strategy_categorical: 'mode' fills with the most frequent value;
          any other value leaves non-numeric columns untouched.
        - missing_indicator: if True, add an integer 0/1 column '<col>_was_missing'
          for each column that had missing values.
        """
        # 1) Drop columns with too many missing
        missing_pct = self.df.isnull().mean()
        cols_to_drop = missing_pct[missing_pct > drop_threshold].index.tolist()
        self.df = self.df.drop(columns=cols_to_drop)

        # Only columns that actually contain gaps need any further work.
        cols_with_missing = [col for col in self.df.columns if self.df[col].isnull().any()]

        # 2) Optionally add missing-indicator flags (before the gaps are filled)
        if missing_indicator:
            for col in cols_with_missing:
                self.df[f"{col}{self._MISSING_FLAG_SUFFIX}"] = self.df[col].isnull().astype(int)

        # 3) Impute numeric and categorical separately
        for col in cols_with_missing:
            series = self.df[col]
            if self._is_numpy_numeric(series.dtype):
                if impute_strategy_numeric == "mean":
                    fill_value = series.mean()
                else:
                    fill_value = series.median()
            elif impute_strategy_categorical == "mode":
                # Treat non-numeric as categorical/text
                modes = series.mode(dropna=True)
                # An all-missing column has no mode; fall back to empty string.
                fill_value = modes.iloc[0] if not modes.empty else ""
            else:
                continue
            # Assign back instead of fillna(inplace=True) on a column
            # selection, which does not update the frame under Copy-on-Write.
            self.df[col] = series.fillna(fill_value)

        return self.df

    def detect_outliers_zscore(self, col_list=None, threshold=3.0, method="remove"):
        """
        Detect and handle outliers using the z-score method on numeric columns.
        - col_list: list of columns to check; if None (or empty), uses all numeric
          columns except the '_was_missing' / '_outlier_flag' helper columns.
        - threshold: z-score threshold beyond which points are considered outliers.
        - method: 'remove' to drop outlier rows (index is reset), 'cap' to clip at
          mean +/- threshold * std, or 'flag' to add an integer 0/1 column
          '<col>_outlier_flag'.

        Missing values are ignored when computing z-scores and are never
        treated as outliers. Raises ValueError for an unknown method.
        """
        # Validate up front so a bad method fails even with no numeric columns.
        if method not in ("remove", "cap", "flag"):
            raise ValueError("method must be one of ['remove','cap','flag']")

        if col_list:
            numeric_cols = list(col_list)
        else:
            helper_suffixes = (self._MISSING_FLAG_SUFFIX, self._OUTLIER_FLAG_SUFFIX)
            numeric_cols = [
                col
                for col in self.df.select_dtypes(include=[np.number]).columns
                if not str(col).endswith(helper_suffixes)
            ]

        for col in numeric_cols:
            # Re-read the column each time: 'remove' may have dropped rows.
            col_vals = self.df[col]
            if len(col_vals):
                zscores = np.abs(np.asarray(stats.zscore(col_vals, nan_policy="omit"), dtype=float))
                # NaN z-scores compare False, so missing values are kept.
                outlier_mask = zscores > threshold
            else:
                outlier_mask = np.zeros(0, dtype=bool)

            if method == "remove":
                # Drop rows where this column is an outlier
                self.df = self.df.loc[~outlier_mask].reset_index(drop=True)
            elif method == "cap":
                # Clip values at threshold * std dev from mean. ddof=0 matches
                # the population std used by stats.zscore, so the caps sit
                # exactly at |z| == threshold.
                mean_ = col_vals.mean()
                std_ = col_vals.std(ddof=0)
                upper = mean_ + threshold * std_
                lower = mean_ - threshold * std_
                self.df[col] = col_vals.clip(lower, upper)
            else:
                # Create an integer indicator column for outliers
                self.df[f"{col}{self._OUTLIER_FLAG_SUFFIX}"] = outlier_mask.astype(int)
        return self.df

    def run(
        self,
        subset_duplicates=None,
        keep_duplicates="first",
        lowercase_cols=True,
        strip_whitespace=True,
        replace_spaces="_",
        dtype_map=None,
        drop_threshold=0.5,
        impute_strategy_numeric="median",
        impute_strategy_categorical="mode",
        missing_indicator=True,
        outlier_cols=None,
        outlier_threshold=3.0,
        outlier_method="remove"
    ):
        """
        Execute the full cleaning pipeline in sequence:
          1. remove_duplicates
          2. standardize_column_names
          3. convert_dtypes (if dtype_map provided)
          4. handle_missing
          5. detect_outliers_zscore
        Returns the cleaned DataFrame.

        Note: subset_duplicates refers to the column names as loaded, while
        dtype_map and outlier_cols are applied after step 2 and must use the
        standardized names.
        """
        # 1) Remove duplicates
        self.remove_duplicates(subset=subset_duplicates, keep=keep_duplicates)

        # 2) Standardize column names
        self.standardize_column_names(
            lowercase=lowercase_cols,
            strip_whitespace=strip_whitespace,
            replace_spaces=replace_spaces
        )

        # 3) Convert data types if a map is provided
        if dtype_map:
            self.convert_dtypes(dtype_map)

        # 4) Handle missing values
        self.handle_missing(
            drop_threshold=drop_threshold,
            impute_strategy_numeric=impute_strategy_numeric,
            impute_strategy_categorical=impute_strategy_categorical,
            missing_indicator=missing_indicator
        )

        # 5) Detect (and optionally remove/flag/cap) outliers
        self.detect_outliers_zscore(
            col_list=outlier_cols,
            threshold=outlier_threshold,
            method=outlier_method
        )

        return self.df
