# In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pandas.tseries.offsets import BDay
from sklearn.impute import KNNImputer
import matplotlib.pyplot as plt
import seaborn as sns
import mplfinance as mpf
import plotly.express as px
from scipy.stats import zscore, median_abs_deviation, mstats
from pykalman import KalmanFilter

class StockDataProcessor:
    def __init__(self, df=None):
        self.df = df.copy() if df is not None else None

    # ---------------- Download ----------------1
    @staticmethod
    def download_stock_data(ticker, period="1y", interval="1d"):
        """
        Download historical price data for a ticker symbol as a flat DataFrame.

        The data is fetched with ``yf.download``. If the result has MultiIndex
        columns only the first level is kept. The index is then moved into a
        regular column, the six expected columns are selected, and all column
        names are lower-cased.

        Parameters
        ----------
        ticker : str
            The stock ticker symbol.
            ("AAPL", "TSLA","MSFT" etc)

        period : str, optional, default "1y"
            The time span of historical data to retrieve.
            ["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "10y", "ytd", "max"]

        interval : str, optional, default "1d"
            The frequency of data points.
            ["1m", "2m", "5m", "15m", "30m", "60m", "90m", "1d", "5d", "1wk", "1mo", "3mo"]

        Returns
        -------
        pd.DataFrame
            A pandas DataFrame with these columns, in this order:
            - 'date'  : Timestamp of the data point
            - 'close' : Closing price
            - 'high'  : Highest price during the interval
            - 'low'   : Lowest price during the interval
            - 'open'  : Opening price
            - 'volume': Trading volume

        Raises
        ------
        KeyError
            If the downloaded frame does not contain the expected columns.
        """

        df = yf.download(ticker, period=period, interval=interval)
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df = df.reset_index()

        # yfinance names the timestamp index "Datetime" (not "Date") for
        # intraday intervals; normalise it so the selection below works for
        # every interval listed in the docstring.
        if "Date" not in df.columns and "Datetime" in df.columns:
            df = df.rename(columns={"Datetime": "Date"})

        df = df[["Date", "Close", "High", "Low", "Open", "Volume"]]
        df.columns = df.columns.str.lower()
        return df

    # ---------------- Fill Missing Dates ----------------2
    @staticmethod
    def fill_missing_dates(df, date_col='date',break_date = False):
        """
        Fill missing dates in a stock data DataFrame and optionally extract day, month, and year.
        This static method identifies missing or NaN values in the date column of a DataFrame
        and fills them using **business day offsets** (pandas BDay). It ensures that all rows
        have valid dates, preserving chronological order. Optionally, it can also split the
        date into separate day, month, and year columns.

        Parameters
        ----------
        df : pd.DataFrame
            The input DataFrame containing stock data, including a date column.

        date_col : str, optional, default 'date'
            The name of the column containing dates. The column should be parseable by
            `pd.to_datetime()`.

        break_date : bool, optional, default False
            If True, creates three new columns in the DataFrame: 
            - 'day'   : day of the month
            - 'month' : month number
            - 'year'  : year
            If False, the original date column is retained as-is.

        Returns
        -------
        pd.DataFrame
            The DataFrame with missing dates filled. If `break_date` is True, the DataFrame
            also includes 'day', 'month', and 'year' columns.

        Notes
        -----
        - Missing dates are filled using the previous valid date plus the number of business days 
        to the missing row. If no previous date exists, the next valid date is used minus the 
        appropriate number of business days.
        - If all surrounding dates are missing, the current date (`pd.Timestamp.today()`) is used.
        - The method sorts the DataFrame by its index before processing.
        """

        df = df.sort_index()  
        df[date_col] = pd.to_datetime(df[date_col])

        nan_idx = df[df[date_col].isna()].index

        for idx in nan_idx:
            prev_idx = idx - 1
            while prev_idx >= 0 and pd.isna(df.at[prev_idx, date_col]):
                prev_idx -= 1

            next_idx = idx + 1
            while next_idx < len(df) and pd.isna(df.at[next_idx, date_col]):
                next_idx += 1

            if prev_idx >= 0:
                prev_date = df.at[prev_idx, date_col]
                df.at[idx, date_col] = prev_date + BDay(idx - prev_idx)
            elif next_idx < len(df):
                next_date = df.at[next_idx, date_col]
                df.at[idx, date_col] = next_date - BDay(next_idx - idx)
            else:
                df.at[idx, date_col] = pd.Timestamp.today()

        if(break_date==True):
            df['day'] = df[date_col].dt.day
            df['month'] = df[date_col].dt.month
            df['year'] = df[date_col].dt.year
        
        return df

    # ---------------- Fill NaN Advanced ----------------3
    @staticmethod
    def markov_impute(series, n_bins=20, strategy="mode"):
        s = series.copy()

        notnull = s.dropna()
        bins = np.linspace(notnull.min(), notnull.max(), n_bins + 1)
        states = np.digitize(notnull, bins) - 1
        states = np.clip(states, 0, n_bins - 1)

        trans_mat = np.zeros((n_bins, n_bins))
        for i in range(len(states) - 1):
            trans_mat[states[i], states[i + 1]] += 1

        row_sums = trans_mat.sum(axis=1, keepdims=True)
        row_sums[row_sums == 0] = 1
        P = trans_mat / row_sums

        for idx in s.index[s.isna()]:
            prev_idx = s.index[s.index.get_loc(idx) - 1] if s.index.get_loc(idx) > 0 else None
            if prev_idx is None or pd.isna(s[prev_idx]):
                s.at[idx] = notnull.mean()
                continue

            prev_val = s[prev_idx]
            prev_state = np.digitize(prev_val, bins) - 1
            prev_state = np.clip(prev_state, 0, n_bins - 1)

            if strategy == "mode":
                next_state = np.argmax(P[prev_state])
            else:
                next_state = np.random.choice(np.arange(n_bins), p=P[prev_state])

            s.at[idx] = (bins[next_state] + bins[next_state + 1]) / 2

        return s
    
    @staticmethod
    def fill_nan_advanced(df, col_tech_map):
        """
        Fill missing values in a DataFrame using advanced, column-specific imputation techniques.
        This static method allows flexible handling of NaN values in a DataFrame. Each column
        can have one or more imputation techniques applied sequentially. Techniques can be
        statistical, interpolation-based, rolling/window-based, machine-learning-based, or 
        smoothing-based.

        Parameters
        ----------
        df : pd.DataFrame
            The input DataFrame containing numeric or categorical columns with missing values.

        col_tech_map : dict
            A dictionary mapping column names to a list of techniques and optional parameters.
            Each column key maps to a list of tuples, where each tuple is:
                (technique_name: str, params: dict)

            Example format:
            {
                'column1': [('mean', {}), ('ffill', {})],
                'column2': [('knn', {'n_neighbors': 5})],
                'column3': [('drop', {})]
            }

            Supported techniques and example format:

            - 'drop'             : [('drop', {})]
            - 'mean'             : [('mean', {})]
            - 'median'           : [('median', {})]
            - 'mode'             : [('mode', {})]
            - 'ffill'            : [('ffill', {})]
            - 'bfill'            : [('bfill', {})]
            - 'sma' / 'rolling'  : [('sma', {'window': 14})]  # 'window' optional, default 14
            - 'ema'              : [('ema', {'alpha': 0.3})]  # 'alpha' optional, default 0.3
            - 'linear'           : [('linear', {})]
            - 'quadratic'        : [('quadratic', {})]
            - 'cubic'            : [('cubic', {})]
            - 'knn'              : [('knn', {'n_neighbors': 3})]  # default n_neighbors=3
            - 'markov'           : [('markov', {})]
            - 'weighted_combo'   : [('weighted_combo', {'window': 14, 'alpha': 0.3})]
            - 'kalman'           : [('kalman', {})]

            Full example dictionary:

            col_tech_map_examples = {
                'column1': [('drop', {})],
                'column2': [('mean', {})],
                'column3': [('median', {})],
                'column4': [('mode', {})],
                'column5': [('ffill', {})],
                'column6': [('bfill', {})],
                'column7': [('sma', {'window': 10})],
                'column8': [('ema', {'alpha': 0.2})],
                'column9': [('linear', {})],
                'column10': [('quadratic', {})],
                'column11': [('cubic', {})],
                'column12': [('knn', {'n_neighbors': 5})],
                'column13': [('markov', {})],
                'column14': [('weighted_combo', {'window': 14, 'alpha': 0.3})],
                'column15': [('kalman', {})]
            }

        Returns
        -------
        pd.DataFrame
            A copy of the input DataFrame with missing values filled according to the
            specified techniques. The original DataFrame remains unchanged.

        Notes
        -----
        - 'drop' is applied first for each column, then other techniques are applied in order.
        - Multiple techniques can be applied sequentially per column.
        - Non-numeric columns are skipped for numeric-only techniques like 'knn', 'kalman', or 'weighted_combo'.
        - Interpolation methods fill in both forward and backward directions.
        - Weighted combo computes 0.5 * SMA + 0.5 * EMA for filling missing values.
        """

        df_filled = df.copy()
        
        drop_cols = [col for col, techs in col_tech_map.items() if any(t[0]=='drop' for t in techs)]
        if drop_cols:
            df_filled = df_filled.dropna(subset=drop_cols)
        
        for col, techs in col_tech_map.items():
            for tech, params in techs:
                if tech == "drop":
                    continue  

                if tech == "mean":
                    df_filled[col] = df_filled[col].fillna(df_filled[col].mean())

                elif tech == "median":
                    df_filled[col] = df_filled[col].fillna(df_filled[col].median())

                elif tech == "mode":
                    df_filled[col] = df_filled[col].fillna(df_filled[col].mode()[0])

                elif tech == "ffill":
                    df_filled[col] = df_filled[col].fillna(method='ffill')

                elif tech == "bfill":
                    df_filled[col] = df_filled[col].fillna(method='bfill')

                elif tech == "sma" or tech == "rolling":
                    window = params.get('window', 14)
                    df_filled[col] = df_filled[col].fillna(df_filled[col].rolling(window=window, min_periods=1).mean())

                elif tech == "ema":
                    alpha = params.get('alpha', 0.3)
                    df_filled[col] = df_filled[col].fillna(df_filled[col].ewm(alpha=alpha, adjust=False).mean())

                elif tech in ["linear", "quadratic", "cubic"]:
                    df_filled[col] = df_filled[col].interpolate(method=tech, limit_direction='both')

                elif tech == "knn":
                    n_neighbors = params.get('n_neighbors', 3)

                    numeric_cols = df_filled.select_dtypes(include=['number']).columns
                    if col not in numeric_cols:
                        continue  

                    if len(numeric_cols) > 1:
                        imputer = KNNImputer(n_neighbors=n_neighbors)
                        df_filled_numeric = pd.DataFrame(
                            imputer.fit_transform(df_filled[numeric_cols]),
                            columns=numeric_cols,
                            index=df_filled.index
                        )
                        df_filled[col] = df_filled_numeric[col]
                    else:
                        if df_filled[col].dropna().empty:
                            df_filled[col] = 0  
                        else:
                            df_filled[col] = df_filled[col].fillna(df_filled[col].mean())


                elif tech == "markov":
                    df_filled[col] = StockDataProcessor.markov_impute(df_filled[col], n_bins=20, strategy="mode")

                elif tech == "weighted_combo":
                    window = params.get('window', 14)
                    alpha = params.get('alpha', 0.3)
                    filled_df = 0.5*df_filled[col].rolling(window=window, min_periods=1).mean() + \
                                0.5*df_filled[col].ewm(alpha=alpha, adjust=False).mean()
                    df_filled[col] = df_filled[col].fillna(filled_df)

                elif tech == "kalman":
                    series = df_filled[col].copy()
                    if series.dropna().empty:
                        continue

                    values = series.interpolate(limit_direction="both").values

                    kf = KalmanFilter(
                        transition_matrices=[1],
                        observation_matrices=[1],
                        initial_state_mean=values[0],
                        initial_state_covariance=1,
                        transition_covariance=0.01,
                        observation_covariance=1
                    )

                    state_means, _ = kf.smooth(values)
                    df_filled[col] = pd.Series(state_means.flatten(), index=series.index)
 
                else:
                    raise ValueError(f"Technique '{tech}' not recognized")
        
        return df_filled

    #----------------- Plot Graphs --------------------4
    @staticmethod
    def plot_graph(df, columns, graph_type, size=(10,6), color='blue', stacked=False):
        """
        Plot various types of graphs from a DataFrame for visualization.
        This static method supports multiple graph types using matplotlib, seaborn,
        mplfinance, and plotly. It handles common chart types for exploratory data
        analysis or financial visualization.

        Parameters
        ----------
        df : pd.DataFrame
            The input DataFrame containing the data to plot.

        columns : list of str
            List of column names to use in the plot. Column type requirements vary by `graph_type`:

            - 'line'          : 2 columns [x (numeric), y (numeric)]
            - 'scatter'       : 2 columns [x (numeric), y (numeric)]
            - 'bar'           : 1 or 2 columns
                                - 1 column: categorical or numeric counts
                                - 2 columns: x (categorical or numeric), y (numeric)
            - 'hist'          : 1 column [numeric]
            - 'box'           : 1 column [numeric]
            - 'violin'        : 1 column [numeric]
            - 'pairplot'      : >=2 columns [all numeric or categorical encoded as numeric]
            - 'area'          : >=2 columns [x (numeric), y series (numeric)]
            - 'stacked_area', 'stream': >=2 columns [x (numeric), multiple y series (numeric)]
            - 'pie'           : 1 or 2 columns
                                - 1 column: categorical counts
                                - 2 columns: category (categorical), value (numeric)
            - 'waterfall'     : 2 columns [category (categorical), value (numeric)]
            - 'treemap'       : 2 columns [category (categorical), value (numeric)]
            - 'sunburst'      : >=2 columns [levels (categorical), value (numeric)]
            - 'choropleth'    : 2 columns [region (categorical/ISO-3), value (numeric)]
            - 'candlestick'   : 4+ columns ['open','high','low','close'] and 'date' as index (numeric prices)

        graph_type : str
            Type of graph to generate. Supported types: see above.

        size : tuple, optional, default (10,6)
            Figure size in inches (width, height).

        color : str or list, optional, default 'blue'
            Color specification for the plot. Can be a single color or a list of colors
            for multi-series plots. For plotly charts, used as color scale.

        stacked : bool, optional, default False
            Only relevant for area/stacked_area/stream plots. If True, series are stacked.

        Returns
        -------
        None
            The function directly displays the plot and does not return a value.

        Notes
        -----
        - Matplotlib is used for standard line, bar, scatter, histogram, area, waterfall plots.
        - Seaborn is used for box, violin, and pairplot visualizations.
        - mplfinance is used for candlestick charts.
        - Plotly is used for treemap, sunburst, and choropleth maps.
        - Users must ensure required columns exist in the DataFrame and have the correct type; otherwise, a ValueError is raised.
        - The method automatically labels axes and titles for clarity.
        """

        plt.figure(figsize=size)

        if graph_type == 'line':
            if len(columns) != 2:
                raise ValueError("Line plot requires 2 columns: x and y")
            plt.plot(df[columns[0]], df[columns[1]], color=color)
            plt.xlabel(columns[0])
            plt.ylabel(columns[1])
            plt.title(f"Line Plot: {columns[1]} vs {columns[0]}")
            plt.show()

        elif graph_type == 'bar':
            if len(columns) == 1:
                df[columns[0]].value_counts().plot(kind='bar', color=color)
            elif len(columns) == 2:
                plt.bar(df[columns[0]], df[columns[1]], color=color)
            else:
                raise ValueError("Bar plot requires 1 or 2 columns")
            plt.title(f"Bar Plot")
            plt.show()

        elif graph_type == 'hist':
            if len(columns) != 1:
                raise ValueError("Histogram requires 1 column")
            plt.hist(df[columns[0]], color=color, bins=20)
            plt.title(f"Histogram: {columns[0]}")
            plt.show()

        elif graph_type == 'scatter':
            if len(columns) != 2:
                raise ValueError("Scatter plot requires 2 columns: x and y")
            plt.scatter(df[columns[0]], df[columns[1]], color=color)
            plt.xlabel(columns[0])
            plt.ylabel(columns[1])
            plt.title(f"Scatter Plot: {columns[1]} vs {columns[0]}")
            plt.show()

        elif graph_type == 'box':
            if len(columns) != 1:
                raise ValueError("Box plot requires 1 column")
            sns.boxplot(y=df[columns[0]], color=color)
            plt.title(f"Box Plot: {columns[0]}")
            plt.show()

        elif graph_type == 'violin':
            if len(columns) != 1:
                raise ValueError("Violin plot requires 1 column")
            sns.violinplot(y=df[columns[0]], color=color)
            plt.title(f"Violin Plot: {columns[0]}")
            plt.show()

        elif graph_type == 'pairplot':
            if len(columns) < 2:
                raise ValueError("Pairplot requires at least 2 columns")
            sns.pairplot(df[columns], palette=color)
            plt.show()

        elif graph_type == 'candlestick':
            if not all(c in df.columns for c in ['open','high','low','close']):
                raise ValueError("Candlestick plot requires columns: 'open','high','low','close'")
            mpf.plot(df.set_index('date'), type='candle', style='charles', figsize=size)

        elif graph_type == 'area':
            if len(columns) < 2:
                raise ValueError("Area plot requires at least 2 columns: x + y series")
            if stacked:
                plt.stackplot(df[columns[0]], df[columns[1:]], labels=columns[1:], colors=color if isinstance(color,list) else None)
            else:
                for col in columns[1:]:
                    plt.plot(df[columns[0]], df[col], label=col, color=color)
            plt.xlabel(columns[0])
            plt.title("Area Plot")
            plt.legend()
            plt.show()

        elif graph_type == 'pie':
            if len(columns) == 1:
                df[columns[0]].value_counts().plot(kind='pie', autopct='%1.1f%%', colors=color if isinstance(color,list) else None)
            elif len(columns) == 2:
                df.groupby(columns[0])[columns[1]].sum().plot(kind='pie', autopct='%1.1f%%', colors=color if isinstance(color,list) else None)
            else:
                raise ValueError("Pie plot requires 1 or 2 columns")
            plt.title("Pie Chart")
            plt.ylabel('')
            plt.show()

        elif graph_type == 'treemap':
            if len(columns) != 2:
                raise ValueError("Treemap requires 2 columns: category + value")
            fig = px.treemap(df, path=[columns[0]], values=columns[1], color=columns[1], color_continuous_scale=color)
            fig.show()

        elif graph_type == 'sunburst':
            if len(columns) < 2:
                raise ValueError("Sunburst requires at least 2 columns: levels + value")
            fig = px.sunburst(df, path=columns[:-1], values=columns[-1], color=columns[-1], color_continuous_scale=color)
            fig.show()

        elif graph_type == 'stacked_area' or graph_type == 'stream':
            if len(columns) < 2:
                raise ValueError("Stacked/Stream plot requires at least 2 columns: x + multiple series")
            plt.stackplot(df[columns[0]], df[columns[1:]], labels=columns[1:], colors=color if isinstance(color,list) else None)
            plt.xlabel(columns[0])
            plt.title(f"{graph_type.replace('_',' ').title()} Plot")
            plt.legend()
            plt.show()

        elif graph_type == 'waterfall':
            if len(columns) != 2:
                raise ValueError("Waterfall plot requires 2 columns: category + value")
            cumulative = df[columns[1]].cumsum()
            colors = ['green' if v >= 0 else 'red' for v in df[columns[1]]]
            plt.bar(df[columns[0]], df[columns[1]], color=colors)
            plt.plot(df[columns[0]], cumulative, color='blue', marker='o', linestyle='--')
            plt.title("Waterfall Chart")
            plt.show()

        elif graph_type == 'choropleth':
            if len(columns) != 2:
                raise ValueError("Choropleth requires 2 columns: region + value")
            fig = px.choropleth(df, locations=columns[0], color=columns[1], color_continuous_scale=color, locationmode='ISO-3')
            fig.show()

        else:
            raise ValueError(f"Graph type '{graph_type}' not recognized")



    # ---------------- Detect Outliers ----------------5
    @staticmethod
    def detect_outliers_advanced(df, numeric_cols, z_thresh=3, mod_z_thresh=3.5, rolling_window=5, price_change_thresh=0.05, plot_graphs=True,combine='union',vote_thresh=None):
        """
        Detect outliers in numeric columns of a DataFrame using multiple statistical and rolling methods.
        This static method applies several techniques to identify outliers in numeric columns. 
        Users can choose to combine the results using union, intersection, or a voting threshold.
        Optionally, it can generate visualizations for each column.

        Parameters
        ----------
        df : pd.DataFrame
            Input DataFrame containing numeric columns for outlier detection. A 'date' column
            can be included for scatter/time-series plots.

        numeric_cols : list of str
            List of numeric column names to check for outliers.

        z_thresh : float, optional, default 3
            Threshold for standard Z-score method. Points with |Z| > z_thresh are considered outliers.

        mod_z_thresh : float, optional, default 3.5
            Threshold for modified Z-score method (based on median and MAD). Points with |modified Z| > mod_z_thresh are outliers.

        rolling_window : int, optional, default 5
            Window size for rolling mean and rolling standard deviation outlier detection.

        price_change_thresh : float, optional, default 0.05
            Threshold for detecting outliers based on absolute percent change between consecutive rows.

        plot_graphs : bool, optional, default True
            If True, generates boxplot, histogram/KDE, scatter/time series (if 'date' exists), and violin plot for each column.

        combine : str, optional, default 'union'
            Method to combine outlier indices from different techniques:
            - 'union': combine all detected indices
            - 'intersection': only keep indices detected by all methods

        vote_thresh : int, optional, default None
            If specified, only indices detected by at least `vote_thresh` methods are considered outliers.
            Overrides `combine` if provided.

        Returns
        -------
        dict
            Dictionary structured as:
            {
                'column_name': {
                    'per_method': {
                        'z_score': set(indices),
                        'modified_z': set(indices),
                        'iqr': set(indices),
                        'rolling': set(indices),
                        'price_change': set(indices),
                        'returns_z': set(indices),
                        'cusum': set(indices)
                    },
                    'combined': set(indices)
                },
                ...
            }
            - 'per_method': indices detected by each individual method
            - 'combined': final set of outlier indices after union/intersection/vote_thresh

        Notes
        -----
        Outlier detection methods included:
        1. Z-score: standard score using mean and std deviation
        2. Modified Z-score: robust score using median and MAD
        3. IQR: points outside 1.5*IQR from Q1 and Q3
        4. Rolling: points outside 3*rolling_std from rolling_mean
        5. Price Change: absolute percentage change > price_change_thresh
        6. Returns Z-score: Z-score on percent changes
        7. CUSUM: cumulative sum method detecting sudden shifts

        Visualizations (if plot_graphs=True):
        - Boxplot
        - Histogram / KDE
        - Scatter / Time Series (if 'date' exists)
        - Violin plot
        """

        outliers = {col:{} for col in numeric_cols}
        for col in numeric_cols:
            series = df[col].dropna()
            if series.empty:
                continue
            
            method_outliers = {}
            
            z_scores = zscore(series)
            method_outliers['z_score'] = set(series.index[(z_scores > z_thresh) | (z_scores < -z_thresh)].tolist())
            
            med = series.median()
            mad = median_abs_deviation(series, scale='normal')
            mod_z = 0.6745 * (series - med) / mad
            method_outliers['modified_z'] = set(series.index[(mod_z > mod_z_thresh) | (mod_z < -mod_z_thresh)].tolist())
            
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            method_outliers['iqr'] = set(series.index[(series < Q1 - 1.5*IQR) | (series > Q3 + 1.5*IQR)].tolist())
            
            roll_mean = series.rolling(rolling_window, min_periods=1).mean()
            roll_std = series.rolling(rolling_window, min_periods=1).std()
            method_outliers['rolling'] = set(series.index[(series > roll_mean + 3*roll_std) | (series < roll_mean - 3*roll_std)].tolist())
            
            returns = series.pct_change()
            method_outliers['price_change'] = set(returns.index[returns.abs() > price_change_thresh].tolist())
            
            returns_no_na = returns.dropna()
            if not returns_no_na.empty:
                returns_z = zscore(returns_no_na)
                method_outliers['returns_z'] = set(returns_no_na.index[(returns_z > z_thresh) | (returns_z < -z_thresh)].tolist())
            else:
                method_outliers['returns_z'] = set()
            
            mean_val = series.mean()
            cusum_pos, cusum_neg = 0, 0
            cusum_idx = []
            for idx, x in zip(series.index, series):
                cusum_pos = max(0, cusum_pos + x - mean_val)
                cusum_neg = min(0, cusum_neg + x - mean_val)
                if cusum_pos > 3*series.std() or abs(cusum_neg) > 3*series.std():
                    cusum_idx.append(idx)
                    cusum_pos, cusum_neg = 0,0
            method_outliers['cusum'] = set(cusum_idx)
            
            if vote_thresh is not None:
                all_indices = [idx for s in method_outliers.values() for idx in s]
                from collections import Counter
                counts = Counter(all_indices)
                outliers[col]['combined'] = set(idx for idx, c in counts.items() if c >= vote_thresh)
            else:
                if combine == 'union':
                    outliers[col]['combined'] = set().union(*method_outliers.values())
                elif combine == 'intersection':
                    if method_outliers.values():
                        outliers[col]['combined'] = set.intersection(*method_outliers.values())
                    else:
                        outliers[col]['combined'] = set()
                else:
                    raise ValueError("combine must be 'union' or 'intersection'")
            
            outliers[col]['per_method'] = method_outliers
            
            if plot_graphs and not series.empty:
                plt.figure(figsize=(12,4))
                sns.boxplot(y=series)
                plt.title(f"Boxplot of {col}")
                plt.show()
                
                plt.figure(figsize=(12,4))
                sns.histplot(series, kde=True, color='orange', bins=20)
                plt.title(f"Histogram / KDE of {col}")
                plt.show()
                
                plt.figure(figsize=(12,4))
                if 'date' in df.columns:
                    plt.scatter(df['date'], df[col], color='green')
                    plt.title(f"Scatter / Time Series Plot of {col}")
                    plt.show()
                
                plt.figure(figsize=(12,4))
                sns.violinplot(y=series, color='purple')
                plt.title(f"Violin Plot of {col}")
                plt.show()
                
        return outliers

    # ---------------- Treat Outliers ----------------6
    @staticmethod
    def treat_outliers(df, outlier_results, treatment_map):
        """
        Treat outliers in a DataFrame based on previously detected outlier indices.
        This static method applies various treatments to handle outliers in numeric
        columns. Users can choose from deletion, capping, replacement, transformations,
        smoothing, and imputation techniques. Treatments are specified per column using
        a `treatment_map`.

        Parameters
        ----------
        df : pd.DataFrame
            Input DataFrame containing numeric columns to treat outliers.

        outlier_results : dict
            Dictionary of outlier indices as returned by `detect_outliers_advanced`.
            Structure:
            {
                'column_name': {
                    'per_method': {...},
                    'combined': set(indices)
                },
                ...
            }

        treatment_map : dict
            Dictionary specifying how to treat outliers per column. Each column maps to
            a list of tuples: (method_name: str, params: dict)

            Supported methods and example formats:

            - "delete"          : [('delete', {})]  
            Deletes rows containing outliers.

            - "winsorize" / "cap" : [('winsorize', {'lower':0.01, 'upper':0.99})]  
            Limits extreme values to specified quantiles (lower, upper).

            - "median_replace"  : [('median_replace', {})]  
            Replaces outliers with the median of the column.

            - "mean_cap"        : [('mean_cap', {'k':3})]  
            Clips outliers to mean ± k*std deviation.

            - "log_transform"   : [('log_transform', {})]  
            Applies log(1 + x) transform; negative values clipped to 0.

            - "sqrt_transform"  : [('sqrt_transform', {})]  
            Applies square root transform; negative values clipped to 0.

            - "boxcox"          : [('boxcox', {})]  
            Applies Box-Cox transform; values must be positive.

            - "robust_flag"     : [('robust_flag', {})]  
            Adds a new column `<col>_is_outlier` with 1 for outliers, 0 otherwise.

            - "interpolate" / "interpolate_method" : [('interpolate_linear', {})]  
            Interpolates outliers using linear, quadratic, cubic, etc.  
            - Params: {'method': 'linear'} (optional if using method suffix)

            - "rolling_mean"    : [('rolling_mean', {'window':5})]  
            Replaces values with rolling mean over specified window.

            - "rolling_median"  : [('rolling_median', {'window':5})]  
            Replaces values with rolling median over specified window.

            - "ema_smooth"      : [('ema_smooth', {'alpha':0.3})]  
            Smooths series using exponential moving average with given alpha.

            - "kalman"          : [('kalman', {})]  
            Smooths values using Kalman filter.

            - "markov_prev"     : [('markov_prev', {})]  
            Replaces outlier with previous non-outlier value.

            - "markov_avg"      : [('markov_avg', {})]  
            Replaces outlier with average of previous and next values.

            Notes:
            - Multiple methods can be applied sequentially per column.
            - Parameters are optional; defaults will be used if not provided.
            - Methods like "kalman", "rolling_mean", and "rolling_median" require numeric data.
            - Unknown method names will raise ValueError.

            Example treatment_map:
            treatment_map = {
                'price': [('winsorize', {'lower':0.01, 'upper':0.99}), ('log_transform', {})],
                'volume': [('median_replace', {})],
                'returns': [('ema_smooth', {'alpha':0.2})]
            }

        Returns
        -------
        pd.DataFrame
            A copy of the input DataFrame with outliers treated according to the
            specified methods. The original DataFrame remains unchanged.
        """
        
        df_treated = df.copy()
        
        for col, methods in treatment_map.items():
            if col not in outlier_results:
                continue
            
            outlier_idx = list(outlier_results[col]['combined'])
            
            for method, params in methods:
                if method == "delete":
                    df_treated = df_treated.drop(index=outlier_idx)
                
                elif method in ["winsorize", "cap"]:
                    from scipy.stats import mstats
                    lower = params.get('lower', 0.01)
                    upper = params.get('upper', 0.99)
                    df_treated[col] = mstats.winsorize(df_treated[col], limits=(lower, 1-upper))
                
                elif method == "median_replace":
                    df_treated.loc[outlier_idx, col] = df_treated[col].median()
                
                elif method == "mean_cap":
                    k = params.get('k', 3)
                    mean_val = df_treated[col].mean()
                    std_val = df_treated[col].std()
                    lower, upper = mean_val - k*std_val, mean_val + k*std_val
                    df_treated.loc[outlier_idx, col] = np.clip(df_treated.loc[outlier_idx, col], lower, upper)
                
                elif method == "log_transform":
                    df_treated[col] = np.log1p(df_treated[col].clip(lower=0))
                
                elif method == "sqrt_transform":
                    df_treated[col] = np.sqrt(df_treated[col].clip(lower=0))
                
                elif method == "boxcox":
                    from scipy.stats import boxcox
                    positive_vals = df_treated[col].clip(lower=1e-6)
                    df_treated[col], _ = boxcox(positive_vals)
                
                elif method == "robust_flag":
                    df_treated[col + "_is_outlier"] = 0
                    df_treated.loc[outlier_idx, col + "_is_outlier"] = 1
                
                elif method.startswith("interpolate"):
                    interp_type = method.split("_")[1] if "_" in method else params.get('method','linear')
                    df_treated[col] = df_treated[col].interpolate(method=interp_type)
                
                elif method == "rolling_mean":
                    window = params.get('window', 5)
                    df_treated[col] = df_treated[col].rolling(window, min_periods=1).mean()
                
                elif method == "rolling_median":
                    window = params.get('window', 5)
                    df_treated[col] = df_treated[col].rolling(window, min_periods=1).median()
                
                elif method == "ema_smooth":
                    alpha = params.get('alpha', 0.3)
                    df_treated[col] = df_treated[col].ewm(alpha=alpha, adjust=False).mean()
                
                elif method == "kalman":
                    series = df_treated[col].copy()
                    if series.dropna().empty:
                        continue
                    values = series.interpolate(limit_direction="both").values
                    kf = KalmanFilter(
                        transition_matrices=[1],
                        observation_matrices=[1],
                        initial_state_mean=values[0],
                        initial_state_covariance=1,
                        transition_covariance=0.01,
                        observation_covariance=1
                    )
                    state_means, _ = kf.smooth(values)
                    df_treated[col] = pd.Series(state_means.flatten(), index=series.index)
                
                elif method == "markov_prev":
                    df_treated.loc[outlier_idx, col] = df_treated[col].shift(1).loc[outlier_idx]
                
                elif method == "markov_avg":
                    df_treated.loc[outlier_idx, col] = (
                        (df_treated[col].shift(1) + df_treated[col].shift(-1)) / 2
                    ).loc[outlier_idx]
                
                else:
                    raise ValueError(f"Unknown outlier treatment method: {method}")
        
        return df_treated
    
    # ---------------- Test Data ----------------7
    @staticmethod
    def test_data():
        """
        Generate a predefined test DataFrame containing sample OHLCV (Open, High, Low, Close, Volume) data.

        Returns
        -------
        pd.DataFrame
            A pandas DataFrame with the following columns:
            - date (datetime): Business dates from 2025-10-01 to 2025-10-28
            - low (float): Daily low prices
            - high (float): Daily high prices
            - open (float): Daily opening prices
            - close (float): Daily closing prices
            - volume (int): Trading volumes

        test_df = pd.DataFrame({
            'date': [
                '2025-10-01', '2025-10-02', '2025-10-03', '2025-10-06', '2025-10-07',
                '2025-10-08', '2025-10-09', '2025-10-10', '2025-10-13', '2025-10-14',
                '2025-10-15', '2025-10-16', '2025-10-17', '2025-10-20', '2025-10-21',
                '2025-10-22', '2025-10-23', '2025-10-24', '2025-10-27', '2025-10-28'
            ],
            'low': [
                98.37, 99.10, 101.25, 102.40, 103.55, 104.10, 105.00, 106.25, 107.00, 108.10,
                108.50, 109.00, 110.20, 111.00, 111.50, 112.00, 112.80, 113.50, 114.10, 115.00
            ],
            'high': [
                103.75, 104.50, 106.80, 107.95, 109.20, 110.00, 111.30, 112.50, 113.20, 114.50,
                115.00, 116.20, 117.50, 118.00, 118.50, 119.20, 120.00, 120.50, 121.30, 122.00
            ],
            'open': [
                100.50, 101.80, 103.40, 104.80, 105.60, 106.20, 107.15, 108.50, 109.10, 110.20,
                110.80, 111.50, 112.40, 113.20, 113.80, 114.50, 115.20, 116.00, 116.50, 117.20
            ],
            'close': [
                101.25, 102.40, 104.50, 105.90, 106.80, 107.50, 108.70, 109.80, 110.60, 111.50,
                112.20, 112.90, 113.80, 114.50, 115.00, 115.80, 116.50, 117.00, 117.80, 118.50
            ],
            'volume': [
                1711, 2378, 3200, 4100, 3850, 4500, 5000, 9750, 4900, 5100,
                5300, 5500, 5700, 5900, 6000, 6200, 6400, 6500, 6700, 6900
            ]
        })
        """
        test_df = pd.DataFrame({
            'date': [
                '2025-10-01', '2025-10-02', '2025-10-03', '2025-10-06', '2025-10-07',
                '2025-10-08', '2025-10-09', '2025-10-10', '2025-10-13', '2025-10-14',
                '2025-10-15', '2025-10-16', '2025-10-17', '2025-10-20', '2025-10-21',
                '2025-10-22', '2025-10-23', '2025-10-24', '2025-10-27', '2025-10-28'
            ],
            'low': [
                98.37, 99.10, 101.25, 102.40, 103.55, 104.10, 105.00, 106.25, 107.00, 108.10,
                108.50, 109.00, 110.20, 111.00, 111.50, 112.00, 112.80, 113.50, 114.10, 115.00
            ],
            'high': [
                103.75, 104.50, 106.80, 107.95, 109.20, 110.00, 111.30, 112.50, 113.20, 114.50,
                115.00, 116.20, 117.50, 118.00, 118.50, 119.20, 120.00, 120.50, 121.30, 122.00
            ],
            'open': [
                100.50, 101.80, 103.40, 104.80, 105.60, 106.20, 107.15, 108.50, 109.10, 110.20,
                110.80, 111.50, 112.40, 113.20, 113.80, 114.50, 115.20, 116.00, 116.50, 117.20
            ],
            'close': [
                101.25, 102.40, 104.50, 105.90, 106.80, 107.50, 108.70, 109.80, 110.60, 111.50,
                112.20, 112.90, 113.80, 114.50, 115.00, 115.80, 116.50, 117.00, 117.80, 118.50
            ],
            'volume': [
                1711, 2378, 3200, 4100, 3850, 4500, 5000, 9750, 4900, 5100,
                5300, 5500, 5700, 5900, 6000, 6200, 6400, 6500, 6700, 6900
            ]
        })
        
        test_df['date'] = pd.to_datetime(test_df['date'])
        return test_df
