# In [None]:  (notebook cell marker left over from the Jupyter/Colab export)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import io
import time
from abc import ABC
from typing import Tuple, List, Dict

from sklearn.model_selection import train_test_split, GridSearchCV, KFold, validation_curve
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                             classification_report, roc_curve, auc, confusion_matrix)
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

# Check if XGBoost is available.
# XGBoost is treated as an optional dependency: when the import fails the
# module-level flag `xgboost_available` is set to False and an install hint is
# printed. ModelFactory.create_models() reads this flag before adding the
# XGBoost entry to the model list.
try:
    from xgboost import XGBClassifier
    xgboost_available = True
except ImportError:
    xgboost_available = False
    print("XGBoost is not installed. To install, execute: !pip install xgboost")


# -----------------------
# MetricTimer Class: Measures execution time for metric calculations
# -----------------------
class MetricTimer:
    """Utility class to measure execution time of a metric function."""

    @staticmethod
    def time_function(func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and report how long the call took.

        Args:
            func: Callable to execute.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            A ``(result, elapsed)`` tuple: the value returned by ``func`` and
            the elapsed time in seconds (float).
        """
        # perf_counter() is a monotonic, high-resolution clock intended for
        # measuring short durations; time.time() is a wall clock that can be
        # coarse and may jump when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        return result, elapsed


# -----------------------
# Data Loading and Processing
# -----------------------
class DataLoader:
    """Fetches a CSV dataset through the Google Colab file-upload widget."""

    @staticmethod
    def load_data_csv() -> pd.DataFrame:
        """Prompt for an upload and parse the first uploaded file as CSV.

        Returns:
            The DataFrame read from the first file in the upload result.

        Raises:
            ValueError: If the upload result contains no files.
        """
        # Imported inside the function, so the google.colab package is only
        # required when this method is actually called.
        from google.colab import files

        print("Please upload your CSV file:")
        uploaded = files.upload()

        file_names = list(uploaded.keys())
        if not file_names:
            raise ValueError("No file was uploaded.")

        # Only the first uploaded file is used; any others are ignored.
        first_name = file_names[0]
        print(f"File uploaded: {first_name}")
        return pd.read_csv(io.BytesIO(uploaded[first_name]))


class DataProcessor:
    """Prepares a raw DataFrame for modelling: imputation, one-hot encoding, split, scaling."""

    def __init__(self, random_state: int = 42):
        # Seed forwarded to train_test_split so the split is reproducible.
        self.random_state = random_state
        # Fitted on the training split inside preprocess().
        self.scaler = StandardScaler()
        # Column labels of the encoded feature matrix; set by preprocess().
        self.feature_names = None

    def preprocess(self, df: pd.DataFrame, target_column: str,
                   test_size: float = 0.25) -> Tuple[np.ndarray, np.ndarray, pd.Series, pd.Series]:
        """Turn ``df`` into scaled train/test features plus the matching targets.

        Missing values are filled in ``df`` itself (the caller's frame is
        modified), the target column is separated, the remaining columns are
        one-hot encoded, the data is split with stratification on the target,
        and the scaler is fitted on the training part only.

        Args:
            df: Input data including the target column.
            target_column: Name of the column to predict.
            test_size: Fraction of rows placed in the test split.

        Returns:
            ``(X_train_scaled, X_test_scaled, y_train, y_test)``.
        """
        _handle_missing_values(df)

        features = df.drop(columns=target_column)
        target = df[target_column]

        # One-hot encode categorical features if needed
        features = pd.get_dummies(features)
        self.feature_names = features.columns

        split = train_test_split(
            features,
            target,
            test_size=test_size,
            random_state=self.random_state,
            stratify=target,
        )
        train_features, test_features, train_target, test_target = split

        # Scaling statistics come from the training rows only; the test rows
        # are transformed with those same statistics.
        scaled_train = self.scaler.fit_transform(train_features)
        scaled_test = self.scaler.transform(test_features)
        return scaled_train, scaled_test, train_target, test_target


def _handle_missing_values(df: pd.DataFrame) -> None:
    """Handles missing values without causing chained assignment warnings."""
    if df.isnull().sum().sum() > 0:
        print("Missing values detected. Filling them...")
        num_cols = df.select_dtypes(include=[np.number]).columns
        for col in num_cols:
            if df[col].isnull().sum() > 0:
                df.loc[:, col] = df[col].fillna(df[col].mean())
        cat_cols = df.select_dtypes(exclude=[np.number]).columns
        for col in cat_cols:
            if df[col].isnull().sum() > 0:
                df.loc[:, col] = df[col].fillna(df[col].mode()[0])


# -----------------------
# Model Definition and Factory
# -----------------------
class Model(ABC):
    """Simple wrapper pairing a display name with a scikit-learn style classifier.

    Attributes are plain instance fields:
      * ``name``     - label used in reports and plots.
      * ``model``    - the wrapped estimator.
      * ``metrics``  - dict of evaluation results, empty until filled by an evaluator.
      * ``roc_data`` - ROC curve points, ``None`` until filled by an evaluator.
    """

    def __init__(self, name: str, model):
        self.name = name
        self.model = model
        self.metrics: Dict = {}
        self.roc_data = None

    def fit(self, X_train, y_train):
        """Fit the wrapped estimator and return ``self`` so calls can be chained."""
        self.model.fit(X_train, y_train)
        return self

    def predict(self, X):
        """Return the wrapped estimator's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X):
        """Return class scores for ``X`` as a two-column array.

        When the estimator implements ``predict_proba`` its output is returned
        unchanged. Otherwise the output of ``decision_function`` is squashed
        through a logistic sigmoid and returned as ``[1 - p, p]`` columns.
        NOTE(review): the fallback assumes ``decision_function`` yields one
        score per sample (binary case) - confirm for other estimators.
        """
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        score = np.asarray(self.model.decision_function(X), dtype=float)
        # sigmoid(s) == 0.5 * (1 + tanh(s / 2)). The tanh form saturates
        # cleanly, whereas 1 / (1 + exp(-s)) overflows in exp() and emits a
        # RuntimeWarning for large negative scores.
        prob = 0.5 * (1.0 + np.tanh(0.5 * score))
        return np.vstack([1 - prob, prob]).T

    def get_name(self) -> str:
        """Return the display name."""
        return self.name

    def get_metrics(self) -> Dict:
        """Return the metrics dictionary (empty until the model is evaluated)."""
        return self.metrics


class ModelFactory:
    """Factory for creating all available models."""

    @staticmethod
    def create_models(random_state: int = 42) -> List[Model]:
        """Build one ``Model`` wrapper per supported classifier.

        Args:
            random_state: Seed passed to every estimator that accepts one.
                Defaults to 42, the value that was previously hard-coded.

        Returns:
            A list of unfitted ``Model`` wrappers. The XGBoost entry is
            included only when the module-level ``xgboost_available`` flag is
            true.
        """
        # (display name, estimator) pairs. The display names are matched as
        # literal strings elsewhere in this file, so they must not change.
        catalogue = [
            ("Regressão Logística",
             LogisticRegression(max_iter=1000, random_state=random_state)),
            ("Árvore de Decisão",
             DecisionTreeClassifier(max_depth=5, random_state=random_state)),
            ("KNN",
             KNeighborsClassifier(n_neighbors=5)),
            ("Naive Bayes (Gaussian)",
             GaussianNB()),
            ("SVM",
             SVC(probability=True, random_state=random_state)),
            # Default MLP; ModelComparisonSystem.tune_and_replace_mlp() may
            # swap in a tuned estimator found by hyperparameter search.
            ("Rede Neural (MLP)",
             MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000,
                           random_state=random_state)),
            ("Random Forest",
             RandomForestClassifier(n_estimators=100, random_state=random_state)),
            ("Gradient Boosting",
             GradientBoostingClassifier(n_estimators=100, random_state=random_state)),
        ]
        if xgboost_available:
            catalogue.append(
                ("XGBoost",
                 XGBClassifier(n_estimators=100, random_state=random_state)))
        return [Model(name, estimator) for name, estimator in catalogue]


# -----------------------
# Hyperparameter Tuning for MLPClassifier
# -----------------------
class MLPHyperparameterTuner:
    """Performs hyperparameter tuning for MLPClassifier using a k-fold cross-validated grid search."""

    # NOTE(review): the original author left a note saying the default of
    # cv_folds was kept at 2 "because of the label calculation" - the exact
    # reason is not visible in this file; confirm before changing the default.

    @staticmethod
    def tune(X_train, y_train, cv_folds: int = 2) -> MLPClassifier:
        """Grid-search MLPClassifier settings and return the best estimator.

        Args:
            X_train: Training feature matrix.
            y_train: Training labels.
            cv_folds: Number of folds for the shuffled ``KFold`` splitter.

        Returns:
            ``best_estimator_`` of the fitted ``GridSearchCV``, selected by
            weighted F1 score.
        """
        # The grid is kept deliberately small (a single hidden-layer layout)
        # to limit the cost of the search: 16 combinations per fold.
        param_grid = {
            'hidden_layer_sizes': [(50,)],
            'activation': ['relu', 'tanh'],
            'solver': ['adam', 'sgd'],
            'alpha': [0.0001, 0.001],
            'learning_rate': ['constant', 'adaptive']
        }
        # Setup k-fold cross validation
        cv = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
        mlp = MLPClassifier(max_iter=1000, random_state=42)
        grid_search = GridSearchCV(estimator=mlp,
                                   param_grid=param_grid,
                                   cv=cv,
                                   scoring='f1_weighted',
                                   n_jobs=-1,
                                   verbose=2)
        grid_search.fit(X_train, y_train)
        print("Tuned MLPClassifier Best Parameters:")
        print(grid_search.best_params_)
        print(f"Best Cross-Validation F1 Score: {grid_search.best_score_:.3f}")
        return grid_search.best_estimator_


# -----------------------
# Model Evaluation with Metric Timing
# -----------------------
class ModelEvaluator:
    """Evaluates models using sklearn.metrics and measures timing."""

    @staticmethod
    def evaluate_model(model: Model, X_test, y_test) -> None:
        """Score a fitted model on the test split and print a report.

        Accuracy, weighted precision/recall/F1 (each timed with
        ``MetricTimer``) and the ROC AUC are stored in ``model.metrics``; the
        ROC curve points are stored in ``model.roc_data``.

        Args:
            model: Fitted ``Model`` wrapper.
            X_test: Test feature matrix.
            y_test: True test labels.

        Returns:
            None. Results are written to ``model`` and printed.
        """
        y_pred = model.predict(X_test)
        try:
            y_pred_proba = model.predict_proba(X_test)[:, 1]
        except Exception as exc:
            # Best effort: keep the evaluation going with constant scores, but
            # say so instead of reporting a meaningless AUC silently.
            print(f"Warning: class scores unavailable for {model.get_name()} "
                  f"({exc}); ROC is computed from constant scores.")
            y_pred_proba = np.zeros(len(y_test))
        acc, acc_time = MetricTimer.time_function(accuracy_score, y_test, y_pred)
        prec, prec_time = MetricTimer.time_function(precision_score, y_test, y_pred, average='weighted', zero_division=0)
        rec, rec_time = MetricTimer.time_function(recall_score, y_test, y_pred, average='weighted', zero_division=0)
        f1, f1_time = MetricTimer.time_function(f1_score, y_test, y_pred, average='weighted', zero_division=0)

        # Column 1 of predict_proba belongs to classes_[1]. Naming it as the
        # positive label lets roc_curve handle binary targets that are not
        # encoded as {0, 1} / {-1, 1} (e.g. {1, 2} or string labels); for the
        # standard encodings this equals the default.
        classes = getattr(model.model, "classes_", None)
        pos_label = classes[1] if classes is not None and len(classes) == 2 else None
        fpr, tpr, _ = roc_curve(y_test, y_pred_proba, pos_label=pos_label)
        roc_auc_val = auc(fpr, tpr)

        model.metrics = {
            "Accuracy": acc,
            "Precision": prec,
            "Recall": rec,
            "F1 Score": f1,
            "ROC AUC": roc_auc_val,
            "Timing (s)": {
                "Accuracy": acc_time,
                "Precision": prec_time,
                "Recall": rec_time,
                "F1 Score": f1_time
            }
        }
        model.roc_data = pd.DataFrame({
            'FPR': fpr,
            'TPR': tpr,
            'Model': [model.get_name()] * len(fpr)
        })

        print(f"\n--- Evaluation Report for {model.get_name()} ---")
        print("Metrics:")
        for metric, value in model.metrics.items():
            # The timing entry is a nested dict and is printed separately.
            if metric != "Timing (s)":
                print(f"{metric}: {value:.3f}")
        print("Timing (in seconds):")
        for m, t in model.metrics["Timing (s)"].items():
            print(f"{m}: {t:.5f}")
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred, zero_division=0))


# -----------------------
# Result Visualization
# -----------------------
class ResultVisualizer:
    """Plots performance metrics and algorithm-specific illustrations."""

    @staticmethod
    def plot_metrics_comparison(results_df: pd.DataFrame) -> None:
        """Draw a grouped horizontal bar chart of all metrics for all models.

        Args:
            results_df: One row per model with a "Model" column and the
                "Accuracy", "Precision", "Recall", "F1 Score" and "ROC AUC"
                columns. Models are ordered by descending F1 score.
        """
        metrics = ["Accuracy", "Precision", "Recall", "F1 Score", "ROC AUC"]
        sorted_df = results_df.sort_values(by="F1 Score", ascending=False)
        melted = pd.melt(sorted_df, id_vars=["Model"], value_vars=metrics,
                         var_name="Metric", value_name="Score")
        plt.figure(figsize=(12, 8))
        sns.barplot(x="Score", y="Model", hue="Metric", data=melted, palette="viridis")
        plt.title("Comparison of Classification Metrics Across Models")
        plt.xlabel("Score")
        plt.ylabel("Model (ordered by F1 Score)")
        plt.xlim(0, 1)
        plt.legend(loc="lower right")
        plt.tight_layout()
        plt.show()

    @staticmethod
    def _positive_class_scores(model: Model, X_test, y_test):
        """Return ``(scores, pos_label)`` to feed into ``roc_curve``.

        ``scores`` is column 1 of ``model.predict_proba``; when that fails a
        notice is printed and constant zero scores are used so plotting can
        continue. ``pos_label`` is ``classes_[1]`` of a two-class estimator
        (the class that column 1 refers to) and ``None`` otherwise.
        """
        try:
            scores = model.predict_proba(X_test)[:, 1]
        except Exception as exc:
            print(f"Warning: class scores unavailable for {model.get_name()} "
                  f"({exc}); ROC is computed from constant scores.")
            scores = np.zeros(len(y_test))
        classes = getattr(model.model, "classes_", None)
        pos_label = classes[1] if classes is not None and len(classes) == 2 else None
        return scores, pos_label

    @staticmethod
    def plot_algorithm_process(model: Model, X_test, y_test) -> None:
        """General plots: ROC curve and Confusion Matrix, side by side."""
        y_pred = model.predict(X_test)
        y_pred_proba, pos_label = ResultVisualizer._positive_class_scores(model, X_test, y_test)
        # pos_label lets roc_curve work with binary labels other than
        # {0, 1} / {-1, 1}; for those encodings it matches the default.
        fpr, tpr, _ = roc_curve(y_test, y_pred_proba, pos_label=pos_label)
        roc_auc_val = auc(fpr, tpr)
        cm = confusion_matrix(y_test, y_pred)

        fig, axes = plt.subplots(1, 2, figsize=(14, 6))
        fig.suptitle(f"Process Plot for {model.get_name()}", fontsize=16)
        axes[0].plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC (AUC = {roc_auc_val:.2f})")
        # Diagonal reference line.
        axes[0].plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
        axes[0].set_xlim([0.0, 1.0])
        axes[0].set_ylim([0.0, 1.05])
        axes[0].set_xlabel("False Positive Rate")
        axes[0].set_ylabel("True Positive Rate")
        axes[0].set_title("ROC Curve")
        axes[0].legend(loc="lower right")

        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=axes[1])
        axes[1].set_xlabel("Predicted Label")
        axes[1].set_ylabel("True Label")
        axes[1].set_title("Confusion Matrix")

        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

    @staticmethod
    def _plot_decision_boundary(model: Model, X_test, y_test, title: str, cmap: str) -> None:
        """Plot predicted regions over the first two features of ``X_test``.

        The estimator is queried on a 200 x 200 grid spanning the first two
        features (with a margin of 1 on each side). Because the estimator was
        fitted on every feature, the remaining features are held at their
        ``X_test`` column means, so the picture is a 2-D slice of the decision
        surface. With exactly two features the grid is the plain mesh.
        """
        X_arr = np.asarray(X_test, dtype=float)
        if X_arr.ndim != 2 or X_arr.shape[1] < 2:
            print(f"{title}: at least two features are required for this plot.")
            return

        # A fixed number of grid points keeps the cost bounded whatever the
        # scale of the data is.
        points_per_axis = 200
        x_min, x_max = X_arr[:, 0].min() - 1, X_arr[:, 0].max() + 1
        y_min, y_max = X_arr[:, 1].min() - 1, X_arr[:, 1].max() + 1
        xx, yy = np.meshgrid(np.linspace(x_min, x_max, points_per_axis),
                             np.linspace(y_min, y_max, points_per_axis))

        # Start from the mean sample repeated for every grid point, then
        # overwrite the two plotted features with the mesh coordinates.
        grid = np.tile(X_arr.mean(axis=0), (xx.size, 1))
        grid[:, 0] = xx.ravel()
        grid[:, 1] = yy.ravel()
        try:
            Z = np.asarray(model.model.predict(grid))
        except Exception as exc:
            # Best effort: still show the scatter, but report the failure.
            print(f"{title}: could not predict on the grid ({exc}); "
                  f"the background is left blank.")
            Z = np.zeros(xx.size)
        Z = Z.reshape(xx.shape)

        plt.figure(figsize=(8, 6))
        plt.contourf(xx, yy, Z, alpha=0.4, cmap=cmap)
        plt.scatter(X_arr[:, 0], X_arr[:, 1], c=y_test, s=20, edgecolor="k")
        plt.xlabel("Feature 1")
        plt.ylabel("Feature 2")
        plt.title(title)
        plt.show()

    @staticmethod
    def _plot_feature_importances(model: Model, label: str, color: str) -> None:
        """Bar-plot ``feature_importances_`` of the wrapped estimator, largest first.

        Tick labels come from ``feature_names_in_`` when the estimator has it;
        an estimator fitted on a plain array has no such attribute, so generic
        "Feature N" labels are used instead of giving up on the plot.
        """
        estimator = model.model
        try:
            importances = np.asarray(estimator.feature_importances_)
        except (AttributeError, ValueError):
            print(f"Feature importances not available for {label}.")
            return

        names = getattr(estimator, "feature_names_in_", None)
        if names is None or len(names) != len(importances):
            names = [f"Feature {i + 1}" for i in range(len(importances))]
        names = np.asarray(names)

        order = np.argsort(importances)[::-1]
        positions = range(len(importances))
        plt.figure(figsize=(10, 6))
        plt.title(f"{label} Feature Importances")
        plt.bar(positions, importances[order], color=color, align="center")
        plt.xticks(positions, names[order], rotation=45)
        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_algorithm_specific(model: Model, X_test, y_test) -> None:
        """Creates a specific plot based on the algorithm type.

        The plot is chosen by the model's display name:
          * KNN / logistic regression - decision regions over two features.
          * Decision tree - tree diagram.
          * Gaussian NB - histogram of predicted probabilities.
          * SVM - test points with the support vectors highlighted.
          * MLP - training loss curve.
          * Random Forest / Gradient Boosting / XGBoost - feature importances.
          * Anything else - the generic ROC + confusion-matrix plot.
        """
        name = model.get_name()
        importance_colors = {
            "Random Forest": "lightblue",
            "Gradient Boosting": "salmon",
            "XGBoost": "green",
        }

        if name == "KNN":
            ResultVisualizer._plot_decision_boundary(
                model, X_test, y_test, "KNN Decision Boundary", "coolwarm")
        elif name == "Regressão Logística":
            ResultVisualizer._plot_decision_boundary(
                model, X_test, y_test, "Logistic Regression Decision Boundary", "RdBu")
        elif name == "Árvore de Decisão":
            plt.figure(figsize=(12, 8))
            plot_tree(model.model, filled=True, fontsize=8)
            plt.title("Decision Tree Diagram")
            plt.show()
        elif name == "Naive Bayes (Gaussian)":
            try:
                y_pred_proba = model.predict_proba(X_test)[:, 1]
                plt.figure(figsize=(8, 6))
                plt.hist(y_pred_proba, bins=20, color="skyblue", edgecolor="black")
                plt.title("Histogram of Predicted Probabilities (Gaussian NB)")
                plt.xlabel("Predicted Probability")
                plt.ylabel("Frequency")
                plt.show()
            except Exception:
                print("Predicted probabilities not available for Naive Bayes.")
        elif name == "SVM":
            if hasattr(model.model, "support_vectors_"):
                # Only the first two features are drawn.
                X_plot = np.asarray(X_test)[:, :2]
                plt.figure(figsize=(8, 6))
                plt.scatter(X_plot[:, 0], X_plot[:, 1], c=y_test, s=30, cmap="coolwarm", edgecolor="k")
                sv = model.model.support_vectors_
                plt.scatter(sv[:, 0], sv[:, 1], s=100, facecolors='none', edgecolors='k', label="Support Vectors")
                plt.xlabel("Feature 1")
                plt.ylabel("Feature 2")
                plt.title("SVM Support Vectors")
                plt.legend()
                plt.show()
            else:
                print("Support vectors not available or not applicable.")
        elif name == "Rede Neural (MLP)":
            if hasattr(model.model, "loss_curve_"):
                plt.figure(figsize=(8, 6))
                plt.plot(model.model.loss_curve_, marker="o")
                plt.title("Neural Network Loss Curve")
                plt.xlabel("Iteration")
                plt.ylabel("Loss")
                plt.show()
            else:
                print("Loss curve not available for this neural network.")
        elif name in importance_colors:
            ResultVisualizer._plot_feature_importances(model, name, importance_colors[name])
        else:
            ResultVisualizer.plot_algorithm_process(model, X_test, y_test)


# -----------------------
# Overall System: Model Comparison
# -----------------------
class ModelComparisonSystem:
    """Orchestrates loading CSV data from Google Colab, preprocessing, training, evaluating, and comparing models."""

    def __init__(self, random_state: int = 42):
        self.data_processor = DataProcessor(random_state=random_state)
        self.model_factory = ModelFactory()
        self.model_evaluator = ModelEvaluator()
        self.results_visualizer = ResultVisualizer()
        self.df = None              # set by load_data()
        self.target_column = None   # set by select_target()
        self.models: List[Model] = []
        self.results_df = None      # set by train_and_evaluate_models()

    def load_data(self) -> pd.DataFrame:
        """Load the dataset through ``DataLoader`` and keep it on ``self.df``."""
        self.df = DataLoader.load_data_csv()
        return self.df

    def analyze_data(self) -> None:
        """Print the head, the column summary and descriptive statistics.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        if self.df is None:
            raise ValueError("Data has not been loaded yet.")
        print("\nFirst few rows:")
        print(self.df.head())
        print("\nData Info:")
        # DataFrame.info() prints its report itself and returns None, so
        # wrapping it in print() would add a stray "None" line.
        self.df.info()
        print("\nDescriptive Statistics:")
        print(self.df.describe())

    def select_target(self, target_index: int) -> str:
        """Pick the target column by its position in ``self.df.columns``.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        if self.df is None:
            raise ValueError("Data not loaded.")
        self.target_column = self.df.columns[target_index]
        print(f"\nSelected target column: {self.target_column}")
        return self.target_column

    def preprocess_data(self, test_size: float = 0.25) -> Tuple:
        """Run ``DataProcessor.preprocess`` on the loaded data.

        Returns:
            ``(X_train, X_test, y_train, y_test)``.

        Raises:
            ValueError: If the data or the target column has not been set.
        """
        if self.df is None or self.target_column is None:
            raise ValueError("Data or target column not set.")
        X_train, X_test, y_train, y_test = self.data_processor.preprocess(
            self.df, self.target_column, test_size
        )
        return X_train, X_test, y_train, y_test

    def tune_and_replace_mlp(self, X_train, y_train, cv_folds: int = 5) -> None:
        """Tune the MLPClassifier hyperparameters and replace the default MLP in the model list.

        Args:
            X_train: Training feature matrix.
            y_train: Training labels.
            cv_folds: Number of cross-validation folds used by the tuner.
        """
        tuned_mlp = MLPHyperparameterTuner.tune(X_train, y_train, cv_folds=cv_folds)
        for model in self.models:
            if model.get_name() == "Rede Neural (MLP)":
                model.model = tuned_mlp
                print("Replaced default MLPClassifier with tuned model.")
                break

    def train_and_evaluate_models(self, X_train, X_test, y_train, y_test,
                                  tune_mlp: bool = True) -> pd.DataFrame:
        """Create, fit, evaluate and plot every model.

        Args:
            X_train: Training feature matrix.
            X_test: Test feature matrix.
            y_train: Training labels.
            y_test: Test labels.
            tune_mlp: When true (the default) the MLP entry is replaced by a
                grid-searched estimator before training; pass ``False`` to
                skip the search and keep the factory default.

        Returns:
            A DataFrame with one row per model, also stored on
            ``self.results_df``.
        """
        self.models = self.model_factory.create_models()
        if tune_mlp:
            self.tune_and_replace_mlp(X_train, y_train)
        results = []
        for model in self.models:
            print(f"\nTraining {model.get_name()}...")
            model.fit(X_train, y_train)
            self.model_evaluator.evaluate_model(model, X_test, y_test)
            result = {"Model": model.get_name(), **model.metrics}
            results.append(result)
            self.results_visualizer.plot_algorithm_specific(model, X_test, y_test)
        self.results_df = pd.DataFrame(results)
        return self.results_df

    def get_best_model(self, metric: str = 'F1 Score') -> Model:
        """Return the model with the highest value of ``metric``.

        Raises:
            ValueError: If no results are available yet, or if the winning
                name is not present in ``self.models``.
        """
        if self.results_df is None:
            raise ValueError("No results available. Run train_and_evaluate_models() first.")
        best_model_name = self.results_df.loc[self.results_df[metric].idxmax(), 'Model']
        for model in self.models:
            if model.get_name() == best_model_name:
                return model
        raise ValueError(f"Model '{best_model_name}' not found.")

    def visualize_results(self) -> None:
        """Plot the metric comparison chart.

        Raises:
            ValueError: If no results are available yet.
        """
        if self.results_df is None:
            raise ValueError("No results available. Run train_and_evaluate_models() first.")
        self.results_visualizer.plot_metrics_comparison(self.results_df)


# -----------------------
# Run the Interactive Pipeline
# -----------------------
def run_interactive():
    """Executes the entire pipeline interactively in Google Colab using CSV input.

    Steps: upload a CSV, print a summary, ask for the target column index,
    preprocess with a 30% test split, train and evaluate every model, then
    print and plot the comparison and announce the best model by F1 score.

    The target index prompt is repeated until a usable index is entered, so a
    typo does not discard the uploaded data and abort the session.

    Raises:
        ValueError: If the loaded data has no columns.
    """
    system = ModelComparisonSystem(random_state=42)
    print("Welcome to the ML Model Comparison System (CSV input via Google Colab)!")
    system.load_data()
    system.analyze_data()
    print("\nAvailable columns:")
    for i, col in enumerate(system.df.columns):
        print(f"{i}: {col}")

    n_columns = len(system.df.columns)
    if n_columns == 0:
        raise ValueError("The loaded data has no columns to choose a target from.")
    while True:
        raw = input("\nEnter the index of the target column: ")
        try:
            target_index = int(raw)
        except ValueError:
            print(f"'{raw}' is not a whole number. Please try again.")
            continue
        # Negative values are accepted as well because column lookup by
        # position supports counting from the end.
        if -n_columns <= target_index < n_columns:
            break
        print(f"Index must be between 0 and {n_columns - 1} "
              f"(negative values count from the end). Please try again.")

    system.select_target(target_index)
    X_train, X_test, y_train, y_test = system.preprocess_data(test_size=0.3)
    system.train_and_evaluate_models(X_train, X_test, y_train, y_test)
    print("\nModel Performance Comparison:")
    sorted_results = system.results_df.sort_values(by="F1 Score", ascending=False)
    print(sorted_results)
    system.visualize_results()
    best_model = system.get_best_model(metric="F1 Score")
    print(f"\nBest Model: {best_model.get_name()} with F1 Score: {best_model.get_metrics()['F1 Score']:.3f}")


# Start the interactive pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_interactive()