# Base estimator

> `PoniardBaseEstimator` is where the magic happens. As a user, you should be using `PoniardClassifier` and `PoniardRegressor`

In [None]:
#| default_exp estimators.core

In [None]:
#| hide
from nbdev.showdoc import *

## Introduction

In [None]:
#| export

from __future__ import annotations
import warnings
import itertools
import inspect
from abc import ABC, abstractmethod
from typing import List, Optional, Union, Callable, Dict, Tuple, Any, Sequence, Iterable

import pandas as pd
import numpy as np
import joblib

try:
    import ipywidgets
    from tqdm.notebook import tqdm
except ImportError:
    from tqdm import tqdm
from sklearn.base import ClassifierMixin, RegressorMixin, TransformerMixin, clone
from sklearn.model_selection._split import BaseCrossValidator, BaseShuffleSplit
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
    StandardScaler,
    MinMaxScaler,
    RobustScaler,
    OneHotEncoder,
    OrdinalEncoder,
)
from sklearn.feature_selection import VarianceThreshold
from sklearn.ensemble import (
    VotingClassifier,
    VotingRegressor,
    StackingClassifier,
    StackingRegressor,
)
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.model_selection import (
    cross_validate,
    cross_val_predict,
    GridSearchCV,
    RandomizedSearchCV,
)
from sklearn.impute import SimpleImputer
from sklearn.exceptions import UndefinedMetricWarning

from poniard.preprocessing import DatetimeEncoder, TargetEncoder
from poniard.utils.stats import cramers_v
from poniard.utils.hyperparameters import GRID
from poniard.utils.estimate import get_target_info, element_to_list_maybe
from poniard.plot import PoniardPlotFactory

In [None]:
#| export

class PoniardBaseEstimator(ABC):
    """Base estimator that sets up all the functionality for the classifier and regressor.

    Parameters
    ----------
    estimators :
        Estimators to evaluate.
    metrics :
        Metrics to compute for each estimator. This is more restrictive than sklearn's scoring
        parameter, as it does not allow callable scorers. Single strings are cast to lists
        automatically.
    preprocess : bool, optional
        If True, impute missing values, standard scale numeric data and one-hot or ordinal
        encode categorical data.
    scaler :
        Numeric scaler method. Either "standard", "minmax", "robust" or scikit-learn Transformer.
    high_cardinality_encoder :
        Encoder for categorical features with high cardinality. Either "target" or "ordinal",
        or scikit-learn Transformer.
    numeric_imputer :
        Imputation method. Either "simple", "iterative" or scikit-learn Transformer.
    custom_preprocessor :
        Preprocessor used instead of the default preprocessing pipeline. It must be able to be
        included directly in a scikit-learn Pipeline.
    numeric_threshold :
        Number features with unique values above a certain threshold will be treated as numeric. If
        float, the threshold is `numeric_threshold * samples`.
    cardinality_threshold :
        Non-number features with cardinality above a certain threshold will be treated as
        ordinal encoded instead of one-hot encoded. If float, the threshold is
        `cardinality_threshold * samples`.
    cv :
        Cross validation strategy. Either an integer, a scikit-learn cross validation object,
        or an iterable.
    verbose :
        Verbosity level. Propagated to every scikit-learn function and estimator.
    random_state :
        RNG. Propagated to every scikit-learn function and estimator. The default None sets
        random_state to 0 so that cross_validate results are comparable.
    n_jobs :
        Controls parallel processing. -1 uses all cores. Propagated to every scikit-learn
        function.
    plugins :
        Plugin instances that run in set moments of setup, fit and plotting.
    plot_options :
        :class:poniard.plot.plot_factory.PoniardPlotFactory instance specifying Plotly format
        options or None, which sets the default factory.
    cache_transformations :
        Whether to cache transformations and set the `memory` parameter for Pipelines. This can speed up slow transformations as they are not recalculated for each estimator.
    """

    def __init__(
        self,
        estimators: Optional[
            Union[
                Sequence[ClassifierMixin],
                Dict[str, ClassifierMixin],
                Sequence[RegressorMixin],
                Dict[str, RegressorMixin],
            ]
        ] = None,
        metrics: Optional[Union[str, Dict[str, Callable], Sequence[str]]] = None,
        preprocess: bool = True,
        scaler: Optional[Union[str, TransformerMixin]] = None,
        high_cardinality_encoder: Optional[Union[str, TransformerMixin]] = None,
        numeric_imputer: Optional[Union[str, TransformerMixin]] = None,
        custom_preprocessor: Union[None, Pipeline, TransformerMixin] = None,
        numeric_threshold: Union[int, float] = 0.1,
        cardinality_threshold: Union[int, float] = 20,
        cv: Union[int, BaseCrossValidator, BaseShuffleSplit, Sequence] = None,
        verbose: int = 0,
        random_state: Optional[int] = None,
        n_jobs: Optional[int] = None,
        plugins: Optional[Sequence[Any]] = None,
        plot_options: Optional[PoniardPlotFactory] = None,
        cache_transformations: bool = False,
    ):
        # TODO: Ugly check that metrics conforms to expected types. Should improve.
        if metrics and (
            (
                isinstance(metrics, Sequence)
                and not all(isinstance(m, str) for m in metrics)
            )
            or (
                isinstance(metrics, Dict)
                and not all(isinstance(m, str) for m in metrics.keys())
                and not all(isinstance(m, Callable) for m in metrics.values())
            )
        ):
            raise ValueError(
                "metrics can only be a string, a sequence of strings, a dict with "
                "strings as keys and callables as values, or None."
            )
        self.metrics = metrics
        self.preprocess = preprocess
        self.scaler = scaler or "standard"
        self.high_cardinality_encoder = high_cardinality_encoder or "target"
        self.numeric_imputer = numeric_imputer or "simple"
        self.numeric_threshold = numeric_threshold
        self.custom_preprocessor = custom_preprocessor
        self.cardinality_threshold = cardinality_threshold
        self.cv = cv
        self.verbose = verbose
        self.random_state = random_state or 0
        self.estimators = element_to_list_maybe(estimators)
        self.n_jobs = n_jobs
        if cache_transformations:
            self._memory = joblib.Memory("transformation_cache", verbose=self.verbose)
        else:
            self._memory = None

        self._init_plugins(plugins)
        self._init_plots(plot_options)

    def _init_plugins(self, plugins: Optional[Sequence[Any]] = None) -> None:
        self.plugins = element_to_list_maybe(plugins)
        if self.plugins:
            [setattr(plugin, "_poniard", self) for plugin in self.plugins]
        return

    def _init_plots(self, plot_options: Optional[PoniardPlotFactory] = None) -> None:
        self.plot_options = plot_options or PoniardPlotFactory()
        self.plot = self.plot_options
        self.plot._poniard = self
        return

    @property
    def poniard_task(self) -> Optional[str]:
        """Check whether self is a Poniard regressor or classifier.

        Returns
        -------
        Optional[str]
            "regression", "classification" or None
        """
        from poniard import PoniardRegressor, PoniardClassifier

        if isinstance(self, PoniardRegressor):
            return "regression"
        elif isinstance(self, PoniardClassifier):
            return "classification"
        else:
            return None

    def setup(
        self,
        X: Union[pd.DataFrame, np.ndarray, List],
        y: Union[pd.DataFrame, np.ndarray, List],
    ) -> PoniardBaseEstimator:
        """Orchestrator.

        Converts inputs to arrays if necessary, sets `metrics`,
        `preprocessor`, `cv` and `pipelines`.
        
        After running `setup`, both `X` and `y` will be held as attributes.


        Parameters
        ----------
        X :
            Features.
        y :
            Target

        """
        self._run_plugin_method("on_setup_start")
        if not isinstance(X, (pd.DataFrame, pd.Series, np.ndarray)):
            X = np.array(X)
        if not isinstance(y, (pd.DataFrame, pd.Series, np.ndarray)):
            y = np.array(y)
        self.X = X
        self.y = y
        self._run_plugin_method("on_setup_data")

        self.target_info = get_target_info(self.y, self.poniard_task)
        print("Target info", "-----------", sep="\n")
        print(
            f"Type: {self.target_info['type_']}",
            f"Shape: {self.target_info['shape']}",
            f"Unique values: {self.target_info['nunique']}",
            sep="\n",
            end="\n\n",
        )
        if self.target_info["type_"] == "multiclass-multioutput":
            raise NotImplementedError(
                "multiclass-multioutput targets are not supported as "
                "no sklearn metrics support them."
            )

        if self.metrics:
            self.metrics = element_to_list_maybe(self.metrics)
        else:
            self.metrics = self._build_metrics()
        print(
            "Main metric",
            "-----------",
            self._first_scorer(sklearn_scorer=False),
            sep="\n",
            end="\n\n",
        )

        if self.preprocess:
            if self.custom_preprocessor:
                self.preprocessor = self.custom_preprocessor
            else:
                self.preprocessor = self._build_preprocessor()
        self._run_plugin_method("on_setup_preprocessor")

        self.pipelines = self._build_pipelines()

        self.cv = self._build_cv()

        self._run_plugin_method("on_setup_end")
        return self

    def _infer_dtypes(self) -> Tuple[List[str], List[str], List[str]]:
        """Infer feature types (numeric, low-cardinality categorical or high-cardinality
        categorical).

        Returns
        -------
        List[str], List[str], List[str]
            Three lists with column names or indices.
        """
        X = self.X
        numeric = []
        categorical_high = []
        categorical_low = []
        datetime = []
        if not isinstance(self.cardinality_threshold, int):
            self.cardinality_threshold = int(self.cardinality_threshold * X.shape[0])
        if not isinstance(self.numeric_threshold, int):
            self.numeric_threshold = int(self.numeric_threshold * X.shape[0])
        print(
            "Thresholds",
            "----------",
            f"Minimum unique values to consider a feature numeric: {self.numeric_threshold}",
            f"Minimum unique values to consider a categorical high cardinality: {self.cardinality_threshold}",
            sep="\n",
            end="\n\n",
        )
        if isinstance(X, pd.DataFrame):
            datetime = X.select_dtypes(
                include=["datetime64[ns]", "datetimetz"]
            ).columns.tolist()
            numbers = X.select_dtypes(include="number").columns
            for column in numbers:
                if X[column].nunique() > self.numeric_threshold:
                    numeric.append(column)
                elif X[column].nunique() > self.cardinality_threshold:
                    categorical_high.append(column)
                else:
                    categorical_low.append(column)
            strings = X.select_dtypes(exclude=["number", "datetime"]).columns
            for column in strings:
                if X[column].nunique() > self.cardinality_threshold:
                    categorical_high.append(column)
                else:
                    categorical_low.append(column)
        else:
            if np.issubdtype(X.dtype, np.datetime64):
                datetime.extend(range(X.shape[1]))
            if np.issubdtype(X.dtype, np.number):
                for i in range(X.shape[1]):
                    if np.unique(X[:, i]).shape[0] > self.numeric_threshold:
                        numeric.append(i)
                    elif np.unique(X[:, i]).shape[0] > self.cardinality_threshold:
                        categorical_high.append(i)
                    else:
                        categorical_low.append(i)
            else:
                for i in range(X.shape[1]):
                    if np.unique(X[:, i]).shape[0] > self.cardinality_threshold:
                        categorical_high.append(i)
                    else:
                        categorical_low.append(i)
        self._inferred_types = {
            "numeric": numeric,
            "categorical_high": categorical_high,
            "categorical_low": categorical_low,
            "datetime": datetime,
        }
        print("Inferred feature types", "----------------------", sep="\n")
        self.inferred_types = pd.DataFrame.from_dict(
            self._inferred_types, orient="index"
        ).T.fillna("")
        try:
            # Try to print the table nicely
            from IPython.display import display, HTML

            display(HTML(self.inferred_types.to_html()))
            print("\n")
        except ImportError:
            print(self.inferred_types)
        self._run_plugin_method("on_infer_types")
        return numeric, categorical_high, categorical_low, datetime

    def _build_preprocessor(
        self, assigned_types: Optional[Dict[str, List[Union[str, int]]]] = None
    ) -> Pipeline:
        """Build default preprocessor.

        The preprocessor imputes missing values, scales numeric features and encodes categorical
        features according to inferred types.

        """
        X = self.X
        if hasattr(self, "preprocessor") and not assigned_types:
            return self.preprocessor
        if assigned_types:
            numeric = assigned_types["numeric"]
            categorical_high = assigned_types["categorical_high"]
            categorical_low = assigned_types["categorical_low"]
            datetime = assigned_types["datetime"]
        else:
            numeric, categorical_high, categorical_low, datetime = self._infer_dtypes()

        if isinstance(self.scaler, TransformerMixin):
            scaler = self.scaler
        elif self.scaler == "standard":
            scaler = StandardScaler()
        elif self.scaler == "minmax":
            scaler = MinMaxScaler()
        else:
            scaler = RobustScaler()

        target_is_multilabel = self.target_info["type_"] in [
            "multilabel-indicator",
            "multiclass-multioutput",
            "continuous-multioutput",
        ]
        if isinstance(self.high_cardinality_encoder, TransformerMixin):
            high_cardinality_encoder = self.high_cardinality_encoder
        elif self.high_cardinality_encoder == "target":
            if target_is_multilabel:
                warnings.warn(
                    "TargetEncoder is not supported for multilabel or multioutput targets. "
                    "Switching to OrdinalEncoder.",
                    stacklevel=2,
                )
                high_cardinality_encoder = OrdinalEncoder(
                    handle_unknown="use_encoded_value", unknown_value=99999
                )
            else:
                high_cardinality_encoder = TargetEncoder(
                    task=self.poniard_task, handle_unknown="ignore"
                )
        else:
            high_cardinality_encoder = OrdinalEncoder(
                handle_unknown="use_encoded_value", unknown_value=99999
            )

        cat_date_imputer = SimpleImputer(strategy="most_frequent")

        if isinstance(self.numeric_imputer, TransformerMixin):
            num_imputer = self.numeric_imputer
        elif self.numeric_imputer == "iterative":
            from sklearn.experimental import enable_iterative_imputer
            from sklearn.impute import IterativeImputer

            num_imputer = IterativeImputer(random_state=self.random_state)
        else:
            num_imputer = SimpleImputer(strategy="mean")

        numeric_preprocessor = Pipeline(
            [("numeric_imputer", num_imputer), ("scaler", scaler)]
        )
        cat_low_preprocessor = Pipeline(
            [
                ("categorical_imputer", cat_date_imputer),
                (
                    "one-hot_encoder",
                    OneHotEncoder(
                        drop="if_binary", handle_unknown="ignore", sparse=False
                    ),
                ),
            ]
        )
        cat_high_preprocessor = Pipeline(
            [
                ("categorical_imputer", cat_date_imputer),
                (
                    "high_cardinality_encoder",
                    high_cardinality_encoder,
                ),
            ],
        )
        datetime_preprocessor = Pipeline(
            [
                (
                    "datetime_encoder",
                    DatetimeEncoder(),
                ),
                ("datetime_imputer", cat_date_imputer),
            ],
        )
        if isinstance(X, pd.DataFrame):
            type_preprocessor = ColumnTransformer(
                [
                    ("numeric_preprocessor", numeric_preprocessor, numeric),
                    (
                        "categorical_low_preprocessor",
                        cat_low_preprocessor,
                        categorical_low,
                    ),
                    (
                        "categorical_high_preprocessor",
                        cat_high_preprocessor,
                        categorical_high,
                    ),
                    ("datetime_preprocessor", datetime_preprocessor, datetime),
                ],
                n_jobs=self.n_jobs,
            )
        else:
            if np.issubdtype(X.dtype, np.datetime64):
                type_preprocessor = datetime_preprocessor
            elif np.issubdtype(X.dtype, np.number):
                type_preprocessor = ColumnTransformer(
                    [
                        ("numeric_preprocessor", numeric_preprocessor, numeric),
                        (
                            "categorical_low_preprocessor",
                            cat_low_preprocessor,
                            categorical_low,
                        ),
                        (
                            "categorical_high_preprocessor",
                            cat_high_preprocessor,
                            categorical_high,
                        ),
                    ],
                    n_jobs=self.n_jobs,
                )
            else:
                type_preprocessor = ColumnTransformer(
                    [
                        (
                            "categorical_low_preprocessor",
                            cat_low_preprocessor,
                            categorical_low,
                        ),
                        (
                            "categorical_high_preprocessor",
                            cat_high_preprocessor,
                            categorical_high,
                        ),
                    ],
                    n_jobs=self.n_jobs,
                )
        # Some transformers might not be applied to any features, so we remove them.
        non_empty_transformers = [
            x for x in type_preprocessor.transformers if x[2] != []
        ]
        type_preprocessor.transformers = non_empty_transformers
        # If type_preprocessor has a single transformer, use the transformer directly.
        # This transformer generally is a Pipeline.
        if len(type_preprocessor.transformers) == 1:
            type_preprocessor = type_preprocessor.transformers[0][1]
        preprocessor = Pipeline(
            [
                ("type_preprocessor", type_preprocessor),
                ("remove_invariant", VarianceThreshold()),
            ],
            memory=self._memory,
        )
        return preprocessor

    @property
    @abstractmethod
    def _default_estimators(self) -> List[ClassifierMixin]:
        return []

    @property
    def estimators_(self):
        warnings.warn(
            "'estimators_' has been renamed to 'pipelines'",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.pipelines

    @property
    def preprocessor_(self):
        warnings.warn(
            "'preprocessor_' has been renamed to 'preprocessor'",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.preprocessor

    @property
    def metrics_(self):
        warnings.warn(
            "'metrics_' has been renamed to 'metrics'", DeprecationWarning, stacklevel=2
        )
        return self.metrics

    @property
    def cv_(self):
        warnings.warn(
            "'cv_' has been renamed to 'cv'", DeprecationWarning, stacklevel=2
        )
        return self.cv

    def show_results(
        self,
        std: bool = False,
        wrt_dummy: bool = False,
    ):
        warnings.warn(
            "'show_results' has been renamed to 'get_results'",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.get_results(return_train_scores=False, std=std, wrt_dummy=wrt_dummy)

    def _build_pipelines(
        self,
    ) -> Dict[str, Union[ClassifierMixin, RegressorMixin]]:
        """Build :attr:`pipelines` dict where keys are the estimator class names.

        Adds dummy estimators if not included during construction. Does nothing if
        :attr:`pipelines` exists.

        """
        if isinstance(self.estimators, dict):
            estimators = self.estimators.copy()
        elif self.estimators:
            estimators = {
                estimator.__class__.__name__: estimator for estimator in self.estimators
            }
        else:
            estimators = {
                estimator.__class__.__name__: estimator
                for estimator in self._default_estimators
            }
        estimators = self._add_dummy_estimators(estimators)

        for estimator in estimators.values():
            self._pass_instance_attrs(estimator)

        pipelines = {}
        if self.preprocess:
            pipelines.update(
                {
                    name: Pipeline(
                        [("preprocessor", self.preprocessor), (name, estimator)],
                        memory=self._memory,
                    )
                    for name, estimator in estimators.items()
                }
            )
        else:
            pipelines.update(
                {
                    name: Pipeline([(name, estimator)])
                    for name, estimator in estimators.items()
                }
            )
        self._fitted_pipeline_ids = []
        return pipelines

    def _add_dummy_estimators(self, estimators: dict):
        if (
            "DummyClassifier" in estimators.keys()
            or "DummyRegressor" in estimators.keys()
        ):
            return estimators
        if self.poniard_task == "classification":
            estimators.update({"DummyClassifier": DummyClassifier(strategy="prior")})
        elif self.poniard_task == "regression":
            estimators.update({"DummyRegressor": DummyRegressor(strategy="mean")})
        return estimators

    @abstractmethod
    def _build_metrics(self) -> Union[Dict[str, Callable], List[str]]:
        """Build metrics."""
        return ["accuracy"]

    @abstractmethod
    def _build_cv(self):
        return self.cv

    def fit(self) -> PoniardBaseEstimator:
        """This is the main Poniard method. It uses scikit-learn's `cross_validate` function to
        score all `metrics` for every `pipelines`, using `cv` for cross validation.

        Parameters
        ----------
        X :
            Features.
        y :
            Target.

        Returns
        -------
        PoniardBaseEstimator
            Self.
        """
        if not hasattr(self, "cv"):
            raise ValueError("`setup` must be called before `fit`.")
        self._run_plugin_method("on_fit_start")

        results = {}
        filtered_pipelines = {
            name: pipeline
            for name, pipeline in self.pipelines.items()
            if id(pipeline) not in self._fitted_pipeline_ids
        }
        pbar = tqdm(filtered_pipelines.items())
        for i, (name, pipeline) in enumerate(pbar):
            pbar.set_description(f"{name}")
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
                warnings.filterwarnings(
                    "ignore", message=".*will be encoded as all zeros"
                )
                result = cross_validate(
                    pipeline,
                    self.X,
                    self.y,
                    scoring=self.metrics,
                    cv=self.cv,
                    return_train_score=True,
                    verbose=self.verbose,
                    n_jobs=self.n_jobs,
                )
            results.update({name: result})
            self._fitted_pipeline_ids.append(id(pipeline))
            if i == len(pbar) - 1:
                pbar.set_description("Completed")
        if hasattr(self, "_experiment_results"):
            self._experiment_results.update(results)
        else:
            self._experiment_results = results

        self._process_results()
        self._process_long_results()
        self._run_plugin_method("on_fit_end")
        return self

    def _predict(
        self, method: str, estimator_names: Optional[Sequence[str]] = None
    ) -> Dict[str, np.ndarray]:
        """Helper method for predicting targets or target probabilities with cross validation.
        Accepts predict, predict_proba, predict_log_proba or decision_function."""
        if not hasattr(self, "cv"):
            raise ValueError("`setup` must be called before `predict`.")
        X, y = self.X, self.y
        estimator_names = element_to_list_maybe(estimator_names)
        if not estimator_names:
            estimator_names = [estimator for estimator in self.pipelines.keys()]
        results = {}
        pbar = tqdm(estimator_names)
        for i, name in enumerate(pbar):
            pbar.set_description(f"{name}")
            pipeline = self.pipelines[name]
            try:
                result = cross_val_predict(
                    pipeline,
                    X,
                    y,
                    cv=self.cv,
                    method=method,
                    verbose=self.verbose,
                    n_jobs=self.n_jobs,
                )
            except AttributeError:
                warnings.warn(
                    f"{name} does not support `{method}` method. Filling with nan.",
                    stacklevel=2,
                )
                result = np.empty(self.y.shape)
                result[:] = np.nan
            results.update({name: result})

            if not hasattr(self, "_experiment_results"):
                self._experiment_results = {}
                self._experiment_results.update({name: {method: result}})
            elif name not in self._experiment_results:
                self._experiment_results.update({name: {method: result}})
            else:
                self._experiment_results[name][method] = result

            if i == len(pbar) - 1:
                pbar.set_description("Completed")
        return results

    def predict(
        self, estimator_names: Optional[Sequence[str]] = None
    ) -> Dict[str, np.ndarray]:
        """Get cross validated target predictions where each sample belongs to a single test set.

        Parameters
        ----------
        estimator_names :
            Estimators to include. If None, predict all estimators.

        Returns
        -------
        Dict
            Dict where keys are estimator names and values are numpy arrays of predictions.
        """
        return self._predict(method="predict", estimator_names=estimator_names)

    def predict_proba(
        self, estimator_names: Optional[Sequence[str]] = None
    ) -> Dict[str, np.ndarray]:
        """Get cross validated target probability predictions where each sample belongs to a
        single test set.

        Returns
        -------
        Dict
            Dict where keys are estimator names and values are numpy arrays of prediction
            probabilities.
        """
        return self._predict(method="predict_proba", estimator_names=estimator_names)

    def decision_function(
        self, estimator_names: Optional[Sequence[str]] = None
    ) -> Dict[str, np.ndarray]:
        """Get cross validated decision function predictions where each sample belongs to a
        single test set.

        Parameters
        ----------
        estimator_names :
            Estimators to include. If None, predict all estimators.

        Returns
        -------
        Dict
            Dict where keys are estimator names and values are numpy arrays of prediction
            probabilities.
        """
        return self._predict(
            method="decision_function", estimator_names=estimator_names
        )

    def predict_all(
        self, estimator_names: Optional[Sequence[str]] = None
    ) -> Tuple[Dict[str, np.ndarray]]:
        """Get cross validated target predictions, probabilities and decision functions
        where each sample belongs to all test sets.

        Parameters
        ----------
        estimator_names :
            Estimators to include. If None, predict all estimators.

        Returns
        -------
        Dict
            Dict where keys are estimator names and values are numpy arrays of prediction
            probabilities.
        """
        return (
            self._predict(method="predict", estimator_names=estimator_names),
            self._predict(method="predict_proba", estimator_names=estimator_names),
            self._predict(method="decision_function", estimator_names=estimator_names),
        )

    def reassign_types(
        self,
        numeric: Optional[List[Union[str, int]]] = None,
        categorical_high: Optional[List[Union[str, int]]] = None,
        categorical_low: Optional[List[Union[str, int]]] = None,
        datetime: Optional[List[Union[str, int]]] = None,
    ) -> PoniardBaseEstimator:
        """Reassign feature types.

        Parameters
        ----------
        numeric :
            List of column names or indices. Default None.
        categorical_high :
            List of column names or indices. Default None.
        categorical_low :
            List of column names or indices. Default None.
        datetime :
            List of column names or indices. Default None.

        Returns
        -------
        PoniardBaseEstimator
            self.
        """
        assigned_types = {
            "numeric": numeric or [],
            "categorical_high": categorical_high or [],
            "categorical_low": categorical_low or [],
            "datetime": datetime or [],
        }
        self._inferred_types = assigned_types
        print("Assigned feature types", "----------------------", sep="\n")
        assigned_types_df = pd.DataFrame.from_dict(
            self._inferred_types, orient="index"
        ).T.fillna("")
        try:
            # Try to print the table nicely
            from IPython.display import display, HTML

            display(HTML(assigned_types_df.to_html()))
            print("\n")
        except ImportError:
            print(assigned_types_df)
        # Don't build the preprocessor if no preprocessing should be done or a
        # custom preprocessor was set.
        if not self.preprocess or self.custom_preprocessor is not None:
            return self
        self.preprocessor = self._build_preprocessor(assigned_types=assigned_types)
        self._run_plugin_method("on_reassign_types")
        self.pipelines = self._build_pipelines()
        return self

    def add_preprocessing_step(
        self,
        step: Union[
            Union[Pipeline, TransformerMixin, ColumnTransformer],
            Tuple[str, Union[Pipeline, TransformerMixin, ColumnTransformer]],
        ],
        position: Union[str, int] = "end",
    ) -> Pipeline:
        """Add a preprocessing step to :attr:`preprocessor`.

        Parameters
        ----------
        step :
            A tuple of (str, transformer) or a scikit-learn transformer. Note that
            the transformer can also be a Pipeline or ColumnTransformer.
        position :
            Either an integer denoting before which step in the existing preprocessing pipeline
            the new step should be added, or 'start' or 'end'.

        Returns
        -------
        PoniardBaseEstimator
            self
        """
        if not isinstance(position, int) and position not in ["start", "end"]:
            raise ValueError("`position` can only be int, 'start' or 'end'.")
        existing_preprocessor = self.preprocessor
        if not isinstance(step, Tuple):
            step = (f"step_{step.__class__.__name__.lower()}", step)
        if isinstance(position, str) and isinstance(existing_preprocessor, Pipeline):
            if position == "start":
                position = 0
            elif position == "end":
                position = len(existing_preprocessor.steps)
        if isinstance(existing_preprocessor, Pipeline):
            existing_preprocessor.steps.insert(position, step)
        else:
            if isinstance(position, int):
                raise ValueError(
                    "If the existing preprocessor is not a Pipeline, only 'start' and "
                    "'end' are accepted as `position`."
                )
            if position == "start":
                self.preprocessor = Pipeline(
                    [step, ("initial_preprocessor", self.preprocessor)],
                    memory=self._memory,
                )
            else:
                self.preprocessor = Pipeline(
                    [("initial_preprocessor", self.preprocessor), step],
                    memory=self._memory,
                )
        self.pipelines = self._build_pipelines()
        self._run_plugin_method("on_add_preprocessing_step")
        return self

    def get_results(
        self,
        return_train_scores: bool = False,
        std: bool = False,
        wrt_dummy: bool = False,
    ) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]:
        """Return dataframe containing scoring results. By default returns the mean score and fit
        and score times. Optionally returns standard deviations as well.

        Parameters
        ----------
        return_train_scores :
            If False, only return test scores.
        std :
            Whether to return standard deviation of the scores. Default False.
        wrt_dummy :
            Whether to compute each score/time with respect to the dummy estimator results. Default
            False.

        Returns
        -------
        Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]
            Results
        """
        means = self._means
        stds = self._stds
        if not return_train_scores:
            means = means.loc[
                :, means.columns.str.contains("test_|fit|score", regex=True)
            ]
            stds = stds.loc[:, stds.columns.str.contains("test_|fit|score", regex=True)]
        if wrt_dummy:
            dummy_means = means.loc[means.index.str.contains("Dummy")]
            dummy_stds = stds.loc[stds.index.str.contains("Dummy")]
            means = means / dummy_means.squeeze()
            stds = stds / dummy_stds.squeeze()
        if std:
            return means, stds
        else:
            return means

    def add_estimators(
        self, estimators: Union[Dict[str, ClassifierMixin], Sequence[ClassifierMixin]]
    ) -> PoniardBaseEstimator:
        """Include new estimator. This is the recommended way of adding an estimator (as opposed
        to modifying :attr:`pipelines` directly), since it also injects random state, n_jobs
        and verbosity.

        Parameters
        ----------
        estimators :
            Estimators to add.

        Returns
        -------
        PoniardBaseEstimator
            Self.

        """
        estimators = element_to_list_maybe(estimators)
        if not isinstance(estimators, dict):
            new_estimators = {
                estimator.__class__.__name__: estimator for estimator in estimators
            }
        else:
            new_estimators = estimators
        for new_estimator in new_estimators.values():
            self._pass_instance_attrs(new_estimator)
        self.pipelines.update(new_estimators)
        self._run_plugin_method("on_add_estimators")
        return self

    def remove_estimators(
        self, estimator_names: Sequence[str], drop_results: bool = True
    ) -> PoniardBaseEstimator:
        """Remove estimators. This is the recommended way of removing an estimator (as opposed
        to modifying :attr:`pipelines` directly), since it also removes the associated rows from
        the results tables.

        Parameters
        ----------
        estimator_names :
            Estimators to remove.
        drop_results :
            Whether to remove the results associated with the estimators. Default True.

        Returns
        -------
        PoniardBaseEstimator
            Self.
        """
        estimator_names = element_to_list_maybe(estimator_names)
        pruned_estimators = {
            k: v for k, v in self.pipelines.items() if k not in estimator_names
        }
        if len(pruned_estimators) == 0:
            raise ValueError("Cannot remove all estimators.")
        self.pipelines = pruned_estimators
        if drop_results and hasattr(self, "_means"):
            self._means = self._means.loc[~self._means.index.isin(estimator_names)]
            self._stds = self._stds.loc[~self._stds.index.isin(estimator_names)]
            self._experiment_results = {
                k: v
                for k, v in self._experiment_results.items()
                if k not in estimator_names
            }
            self._process_long_results()
        self._run_plugin_method("on_remove_estimators")
        return self

    def get_estimator(
        self,
        estimator_name: str,
        include_preprocessor: bool = True,
        retrain: bool = False,
    ) -> Union[Pipeline, ClassifierMixin, RegressorMixin]:
        """Obtain an estimator in :attr:`pipelines` by name. This is useful for extracting default
        estimators or hyperparmeter-optimized estimators (after using :meth:`tune_estimator`).

        Parameters
        ----------
        estimator_name :
            Estimator name.
        include_preprocessor :
            Whether to return a pipeline with a preprocessor or just the estimator. Default True.
        retrain :
            Whether to retrain with full data. Default False.

        Returns
        -------
        ClassifierMixin
            Estimator.
        """
        model = self.pipelines[estimator_name]
        if not include_preprocessor:
            model = model._final_estimator
        model = clone(model)
        if retrain:
            model.fit(self.X, self.y)
        self._run_plugin_method(
            "on_get_estimator", estimator=model, name=estimator_name
        )
        return model

    def build_ensemble(
        self,
        method: str = "stacking",
        estimator_names: Optional[Sequence[str]] = None,
        top_n: Optional[int] = 3,
        sort_by: Optional[str] = None,
        ensemble_name: Optional[str] = None,
        **kwargs,
    ) -> PoniardBaseEstimator:
        """Combine estimators into an ensemble.

        By default, orders estimators according to the first metric.

        Parameters
        ----------
        method :
            Ensemble method. Either "stacking" or "voring". Default "stacking".
        estimator_names :
            Names of estimators to include. Default None, which uses `top_n`
        top_n :
            How many of the best estimators to include.
        sort_by :
            Which metric to consider for ordering results. Default None, which uses the first metric.
        ensemble_name :
            Ensemble name when adding to :attr:`pipelines`. Default None.

        Returns
        -------
        PoniardBaseEstimator
            Self.

        Raises
        ------
        ValueError
            If `method` is not "stacking" or "voting".
        """
        if method not in ["voting", "stacking"]:
            raise ValueError("Method must be either voting or stacking.")
        estimator_names = element_to_list_maybe(estimator_names)
        if estimator_names:
            models = [
                (name, self.pipelines[name]._final_estimator)
                for name in estimator_names
            ]
        else:
            if sort_by:
                sorter = sort_by
            else:
                sorter = self._means.columns[0]
            models = [
                (name, self.pipelines[name]._final_estimator)
                for name in self._means.sort_values(sorter, ascending=False).index[
                    :top_n
                ]
            ]
        if method == "voting":
            if self.poniard_task == "classification":
                ensemble = VotingClassifier(
                    estimators=models, verbose=self.verbose, **kwargs
                )
            else:
                ensemble = VotingRegressor(
                    estimators=models, verbose=self.verbose, **kwargs
                )
        else:
            if self.poniard_task == "classification":
                ensemble = StackingClassifier(
                    estimators=models, verbose=self.verbose, cv=self.cv, **kwargs
                )
            else:
                ensemble = StackingRegressor(
                    estimators=models, verbose=self.verbose, cv=self.cv, **kwargs
                )
        ensemble_name = ensemble_name or ensemble.__class__.__name__
        self.add_estimators(estimators={ensemble_name: ensemble})
        return self

    def get_predictions_similarity(
        self,
        on_errors: bool = True,
    ) -> pd.DataFrame:
        """Compute correlation/association between cross validated predictions for each estimator.

        This can be useful for ensembling.

        Parameters
        ----------
        on_errors :
            Whether to compute similarity on prediction errors instead of predictions. Default
            True.

        Returns
        -------
        pd.DataFrame
            Similarity.
        """
        if self.y.ndim > 1:
            raise ValueError("y must be a 1-dimensional array.")
        raw_results = self.predict()
        results = raw_results.copy()
        for name, result in raw_results.items():
            if on_errors:
                if self.poniard_task == "regression":
                    results[name] = self.y - result
                else:
                    results[name] = np.where(result == self.y, 1, 0)
        results = pd.DataFrame(results)
        if self.poniard_task == "classification":
            estimator_names = [x for x in results.columns if x != "DummyClassifier"]
            table = pd.DataFrame(
                data=np.nan, index=estimator_names, columns=estimator_names
            )
            for row, col in itertools.combinations_with_replacement(
                table.index[::-1], 2
            ):
                cramer = cramers_v(results[row], results[col])
                if row == col:
                    table.loc[row, col] = 1
                else:
                    table.loc[row, col] = cramer
                    table.loc[col, row] = cramer
        else:
            table = results.drop("DummyRegressor", axis=1).corr()
        return table

    def tune_estimator(
        self,
        estimator_name: str,
        grid: Optional[Dict] = None,
        mode: str = "grid",
        tuned_estimator_name: Optional[str] = None,
        **kwargs,
    ) -> Union[GridSearchCV, RandomizedSearchCV]:
        """Hyperparameter tuning for a single estimator.

        Parameters
        ----------
        estimator_name :
            Estimator to tune.
        grid :
            Hyperparameter grid. Default None, which uses the grids available for default
            estimators.
        mode :
            Type of search. Eithe "grid", "halving" or "random". Default "grid".
        tuned_estimator_name :
            Estimator name when adding to :attr:`pipelines`. Default None.
        kwargs :
            Passed to the search instance.

        Returns
        -------
        PoniardBaseEstimator
            Self.

        Raises
        ------
        KeyError
            If no grid is defined and the estimator is not a default one.
        """
        X, y = self.X, self.y
        estimator = clone(self.pipelines[estimator_name])
        if not grid:
            try:
                grid = GRID[estimator_name]
                grid = {f"{estimator_name}__{k}": v for k, v in grid.items()}
            except KeyError:
                raise NotImplementedError(
                    f"Estimator {estimator_name} has no predefined hyperparameter grid, so it has to be supplied."
                )
        self._pass_instance_attrs(estimator)

        scoring = self._first_scorer(sklearn_scorer=True)
        if mode == "random":
            search = RandomizedSearchCV(
                estimator,
                grid,
                scoring=scoring,
                cv=self.cv,
                verbose=self.verbose,
                n_jobs=self.n_jobs,
                random_state=self.random_state,
                **kwargs,
            )
        elif mode == "halving":
            from sklearn.experimental import enable_halving_search_cv
            from sklearn.model_selection import HalvingGridSearchCV

            search = HalvingGridSearchCV(
                estimator,
                grid,
                scoring=scoring,
                cv=self.cv,
                verbose=self.verbose,
                n_jobs=self.n_jobs,
                random_state=self.random_state,
                **kwargs,
            )
        else:
            search = GridSearchCV(
                estimator,
                grid,
                scoring=scoring,
                cv=self.cv,
                verbose=self.verbose,
                n_jobs=self.n_jobs,
                **kwargs,
            )
        search.fit(X, y)
        tuned_estimator_name = tuned_estimator_name or f"{estimator_name}_tuned"
        self.add_estimators(
            estimators={
                tuned_estimator_name: clone(search.best_estimator_._final_estimator)
            }
        )
        return self

    def _process_results(self) -> None:
        """Compute mean and standard deviations of  experiment results."""
        # TODO: This processes every result, even those that were processed
        # in previous runs (before add_estimators). Should be made more efficient
        results = pd.DataFrame(self._experiment_results).T
        results = results.loc[
            :,
            [
                x
                for x in results.columns
                if x not in ["predict", "predict_proba", "decision_function"]
            ],
        ]
        means = results.apply(lambda x: np.mean(x.values.tolist(), axis=1))
        stds = results.apply(lambda x: np.std(x.values.tolist(), axis=1))
        means = means[list(means.columns[2:]) + ["fit_time", "score_time"]]
        stds = stds[list(stds.columns[2:]) + ["fit_time", "score_time"]]
        self._means = means.sort_values(means.columns[0], ascending=False)
        self._stds = stds.reindex(self._means.index)
        return

    def _process_long_results(self) -> None:
        """Prepare experiment results for plotting."""
        base = pd.DataFrame(self._experiment_results).T
        melted = (
            base.rename_axis("Model")
            .reset_index()
            .melt(id_vars="Model", var_name="Metric", value_name="Score")
            .explode("Score")
        )
        melted["Type"] = "Fold"
        means = melted.groupby(["Model", "Metric"])["Score"].mean().reset_index()
        means["Type"] = "Mean"
        melted = pd.concat([melted, means])
        melted["Model"] = melted["Model"].str.replace(
            "Classifier|Regressor", "", regex=True
        )

        self._long_results = melted
        return

    def _first_scorer(self, sklearn_scorer: bool) -> Union[str, Callable]:
        """Helper method to get the first scoring function or name."""
        if isinstance(self.metrics, Sequence):
            return self.metrics[0]
        elif isinstance(self.metrics, dict):
            if sklearn_scorer:
                return list(self.metrics.values())[0]
            else:
                return list(self.metrics.keys())[0]
        else:
            raise ValueError(
                "self.metrics can only be a sequence of str or dict of str: callable."
            )

    def _train_test_split_from_cv(self):
        """Split data in a 80/20 fashion following the cross-validation strategy defined in the constructor."""
        if isinstance(self.cv, (int, Iterable)):
            cv_params_for_split = {}
        else:
            cv_params_for_split = {
                k: v
                for k, v in vars(self.cv).items()
                if k in ["shuffle", "random_state"]
            }
            stratify = self.y if "Stratified" in self.cv.__class__.__name__ else None
            cv_params_for_split.update({"stratify": stratify})
        return train_test_split(self.X, self.y, test_size=0.2, **cv_params_for_split)

    def _pass_instance_attrs(self, obj: Union[ClassifierMixin, RegressorMixin]):
        """Helper method to propagate instance attributes to objects."""
        for attr, value in zip(
            ["random_state", "verbose", "verbosity"],
            [self.random_state, self.verbose, self.verbose],
        ):
            if hasattr(obj, attr):
                setattr(obj, attr, value)
        return

    def _run_plugin_method(self, method: str, **kwargs):
        """Helper method to run plugin methods by name."""
        if not self.plugins:
            return
        for plugin in self.plugins:
            fetched_method = getattr(plugin, method, None)
            if callable(fetched_method):
                accepted_kwargs = inspect.getargs(fetched_method.__code__).args
                matched_kwargs = {
                    k: v for k, v in kwargs.items() if k in accepted_kwargs
                }
                fetched_method(**matched_kwargs)
        return

    def __repr__(self):
        return f"""{self.__class__.__name__}(estimators={self.estimators}, metrics={self.metrics},
    preprocess={self.preprocess}, scaler={self.scaler}, numeric_imputer={self.numeric_imputer},
    custom_preprocessor={self.custom_preprocessor}, numeric_threshold={self.numeric_threshold},
    cardinality_threshold={self.cardinality_threshold}, cv={self.cv}, verbose={self.verbose},
    random_state={self.random_state}, n_jobs={self.n_jobs}, plugins={self.plugins},
    plot_options={str(self.plot_options)})
            """

    def __add__(
        self,
        estimators: Union[
            Dict[str, Union[ClassifierMixin, RegressorMixin]],
            Sequence[Union[ClassifierMixin, RegressorMixin]],
        ],
    ) -> PoniardBaseEstimator:
        """Add estimators to a Poniard Estimator.

        Parameters
        ----------
        estimators :
            List or dict of estimators to add.

        Returns
        -------
        PoniardBaseEstimator
            Self.
        """
        estimators = element_to_list_maybe(estimators)
        return self.add_estimators(estimators)

    def __sub__(self, estimator_names: Sequence[str]) -> PoniardBaseEstimator:
        """Remove an estimator and its results.

        Parameters
        ----------
        estimator :
            List of estimators names.

        Returns
        -------
        PoniardBaseEstimator
            Self.
        """
        estimator_names = element_to_list_maybe(estimator_names)
        return self.remove_estimators(estimator_names, drop_results=True)

    def __getitem__(
        self, estimator_name: str
    ) -> Union[Pipeline, ClassifierMixin, RegressorMixin]:
        """Get an estimator by indexing with its name

        Parameters
        ----------
        estimator_name :
            Estimator name as string.

        Returns
        -------
        Union[Pipeline, ClassifierMixin, RegressorMixin]
            Built estimator.
        """
        return self.get_estimator(estimator_name)

## `estimators`, `metrics` and `cv`

Poniard estimators' main parameters can be grouped in the following way:

1. Estimators.
2. Preprocessing parameters.
    * Imputers
    * Numeric scaler
    * Categorical encoder
    * Custom preprocessor
3. Metrics.
4. Cross validation strategy.
5. Rest.

These give a good amount of flexibility while providing sane defaults, so that after initialization only `setup` and `fit` have to be called in order to train multiple models.

`estimators` takes a scikit-learn-compatible estimator, array of estimators or dict of *name: estimators*.

In [None]:
from poniard import PoniardRegressor
from sklearn.linear_model import LinearRegression, Ridge

In [None]:
estimators = [LinearRegression(), Ridge()]
PoniardRegressor(estimators)

PoniardRegressor(estimators=[LinearRegression(), Ridge()], metrics=None,
    preprocess=True, scaler=standard, numeric_imputer=simple,
    custom_preprocessor=None, numeric_threshold=0.1,
    cardinality_threshold=20, cv=None, verbose=0,
    random_state=0, n_jobs=None, plugins=None,
    plot_options=PoniardPlotFactory())
            

## `setup`

`setup` takes features and target as parameters, while `fit` does not accept any. This runs contrary to the established convention defined by scikit-learn where there is no setting up to do and `fit` takes the data as params.

This is because Poniard does not only fit the models, but also infer features types and create the `preprocesor` based on these types. While this could all be stuffed inside `fit` (that was the case initially), having it separated allows the user to check whether Poniard's assumptions are correct and adjust if needed before running `fit`, which can take long depending on how many models were passed to `estimators`, the cross validation strategy and the size of the dataset.

In [None]:
show_doc(PoniardBaseEstimator.setup)

---

### PoniardBaseEstimator.setup

>      PoniardBaseEstimator.setup
>                                  (X:Union[pandas.core.frame.DataFrame,numpy.nd
>                                  array,List], y:Union[pandas.core.frame.DataFr
>                                  ame,numpy.ndarray,List])

Orchestrator.

Converts inputs to arrays if necessary, sets `metrics`,
`preprocessor`, `cv` and `pipelines`.

After running `setup`, both `X` and `y` will be held as attributes.

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| X | Union[pd.DataFrame, np.ndarray, List] | Features. |
| y | Union[pd.DataFrame, np.ndarray, List] | Target |
| **Returns** | **PoniardBaseEstimator** |  |

### An example

Let's load some random data and setup a `PoniardClassifier`, which inherits from `PoniardBaseEstimator`.

In [None]:
#| hide

import random

import numpy as np
import pandas as pd

In [None]:
from poniard import PoniardClassifier

In [None]:
random.seed(0)
rng = np.random.default_rng(0)

data = pd.DataFrame({"type": random.choices(["house", "apartment"], k=500),
                     "age": rng.uniform(1, 200, 500).astype(int),
                     "date": pd.date_range("2022-01-01", freq="M", periods=500),
                     "rating": random.choices(range(50), k=500),
                     "target": random.choices([0, 1], k=500)})
data.head()

Unnamed: 0,type,age,date,rating,target
0,apartment,127,2022-01-31,1,1
1,apartment,54,2022-02-28,17,1
2,house,9,2022-03-31,0,1
3,house,4,2022-04-30,48,1
4,apartment,162,2022-05-31,40,0


`setup` will conveniently output information about the data so it can be reviewed.

In [None]:
X, y = data.drop("target", axis=1), data["target"]
pnd = PoniardClassifier()
pnd.setup(X, y)

Target info
-----------
Type: binary
Shape: (500,)
Unique values: 2

Main metric
-----------
roc_auc

Thresholds
----------
Minimum unique values to consider a feature numeric: 50
Minimum unique values to consider a categorical high cardinality: 20

Inferred feature types
----------------------


Unnamed: 0,numeric,categorical_high,categorical_low,datetime
0,age,rating,type,date






PoniardClassifier(estimators=None, metrics=['roc_auc', 'accuracy', 'precision', 'recall', 'f1'],
    preprocess=True, scaler=standard, numeric_imputer=simple,
    custom_preprocessor=None, numeric_threshold=50,
    cardinality_threshold=20, cv=StratifiedKFold(n_splits=5, random_state=0, shuffle=True), verbose=0,
    random_state=0, n_jobs=None, plugins=None,
    plot_options=PoniardPlotFactory())
            

### Attributes available after `setup`

After passing data to Poniard estimators through `setup`, multiple attributes become available.

`inferred_types` is a `DataFrame` that sorts features in 4 categories (numeric, categorical_high, categorical_low and datetime) using some basic heuristics.

These depend on the feature `dtypes`, and `numeric_threshold` and `cardinality_threshold` which are set during `PoniardBaseEstimator`'s construction.

In [None]:
pnd.inferred_types

Unnamed: 0,numeric,categorical_high,categorical_low,datetime
0,age,rating,type,date


The `preprocessor` in turn depends on `inferred_types`, and the `scaler`, `numeric_imputer` and `high_cardinality_encoder` parameters passed to the Poniard estimator init.

As will be seen further on, this preprocessor can be modified significantly to fit multiple use cases and datasets.

In [None]:
pnd.preprocessor

Each estimator has a set of default `metrics`, but others can be passed during construction.

In [None]:
pnd.metrics

['roc_auc', 'accuracy', 'precision', 'recall', 'f1']

Likewise, `cv` has sane defaults but can be modified accordingly.

In [None]:
pnd.cv

StratifiedKFold(n_splits=5, random_state=0, shuffle=True)

`target_info` lists information about `y`.

In [None]:
pnd.target_info

{'type_': 'binary', 'ndim': 1, 'shape': (500,), 'nunique': 2}

`pipelines` is a dict containing each pipeline which will be trained during `fit`. Each Poniard estimator has a limited set of default estimators.

In [None]:
pnd.pipelines["SVC"]

## `fit` and `get_results`


Because features and target are passed to the Poniard estimator, `fit` does not take any parameters. Its main purpose is to run sklearn's `cross_validate` function on each `pipeline`, scoring each `metrics` with the `cv` strategy, and store the results.

In [None]:
pnd.fit()

Completed: 100%|██████████| 9/9 [00:15<00:00,  1.69s/it]                     


PoniardClassifier(estimators=None, metrics=['roc_auc', 'accuracy', 'precision', 'recall', 'f1'],
    preprocess=True, scaler=standard, numeric_imputer=simple,
    custom_preprocessor=None, numeric_threshold=50,
    cardinality_threshold=20, cv=StratifiedKFold(n_splits=5, random_state=0, shuffle=True), verbose=0,
    random_state=0, n_jobs=None, plugins=None,
    plot_options=PoniardPlotFactory())
            

After fitting `pipelines`, cross validated results can be accessed by running `get_results`

In [None]:
show_doc(PoniardBaseEstimator.get_results)

---

### PoniardBaseEstimator.get_results

>      PoniardBaseEstimator.get_results (return_train_scores:bool=False,
>                                        std:bool=False, wrt_dummy:bool=False)

Return dataframe containing scoring results. By default returns the mean score and fit
and score times. Optionally returns standard deviations as well.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| return_train_scores | bool | False | If False, only return test scores. |
| std | bool | False | Whether to return standard deviation of the scores. Default False. |
| wrt_dummy | bool | False | Whether to compute each score/time with respect to the dummy estimator results. Default<br>False. |
| **Returns** | **Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]** |  | **Results** |

In [None]:
pnd.get_results()

Unnamed: 0,test_roc_auc,test_accuracy,test_precision,test_recall,test_f1,fit_time,score_time
DecisionTreeClassifier,0.510256,0.51,0.531145,0.503846,0.516707,0.016876,0.013337
DummyClassifier,0.5,0.52,0.52,1.0,0.684211,0.014292,0.010856
KNeighborsClassifier,0.496675,0.492,0.50915,0.534615,0.519465,0.016555,0.016932
SVC,0.472356,0.476,0.499007,0.688462,0.575907,0.82123,0.013713
LogisticRegression,0.46899,0.488,0.509234,0.573077,0.536862,0.039569,0.037738
XGBClassifier,0.460417,0.486,0.502401,0.5,0.49933,0.071057,0.015129
HistGradientBoostingClassifier,0.456571,0.488,0.505975,0.484615,0.494283,1.556914,0.042196
RandomForestClassifier,0.435056,0.462,0.479861,0.476923,0.477449,0.10718,0.020063
GaussianNB,0.423317,0.468,0.492473,0.565385,0.525371,0.015108,0.010184


In [None]:
means, stds = pnd.get_results(std=True, return_train_scores=True)
stds

Unnamed: 0,test_roc_auc,train_roc_auc,test_accuracy,train_accuracy,test_precision,train_precision,test_recall,train_recall,test_f1,train_f1,fit_time,score_time
DecisionTreeClassifier,0.060706,0.0,0.060332,0.0,0.059942,0.0,0.058835,0.0,0.057785,0.0,0.001712,0.001982
DummyClassifier,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.004226,0.001764
KNeighborsClassifier,0.021105,0.008429609,0.019391,0.01084,0.01914,0.008157,0.081043,0.022053,0.04976,0.012869,0.002909,0.007211
SVC,0.038609,0.0360072,0.042708,0.032496,0.031965,0.028405,0.085485,0.07314,0.036968,0.026864,0.11012,0.004044
LogisticRegression,0.068079,0.02545484,0.041183,0.027946,0.037992,0.024759,0.065948,0.021371,0.036585,0.022583,0.009066,0.044312
XGBClassifier,0.065278,0.0,0.035553,0.0,0.033315,0.0,0.091826,0.0,0.061108,0.0,0.00801,0.003623
HistGradientBoostingClassifier,0.059681,0.0007749323,0.041183,0.007483,0.039938,0.011912,0.070291,0.005607,0.054859,0.007046,0.600654,0.006484
RandomForestClassifier,0.060809,7.021667e-17,0.039192,0.0,0.038392,0.0,0.077307,0.0,0.056132,0.0,0.025762,0.002754
GaussianNB,0.045845,0.02494438,0.042143,0.018303,0.03733,0.01583,0.031246,0.038051,0.025456,0.018727,0.004558,0.001748


In [None]:
show_doc(PoniardBaseEstimator.reassign_types)

---

### PoniardBaseEstimator.reassign_types

>      PoniardBaseEstimator.reassign_types
>                                           (numeric:Union[List[Union[str,int]],
>                                           NoneType]=None, categorical_high:Uni
>                                           on[List[Union[str,int]],NoneType]=No
>                                           ne, categorical_low:Union[List[Union
>                                           [str,int]],NoneType]=None, datetime:
>                                           Union[List[Union[str,int]],NoneType]
>                                           =None)

Reassign feature types.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| numeric | Optional[List[Union[str, int]]] | None | List of column names or indices. Default None. |
| categorical_high | Optional[List[Union[str, int]]] | None | List of column names or indices. Default None. |
| categorical_low | Optional[List[Union[str, int]]] | None | List of column names or indices. Default None. |
| datetime | Optional[List[Union[str, int]]] | None | List of column names or indices. Default None. |
| **Returns** | **PoniardBaseEstimator** |  | **self.** |

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()