# In [None]:
import json
import numpy as np
import time
import importlib
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import VotingClassifier

class MiniAutoML:
    """Pick the best classifier pipeline by cross-validated balanced accuracy.

    ``fit`` cross-validates every configured model inside its own
    preprocessing pipeline, then cross-validates a soft-voting ensemble of
    the top-scoring pipelines, keeps whichever of the two scores higher and
    refits it on the full training data.

    Attributes:
        models_config: Iterable of dicts with at least the keys ``"name"``,
            ``"class"`` and ``"model"`` (the shape returned by
            ``load_models_from_json``).
        best_model_pipeline: The selected estimator, fitted by ``fit``;
            ``None`` before ``fit`` has completed.
        best_model_name: Name of the selected estimator
            (``"VotingClassifier"`` when the ensemble wins).
        best_cv_score: Mean CV balanced accuracy of the selected estimator.
        model_times: One dict per evaluated candidate with the keys
            ``"model"``, ``"cv_score"`` and ``"cv_time_sec"``.
        tree_based_models: Class-name fragments that switch categorical
            encoding from one-hot to ordinal.
    """

    # Seed used for model construction defaults and for the CV splitter.
    DEFAULT_SEED = 123

    def __init__(self, models_config):
        """Store the model configuration; no fitting happens here."""
        self.models_config = models_config
        self.best_model_pipeline = None
        self.best_model_name = None
        self.best_cv_score = None
        # Not assigned anywhere in this class; kept so existing attribute
        # access keeps working.
        self.voting_model_pipeline = None
        self.model_times = []
        self.tree_based_models = (
            "RandomForestClassifier", "XGBClassifier",
            "CatBoostClassifier", "LGBMClassifier",
            "GradientBoostingClassifier"
        )

    def load_models_from_json(self, json_path: str, seed: int = DEFAULT_SEED):
        """Instantiate the models described in a JSON file.

        The file must contain a list of objects with a ``"name"``, a dotted
        ``"class"`` path (``"package.module.ClassName"``) and optional
        ``"params"``. Seeding and quiet-logging parameters are filled in
        with ``setdefault``, so values given in the file always win.

        Args:
            json_path: Path to the UTF-8 encoded JSON configuration.
            seed: Seed injected into models that accept one.

        Returns:
            A list of dicts with the keys ``"name"``, ``"class"`` (the class
            object), ``"params"`` (the final keyword arguments) and
            ``"model"`` (the constructed instance).

        Raises:
            ValueError: If a ``"class"`` entry is not a dotted path.
        """
        with open(json_path, "r", encoding="utf-8") as f:
            models_config = json.load(f)

        loaded_models = []

        for cfg in models_config:
            name = cfg["name"]
            class_path = cfg["class"]
            # Copy so the injected defaults never leak into the parsed JSON;
            # ``or {}`` also tolerates an explicit ``"params": null``.
            params = dict(cfg.get("params") or {})

            module_name, sep, class_name = class_path.rpartition(".")
            if not sep or not module_name or not class_name:
                raise ValueError(
                    f"Model {name!r}: 'class' must be a dotted path like "
                    f"'package.module.ClassName', got {class_path!r}"
                )
            module = importlib.import_module(module_name)
            model_class = getattr(module, class_name)

            # Best-effort probe: build a default instance and ask it which
            # parameters it exposes. Classes that cannot be built without
            # arguments, or that have no get_params(), are simply not seeded
            # here (the library-specific branches below may still apply).
            try:
                accepts_random_state = (
                    "random_state" in model_class().get_params()
                )
            except Exception:
                accepts_random_state = False
            if accepts_random_state:
                params.setdefault("random_state", seed)

            if "XGB" in class_name:
                params.setdefault("random_state", seed)
                params.setdefault("seed", seed)
                params.setdefault("verbosity", 0)

            if "LGBM" in class_name:
                params.setdefault("random_state", seed)
                params.setdefault("seed", seed)
                params.setdefault("verbosity", -1)
                params.setdefault("verbose", -1)

            if "CatBoost" in class_name:
                params.setdefault("random_seed", seed)
                params.setdefault("allow_writing_files", False)
                params.setdefault("verbose", 0)

            loaded_models.append({
                "name": name,
                "class": model_class,
                "params": params,
                "model": model_class(**params)
            })

        return loaded_models

    def _get_preprocessor(self, X, model_class_name):
        """Build the column preprocessing for one model.

        Numeric columns are median-imputed and standardised. Categorical
        columns are mode-imputed and then ordinal-encoded when
        ``model_class_name`` contains one of ``self.tree_based_models``,
        one-hot encoded otherwise. Columns that are neither numeric nor
        object/category/bool are dropped by the ``ColumnTransformer``.

        Args:
            X: Training features; must provide ``select_dtypes`` (i.e. a
                pandas ``DataFrame``).
            model_class_name: Text containing the model's class name.

        Returns:
            An unfitted ``ColumnTransformer``.
        """
        # Select every numeric dtype (int32, float32, unsigned, ...), not
        # only int64/float64, so such columns are no longer silently
        # dropped. timedelta64 is a NumPy subclass of np.number and is
        # excluded explicitly to keep it out of the numeric branch.
        numeric_features = X.select_dtypes(
            include=[np.number], exclude=["timedelta"]
        ).columns.tolist()
        categorical_features = X.select_dtypes(
            include=['object', 'category', 'bool']
        ).columns.tolist()

        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ])

        is_tree_based = any(
            tree_name in model_class_name
            for tree_name in self.tree_based_models
        )
        if is_tree_based:
            # Categories unseen during fit are mapped to -1.
            encoder = OrdinalEncoder(
                handle_unknown='use_encoded_value', unknown_value=-1
            )
        else:
            encoder = OneHotEncoder(
                handle_unknown='ignore', sparse_output=False
            )

        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', encoder)
        ])

        return ColumnTransformer(transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    def _timed_cv_score(self, estimator, X, y, cv):
        """Return ``(mean balanced accuracy, elapsed seconds)`` for one CV run."""
        start = time.perf_counter()
        scores = cross_val_score(
            estimator,
            X,
            y,
            cv=cv,
            scoring="balanced_accuracy",
            n_jobs=-1
        )
        elapsed = time.perf_counter() - start
        return scores.mean(), elapsed

    def fit(self, X_train, y_train):
        """Evaluate all candidates, select the best one and fit it.

        Every configured model is scored with 5-fold stratified CV
        (balanced accuracy). Up to five of the best-scoring pipelines are
        combined into a soft-voting ensemble, which is scored the same way.
        The ensemble is selected only when it strictly beats the best single
        pipeline. The winner is refitted on all of ``X_train``/``y_train``.

        Progress lines are printed and ``self.model_times`` is rebuilt.

        Args:
            X_train: Training features (see ``_get_preprocessor``).
            y_train: Training labels.

        Returns:
            ``self``.

        Raises:
            ValueError: If ``models_config`` is empty.
            RuntimeError: If no candidate produced a valid CV score.
        """
        if not self.models_config:
            raise ValueError("models_config is empty; nothing to evaluate.")

        self.model_times = []
        skf = StratifiedKFold(
            n_splits=5,
            shuffle=True,
            random_state=self.DEFAULT_SEED
        )

        all_results = []

        for cfg in self.models_config:
            name = cfg["name"]
            preprocessor = self._get_preprocessor(X_train, str(cfg["class"]))

            pipeline = Pipeline(steps=[
                ("preprocessor", preprocessor),
                ("model", cfg["model"])
            ])

            mean_score, elapsed = self._timed_cv_score(
                pipeline, X_train, y_train, skf
            )

            print(
                f"{name:25s} | "
                f"CV bal_acc={mean_score:.4f} | "
                f"time={elapsed:.2f}s"
            )

            self.model_times.append({
                "model": name,
                "cv_score": mean_score,
                "cv_time_sec": elapsed
            })

            all_results.append({
                "name": name,
                "pipeline": pipeline,
                "score": mean_score
            })

        # A NaN mean means at least one fold failed to produce a score.
        # NaN keys make sorted() ordering unreliable and would let broken
        # models into the ensemble, so they are excluded from the ranking.
        valid_results = [r for r in all_results if not np.isnan(r["score"])]
        if not valid_results:
            raise RuntimeError(
                "No model produced a valid cross-validation score; "
                "check the warnings emitted during cross-validation."
            )

        # sorted() is stable even with reverse=True, so on ties the model
        # listed first in models_config ranks first.
        ranked = sorted(
            valid_results,
            key=lambda r: r["score"],
            reverse=True
        )
        best_single = ranked[0]
        top_models = ranked[:5]

        voting_pipeline = VotingClassifier(
            estimators=[(m["name"], m["pipeline"]) for m in top_models],
            voting="soft",
            n_jobs=-1
        )

        voting_score, voting_time = self._timed_cv_score(
            voting_pipeline, X_train, y_train, skf
        )

        print(
            f"{'VOTING':25s} | "
            f"CV bal_acc={voting_score:.4f} | "
            f"time={voting_time:.2f}s"
        )

        self.model_times.append({
            "model": "VotingClassifier",
            "cv_score": voting_score,
            "cv_time_sec": voting_time
        })

        # A NaN voting score compares False here, which falls back to the
        # best single pipeline.
        if voting_score > best_single["score"]:
            self.best_model_pipeline = voting_pipeline
            self.best_model_name = "VotingClassifier"
            self.best_cv_score = voting_score
        else:
            self.best_model_pipeline = best_single["pipeline"]
            self.best_model_name = best_single["name"]
            self.best_cv_score = best_single["score"]

        start = time.perf_counter()
        self.best_model_pipeline.fit(X_train, y_train)
        final_train_time = time.perf_counter() - start

        print(
            f"\nFINAL MODEL: {self.best_model_name} "
            f"(CV bal_acc={self.best_cv_score:.4f})"
        )
        print(f"Final training time: {final_train_time:.2f}s")

        return self

    def _fitted_pipeline(self):
        """Return the selected pipeline or raise if ``fit`` has not run."""
        pipeline = getattr(self, "best_model_pipeline", None)
        if pipeline is None:
            # Imported lazily so the module's import block stays untouched.
            # NotFittedError derives from both ValueError and AttributeError,
            # so callers that caught the former AttributeError still work.
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "MiniAutoML is not fitted yet; call fit() before predicting."
            )
        return pipeline

    def predict(self, X_test):
        """Return class predictions from the selected pipeline."""
        return self._fitted_pipeline().predict(X_test)

    def predict_proba(self, X_test):
        """Return class probabilities from the selected pipeline.

        Returns:
            For a two-class problem, a 1-D array with the probability of
            the second class (column 1). For any other number of classes,
            the full ``(n_samples, n_classes)`` probability matrix.
        """
        proba = self._fitted_pipeline().predict_proba(X_test)
        if proba.shape[1] == 2:
            return proba[:, 1]
        return proba