In [2]:
# ===== ALL NECESSARY IMPORTS AND CLASSES =====
import pandas as pd
import numpy as np
import os
import math
import pickle
import sklearn
from typing import Union, Literal, TypeAlias
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report

# ===== CleanFeatureSelector Class =====
class CleanFeatureSelector(BaseEstimator, TransformerMixin):
    """Column screen that drops sparse and near-constant features.

    During ``fit`` a column is kept only when its fraction of missing values
    is strictly below ``missing_thresh`` and its variance is strictly above
    ``variance_thresh``. ``transform`` then returns just those columns.

    Parameters
    ----------
    missing_thresh : float, default=0.3
        Upper bound (exclusive) on the per-column fraction of NaN values.
    variance_thresh : float, default=1e-6
        Lower bound (exclusive) on the per-column variance.

    Attributes
    ----------
    keep_features_ : list
        Labels of the columns selected by the last ``fit`` call; an empty
        list until ``fit`` has run.
    """

    def __init__(self, missing_thresh=0.3, variance_thresh=1e-6):
        self.missing_thresh = missing_thresh
        self.variance_thresh = variance_thresh
        self.keep_features_ = []

    @staticmethod
    def _as_frame(data):
        """Wrap ``data`` in a DataFrame, reusing its column labels if it has any."""
        column_labels = getattr(data, "columns", None)
        return pd.DataFrame(data, columns=column_labels)

    def fit(self, X, y=None):
        """Select the columns to keep; ``y`` is accepted but ignored."""
        frame = self._as_frame(X)

        # Stage 1: discard columns with too many missing values.
        missing_fraction = frame.isna().mean()
        dense = frame.loc[:, missing_fraction < self.missing_thresh]

        # Stage 2: among the survivors, discard (near-)constant columns.
        informative = dense.var() > self.variance_thresh
        self.keep_features_ = dense.columns[informative].tolist()
        return self

    def transform(self, X):
        """Return only the columns chosen during ``fit``."""
        return self._as_frame(X)[self.keep_features_]

# ===== train_test_split_patients Function =====
def train_test_split_patients(dataframe: pd.DataFrame, identifier: str, endpoint: str, test_ratio: float = 0.3):
    """Split rows into train/test sets so that no patient appears in both.

    Unique values of ``identifier`` are taken in order of first appearance;
    the leading ``floor((1 - test_ratio) * n_patients)`` of them form the
    training group and the remainder form the test group. Every row of a
    patient goes to that patient's group.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Input table containing the ``identifier`` and ``endpoint`` columns.
    identifier : str
        Name of the column that identifies a patient.
    endpoint : str
        Name of the target column.
    test_ratio : float, default=0.3
        Fraction of unique patients assigned to the test set (0 to 1).

    Returns
    -------
    tuple
        ``(x_train, x_test, y_train, y_test)``; the feature frames exclude
        both the ``identifier`` and ``endpoint`` columns.

    Raises
    ------
    ValueError
        If ``test_ratio`` is outside ``[0, 1]``.
    """
    if not 0 <= test_ratio <= 1:
        raise ValueError("test_ratio must be between 0 and 1.")

    unique_patients = dataframe[identifier].unique()
    # The parentheses matter: `1 - test_ratio * n` is a (negative) offset,
    # not the number of training patients. The tiny epsilon keeps binary
    # floating-point error from flooring an exact count down by one.
    split_at = math.floor((1 - test_ratio) * len(unique_patients) + 1e-9)
    train_patients = unique_patients[:split_at]
    test_patients = unique_patients[split_at:]

    train_mask = dataframe[identifier].isin(train_patients)
    test_mask = dataframe[identifier].isin(test_patients)

    features = dataframe.drop(columns=[endpoint, identifier])
    target = dataframe[endpoint]
    return features[train_mask], features[test_mask], target[train_mask], target[test_mask]

# ===== preprocess_data_w_pipeline Function =====
Processed: TypeAlias = Union[tuple[pd.DataFrame, pd.DataFrame], tuple[np.ndarray, np.ndarray]]


def preprocess_data_w_pipeline(input_data: tuple[pd.DataFrame, pd.DataFrame],
                               preprocess_pipe: sklearn.pipeline.Pipeline,
                               keep_cols=True,
                               output: Union[Literal["dataframe"], Literal["ndarray"]] = "dataframe") -> Processed:
    """Fit a preprocessing pipeline on the train split and apply it to both splits.

    The pipeline is fitted on ``x_train`` only and then used to transform
    ``x_test``. Results are rebuilt as DataFrames carrying the original row
    indices. Column labels come from the ``keep_features_`` attribute of a
    pipeline step named ``'screening'`` when such a step exists; otherwise the
    input column labels are reused if the column count is unchanged, and
    default integer labels are used if it is not.

    Parameters
    ----------
    input_data : tuple
        ``(x_train, x_test)`` pair of DataFrames.
    preprocess_pipe : sklearn.pipeline.Pipeline
        Pipeline to fit and apply.
    keep_cols : bool, default=True
        Currently unused; retained so existing call sites keep working.
    output : {"dataframe", "ndarray"}, default="dataframe"
        Container type of the returned pair.

    Returns
    -------
    Processed
        The transformed ``(x_train, x_test)`` pair. If anything fails during
        processing, the error is printed and the ORIGINAL, unprocessed inputs
        are returned unchanged (regardless of ``output``).

    Raises
    ------
    TypeError
        If ``preprocess_pipe`` is not a Pipeline or ``input_data`` is not a
        2-tuple.
    ValueError
        If ``output`` is not one of the two supported values.
    """
    if not isinstance(preprocess_pipe, sklearn.pipeline.Pipeline):
        raise TypeError("preprocess_pipe argument must be instance of sklearn.pipeline.Pipeline!")
    if output not in ["dataframe", "ndarray"]:
        raise ValueError("output must either be 'dataframe' or 'ndarray'!")
    if not isinstance(input_data, tuple) or len(input_data) != 2:
        raise TypeError("input_data must be a tuple of (x_train, x_test) DataFrames.")

    x_tr, x_ts = input_data

    try:
        x_tr_index = x_tr.index
        x_ts_index = x_ts.index
        input_cols = x_tr.columns
        tr_values = preprocess_pipe.fit_transform(x_tr)
        ts_values = preprocess_pipe.transform(x_ts)

        # Prefer the labels recorded by the 'screening' step; a pipeline
        # without that step must not fall through to the error path after a
        # successful transform.
        screening = preprocess_pipe.named_steps.get("screening")
        kept_cols = getattr(screening, "keep_features_", None)
        if kept_cols is None and np.shape(tr_values)[1] == len(input_cols):
            kept_cols = input_cols

        tr_frame = pd.DataFrame(tr_values, columns=kept_cols, index=x_tr_index)
        ts_frame = pd.DataFrame(ts_values, columns=kept_cols, index=x_ts_index)
        if output == "ndarray":
            return tr_frame.to_numpy(), ts_frame.to_numpy()
        return tr_frame, ts_frame
    except Exception as e:
        # Deliberate best-effort fallback: report the problem and hand back
        # the untouched inputs instead of raising.
        print(e)
        return x_tr, x_ts

# ===== BaseEnsembleClassifier Class =====
class BaseEnsembleClassifier:
    """Two-level ensemble: base estimators feed a final (meta) estimator.

    ``fit`` trains every base estimator on ``X``, builds a meta-feature matrix
    from their predictions on that same ``X``, and trains the final estimator
    on the meta-features. Prediction methods rebuild the meta-features for the
    new samples and delegate to the final estimator.

    Parameters
    ----------
    base_estimators : list
        Estimator objects exposing ``fit`` and ``predict`` (and optionally
        ``predict_proba``).
    final_estimator : object
        Estimator trained on the base estimators' outputs.
    pickle_file_path : str
        Path of a pickle file with per-estimator parameters/weights. It is
        loaded automatically on construction when the file exists.
    """

    def __init__(self, base_estimators: list, final_estimator, pickle_file_path: str):
        self.base_estimators = base_estimators
        self.base_estimator_info = {}
        self.final_estimator = final_estimator
        self.classification_report = {}
        self.final_estimator_dataset = pd.DataFrame([])
        self.final_est_params = pd.DataFrame({})
        self.pickle_file_path = pickle_file_path
        # Automatically load pickle file if it exists
        self.load_pickle_file()

    def load_pickle_file(self) -> bool:
        """Load weights/parameters from the pickle file and apply them.

        For each base estimator the loaded mapping is probed, in order, with
        the estimator's list position, its class name, and the position as a
        string; the first key found is applied.

        Returns
        -------
        bool
            True when the file was read and processed, False when it does not
            exist or any error occurred (the error is printed, not raised).
        """
        if not os.path.exists(self.pickle_file_path):
            print(f"Pickle file not found: {self.pickle_file_path}")
            return False
        try:
            # NOTE(security): unpickling can execute arbitrary code. Only
            # point pickle_file_path at files from a trusted source.
            with open(self.pickle_file_path, 'rb') as f:
                weights_dict = pickle.load(f)
            for idx, estimator in enumerate(self.base_estimators):
                for key in (idx, estimator.__class__.__name__, str(idx)):
                    if key in weights_dict:
                        self._update_estimator_weights(estimator, weights_dict[key])
                        break
            self.base_estimator_info = weights_dict
            print(f"Successfully loaded weights from {self.pickle_file_path}")
            return True
        except Exception as e:
            print(f"Error loading pickle file {self.pickle_file_path}: {e}")
            return False

    def _update_estimator_weights(self, estimator, weights):
        """Apply ``weights`` to ``estimator``; failures are printed, not raised.

        A dict is passed to ``estimator.set_params`` when that method exists;
        otherwise its ``'coef_'`` or ``'feature_importances_'`` entry is
        assigned to the attribute of the same name if the estimator already
        has it. A NumPy array is assigned to ``coef_`` if present, else to
        ``feature_importances_`` if present. Other types are ignored.
        """
        try:
            if isinstance(weights, dict):
                if hasattr(estimator, 'set_params'):
                    estimator.set_params(**weights)
                elif hasattr(estimator, 'coef_'):
                    if 'coef_' in weights:
                        estimator.coef_ = weights['coef_']
                elif hasattr(estimator, 'feature_importances_'):
                    if 'feature_importances_' in weights:
                        estimator.feature_importances_ = weights['feature_importances_']
            elif isinstance(weights, np.ndarray):
                if hasattr(estimator, 'coef_'):
                    estimator.coef_ = weights
                elif hasattr(estimator, 'feature_importances_'):
                    estimator.feature_importances_ = weights
        except Exception as e:
            print(f"Warning: Could not update weights for {type(estimator).__name__}: {e}")

    def generate_classification_report(self, y_test, y_pred):
        """Generate, print, and store the classification report.

        The dict form is stored on ``self.classification_report``; the text
        form is printed; the dict form is returned as a transposed DataFrame.
        """
        self.classification_report = classification_report(y_test, y_pred, output_dict=True)
        print("\n=== Classification Report ===")
        print(classification_report(y_test, y_pred))
        return pd.DataFrame(self.classification_report).transpose()

    @staticmethod
    def _estimator_meta_features(estimator, X):
        """Return one base estimator's contribution to the meta-feature matrix.

        Two-column probabilities collapse to the second column; any other
        probability matrix is kept whole (one meta-feature per column) so the
        row count always equals the number of samples. Estimators without
        ``predict_proba``, or whose ``predict_proba`` raises, contribute
        their ``predict`` output instead.
        """
        if hasattr(estimator, 'predict_proba'):
            try:
                proba = estimator.predict_proba(X)
                if proba.shape[1] == 2:
                    return proba[:, 1]
                return proba
            except Exception:
                return estimator.predict(X)
        return estimator.predict(X)

    def _construct_meta_estimator_dataset(self, X):
        """Construct the meta-features matrix from base estimator predictions."""
        if isinstance(X, pd.DataFrame):
            X = X.values
        meta_features_list = [
            self._estimator_meta_features(estimator, X)
            for estimator in self.base_estimators
        ]
        # column_stack accepts a mix of 1-D vectors and 2-D blocks.
        return np.column_stack(meta_features_list)

    @staticmethod
    def _fit_estimator(estimator, X, y, sample_weight):
        """Fit ``estimator``, passing ``sample_weight`` only when it is accepted.

        If fitting with ``sample_weight`` raises ``TypeError`` the fit is
        retried without weights.
        """
        if sample_weight is None:
            estimator.fit(X, y)
            return
        try:
            estimator.fit(X, y, sample_weight=sample_weight)
        except TypeError:
            estimator.fit(X, y)

    def fit(self, X, y, sample_weight=None):
        """Fit the base estimators, then the final estimator on their outputs.

        Parameters
        ----------
        X : DataFrame or array
            Training features (DataFrames are converted to arrays).
        y : Series, DataFrame, or array
            Training targets (pandas objects are converted to arrays).
        sample_weight : array-like, optional
            Forwarded to every estimator whose ``fit`` accepts it.

        Returns
        -------
        self
        """
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
        # Step 1: Fit all base estimators
        for estimator in self.base_estimators:
            self._fit_estimator(estimator, X, y, sample_weight)
        # Step 2: Construct meta-features dataset.
        # NOTE(review): these are predictions on the rows the base estimators
        # were just trained on; out-of-fold predictions may be preferable.
        self.final_estimator_dataset = self._construct_meta_estimator_dataset(X)
        # Step 3: Fit the final estimator
        self._fit_estimator(self.final_estimator, self.final_estimator_dataset, y, sample_weight)
        return self

    def predict(self, X):
        """Predict class labels for samples in X."""
        meta_features = self._construct_meta_estimator_dataset(X)
        return self.final_estimator.predict(meta_features)

    def predict_proba(self, X):
        """Predict class probabilities for samples in X.

        Raises
        ------
        AttributeError
            If the final estimator has no ``predict_proba`` method.
        """
        if not hasattr(self.final_estimator, 'predict_proba'):
            raise AttributeError(f"Final estimator {type(self.final_estimator).__name__} does not support predict_proba")
        meta_features = self._construct_meta_estimator_dataset(X)
        return self.final_estimator.predict_proba(meta_features)

    def decision_function(self, X):
        """Compute the final estimator's decision function for samples in X.

        Raises
        ------
        AttributeError
            If the final estimator has no ``decision_function`` method.
        """
        if not hasattr(self.final_estimator, 'decision_function'):
            raise AttributeError(f"Final estimator {type(self.final_estimator).__name__} does not support decision_function")
        meta_features = self._construct_meta_estimator_dataset(X)
        return self.final_estimator.decision_function(meta_features)

# ===== StackingClassifier Class =====
class StackingClassifier(BaseEnsembleClassifier):
    """Ensemble variant whose two training stages can be run separately.

    ``pickle_file_path`` is optional here: when it is empty, a placeholder
    path inside the current working directory is handed to the base class.
    """

    def __init__(self, base_estimators, final_estimator, pickle_file_path: str = ""):
        # The base class always needs a path; substitute a placeholder that
        # is not expected to exist when the caller supplies none.
        resolved_path = (
            pickle_file_path
            if pickle_file_path
            else os.path.join(os.getcwd(), "__dummy_pickle_path__.pkl")
        )
        super().__init__(base_estimators, final_estimator, resolved_path)

    def fit_base_estimators(self, x_train: Union[pd.DataFrame, np.ndarray], y_train: Union[pd.DataFrame, np.ndarray]) -> None:
        """Train each base estimator on the given data, in list order."""
        for base_model in self.base_estimators:
            base_model.fit(x_train, y_train)

    def fit_final_estimator(self, x_train: Union[pd.DataFrame, np.ndarray], y_train: Union[pd.DataFrame, np.ndarray]):
        """Train the final estimator on the given data and return ``self``."""
        meta_model = self.final_estimator
        meta_model.fit(x_train, y_train)
        return self

# ===== Import test cases (if available) =====


In [None]:
# Candidate models: one entry per estimator class, each paired with a
# "params" mapping from constructor-argument name to a list of values to try.
# Presumably consumed by a hyper-parameter search; the consumer is not shown
# in this cell.
# NOTE: the estimator classes referenced here are imported in a later cell of
# this notebook, so that cell has to be executed before this one.
model_pool = [
    {
        "estimator": LogisticRegression,
        # NOTE(review): scikit-learn does not accept every penalty/solver
        # pair (and 'elasticnet' additionally needs l1_ratio) — confirm the
        # search either filters invalid combinations or tolerates failed fits.
        "params": {
            'C': [0.1, 1, 10, 100],
            'penalty': ['l1', 'l2', 'elasticnet', None],
            'solver': ['lbfgs', 'liblinear', 'newton-cg', 'sag', 'saga'],
            'max_iter': [100, 200, 500],
            'class_weight': [None, 'balanced']
        }
    },
    {
        "estimator": SVC,
        "params": {
            'C': [0.1, 1, 10, 100],
            'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
            'gamma': ['scale', 'auto'],
            'class_weight': [None, 'balanced'],
            'probability': [True]  # Required for stacking
        }
    },
    {
        "estimator": RandomForestClassifier,
        "params": {
            'n_estimators': [100, 200, 500],
            'max_depth': [None, 10, 20, 30],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'class_weight': [None, 'balanced', 'balanced_subsample']
        }
    },
    {
        "estimator": KNeighborsClassifier,
        "params": {
            'n_neighbors': [3, 5, 7, 10],
            'weights': ['uniform', 'distance'],
            'p': [1, 2]  # 1: manhattan, 2: euclidean
        }
    },
    {
        "estimator": XGBClassifier,
        "params": {
            'n_estimators': [100, 200, 500],
            'max_depth': [3, 6, 9],
            'learning_rate': [0.01, 0.1, 0.3],
            'subsample': [0.8, 1.0],
            'colsample_bytree': [0.8, 1.0],
            'min_child_weight': [1, 3, 5]
        }
    }
]

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier


# model_pool = [LogisticRegression, SVC, RandomForestClassifier, KNeighborsClassifier, XGBClassifier]


model_pool = [
    {
        "estimator" : LogisticRegression, 
        "params" : 
        
    }
]
for model in model_pool:
