In [None]:
###### scikit-learn/classification ######
# 0. Visualize dataset according to data characteristics
# 1. Import Required Libraries
# 2. Load the Dataset (Covertype dataset)
# 3. Split the Data
# 4. Define Classification Models
# 5. Train and Evaluate Models
# 6. Return the Best Model and Visualize Performance Metrics
# 7. Predicting with the best model
###### scikit-learn/classification ######

In [None]:
# 0. Visualize dataset
# NOTE(review): this cell reads sns, plt, train_test_split, data_plot, target_variable,
# feature_importance_df, top_importances and transformed_data, none of which are
# defined in this cell -- the import and model-fitting cells have to run first.
# The pair plot in 0.1 uses a pre-existing `data_plot` (presumably one that still
# contains the `target_variable` column); the `data_plot` created in 0.2 holds only
# the top features.

print("Starting to create Pair Plot...")

# 0.1 Pairplot
sns.pairplot(data_plot, hue=target_variable, palette='bwr')
# Call suptitle() rather than assigning to plt.title: the assignment replaced the
# pyplot function with a string, so no title was drawn and every later
# plt.title(...) call in the kernel would fail.
plt.suptitle("1. Pair Plot\nsk-learn", y=1.02)
plt.show()

# 0.2 Histograms
# Optimized Histograms (top features): keep only the `top_importances` highest-ranked features
top_features = feature_importance_df['feature'].tolist()[:top_importances]

selected_data_without_y = transformed_data[top_features]
# Plot a 1% sample only, to keep the figures fast on large datasets
data_train, data_plot = train_test_split(selected_data_without_y, test_size=0.01, random_state=42)
data_plot.hist(bins=15, figsize=(15, 10))
plt.suptitle("2. Histograms\nsk-learn")
plt.tight_layout()
plt.show()

# 0.3 Heatmap
data_corr = data_plot.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(data_corr, annot=True, cmap='coolwarm', linewidths=0.5)
plt.title('3. Correlation Heatmap(Top Features)\nsk-learn')
plt.show()


In [None]:
# 1. Import Required Libraries
from ISLP import load_data
from ISLP.models import ModelSpec as MS
import warnings 
warnings.filterwarnings('ignore') # mute warning messages

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import PolynomialFeatures, StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, matthews_corrcoef
from sklearn.pipeline import Pipeline
from joblib import dump, load
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.calibration import calibration_curve
from sklearn.metrics import roc_curve, auc, confusion_matrix, precision_recall_curve,precision_recall_fscore_support
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os
import io
from sklearn.datasets import fetch_covtype
from sklearn.utils import Bunch
from sklearn.datasets import fetch_openml
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
import time
from sklearn.decomposition import PCA
from sklearn.base import clone
from sklearn.inspection import DecisionBoundaryDisplay
from mpl_toolkits import mplot3d
from matplotlib.colors import ListedColormap


In [None]:
# 2. Load the dataset
# Subfunction to load the dataset
def capitalize_first_letter(word):
    """Return ``word`` with its first character upper-cased and the remainder lower-cased.

    An empty input is returned as an empty value of the same kind.
    """
    head, tail = word[:1], word[1:]
    return head.upper() + tail.lower()

def load_dataset(dataset_name):
    """Load a dataset by (case-insensitive) name.

    ``'Covertype'`` is fetched with ``fetch_covtype()``, ``'Higgs'`` with
    ``fetch_openml(name='higgs', version=2)``; any other name is capitalised and
    handed to ``load_data`` (imported from ISLP).

    Returns:
        The loaded dataset object on success. If loading fails for any reason the
        string ``'No dataset named <dataset_name>'`` is returned instead (kept for
        backward compatibility), so callers should check for a ``str`` result.
    """
    try:
        capitalized_name = capitalize_first_letter(dataset_name)
        if capitalized_name == 'Covertype':
            return fetch_covtype()
        if capitalized_name == 'Higgs':
            return fetch_openml(name='higgs', version=2)
        return load_data(capitalized_name)
    except Exception as error:
        # Catch Exception instead of using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still propagate (e.g. when a slow
        # download is cancelled), and report the real cause rather than hiding it.
        print(f"Could not load dataset '{dataset_name}': {error!r}")
        return 'No dataset named ' + dataset_name
    

In [None]:
# 3. Pipeline fitting
# 3.1 Subfunctions
def _importances_from_estimator(estimator, where=""):
    """Return per-feature importances exposed by ``estimator``, or None if it has none.

    ``feature_importances_`` is preferred; otherwise ``coef_`` is reduced to one
    non-negative value per feature. ``where`` is only used in the progress message.
    """
    # getattr(..., None) instead of `name in dir(obj)`: dir() also lists
    # property-backed attributes, and accessing such a property can still raise
    # AttributeError on estimators that do not actually provide the value.
    feature_importances = getattr(estimator, 'feature_importances_', None)
    if feature_importances is not None:
        print(f"The attribute has been found{where}: feature_importances_")
        return feature_importances

    coefficients = getattr(estimator, 'coef_', None)
    if coefficients is None:
        return None

    coefficients = np.atleast_2d(coefficients)
    if len(coefficients) == 1:  # For binary classification: a single row of coefficients
        print(f"The attribute has been found{where}: coef_ (binary classification)")
        return np.abs(coefficients[0])

    # For multiclass classification: average the magnitudes over the classes.
    # Taking abs() before the mean matters -- averaging the signed coefficients
    # first lets opposite-signed per-class weights cancel out to ~0.
    print(f"The attribute has been found{where}: coef_ (multiclass classification)")
    return np.abs(coefficients).mean(axis=0)


def get_importances(best_model_name,model_step_name):
    """Load ``<best_model_name>.joblib`` and extract feature importances from one step.

    Args:
        best_model_name: Base name (without ``.joblib``) of the saved pipeline.
        model_step_name: Name of the pipeline step holding the model.

    Returns:
        Array of importances taken from the step itself or, failing that, from its
        ``best_estimator_`` (e.g. when the step is a fitted search object);
        ``None`` when neither exposes ``feature_importances_`` or ``coef_``.
    """
    # NOTE: joblib.load unpickles the file -- only load files written by this notebook.
    pipeline = load(f'{best_model_name}.joblib')
    model = pipeline.named_steps[model_step_name]

    importances = _importances_from_estimator(model)
    if importances is not None:
        return importances

    best_estimator = getattr(model, 'best_estimator_', None)
    if best_estimator is not None:
        return _importances_from_estimator(best_estimator, " in best_estimator_")

    return None
    

In [None]:
# 3.2 Pipeline fitting
def _calculate_classification_metrics(name, y_test, y_pred):
    """Score one model's test-set predictions and return them as a metrics record.

    Targets encoded exactly as {0, 1} get precision, recall, F1, ROC-AUC,
    specificity, confusion matrix and MCC. Any other target gets accuracy plus
    weighted precision/recall/F1; entries that are not computed are stored as
    ``float('-inf')`` so that records remain sortable.
    """
    accuracy = accuracy_score(y_test, y_pred)
    roc_auc = specificity = cm = mcc = None
    if len(set(y_test)) == 2 and set(y_test) == {0, 1}:
        # Binary classification
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        # NOTE(review): this AUC is computed from hard class predictions, not scores/probabilities.
        roc_auc = roc_auc_score(y_test, y_pred)
        cm = confusion_matrix(y_test, y_pred)
        tn, fp, fn, tp = cm.ravel()
        specificity = tn / (tn + fp) if tn + fp != 0 else None  # Handle division by zero
        mcc = matthews_corrcoef(y_test, y_pred)
    else:
        # non-binary, multiclass classification
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')

    def _or_neg_inf(value):
        return value if value is not None else float('-inf')

    return {
        'model': name,
        'Accuracy': _or_neg_inf(accuracy),
        'Precision': _or_neg_inf(precision),
        'Recall': _or_neg_inf(recall),
        'F1 Score': _or_neg_inf(f1),
        'ROC-AUC Score': _or_neg_inf(roc_auc),
        'Specificity': _or_neg_inf(specificity),
        'Confusion Matrix': _or_neg_inf(cm),
        'MCC': _or_neg_inf(mcc)
    }


def fit_classification_models(X,y,testsize,model_step_name,preprocessor, target_variable, random_seed=42, top_importances=4):
    """Grid-search several classifiers, pick the best one and rank its features.

    Every candidate is wrapped as ``preprocessor -> [poly_features] -> GridSearchCV(model)``,
    fitted on a stratified train split, scored on the test split and saved to
    ``"<pipeline name>.joblib"`` in the working directory.

    Args:
        X: Feature table; ``.columns`` and ``.shape`` are read, so a DataFrame is expected.
        y: Target values, also used to stratify the split.
        testsize: Passed to ``train_test_split`` as ``test_size``.
        model_step_name: Name given to the final (model) step of every pipeline.
        preprocessor: Transformer used as the first step of every pipeline.
        target_variable: Not used by this function (kept for interface compatibility).
        random_seed: Seed for the train/test split and for sampling the neural
            network layer sizes. Previously ignored in favour of a hard-coded 42.
        top_importances: Not used by this function (kept for interface compatibility).

    Returns:
        ``(best_model_name, best_model_info, feature_names, feature_importance_df)``.

    Raises:
        RuntimeError: if no pipeline could be trained and evaluated.
    """
    # Split the Data
    try:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testsize, stratify=y, random_state=random_seed)

        print(f"X_train contains {len(X_train)} rows.")
        print(f"X_test contains {len(X_test)} rows.")
    except Exception as e:
        print(f"An error occurred while splitting the data: {e}")
        # Re-raise instead of calling exit(): nothing below can run without the split.
        raise

    # 4. Define Classification Models
    models = {
        'Logistic Regression': LogisticRegression(max_iter=10000),
        'Decision Tree Classifier': DecisionTreeClassifier(),
        'Random Forest Classifier': RandomForestClassifier(),
        'Gradient Boosting Classifier': GradientBoostingClassifier(),
        'Support Vector Classifier': SVC(probability=True),
        'K-Neighbors Classifier': KNeighborsClassifier(),
        'Neural Network': MLPClassifier(),  # Example, use appropriate neural network model
        'Regularized Linear Model': LogisticRegression(max_iter=10000, penalty='l2'),  # Example, use appropriate regularized linear model
    }
    # Define hyperparameters for models where applicable
    # Determine if the dataset is large or small based on the length of X_train
    if len(X_train) > 1000:  # Adjust this threshold based on your specific dataset size
        start, stop, step = 10, 51, 10  # Start from 10, go up to 50 (inclusive), with a step of 10
    else:
        start, stop, step = 3, 10, 2 # Start from 3, go up to 9 (inclusive), with a step of 2
    # Generate the continuous k_range
    k_range = list(range(start, stop, step))

    # Seeded generator so the sampled network architectures are reproducible
    rng = np.random.default_rng(random_seed)

    def generate_hidden_layer_sizes(X_train):
        """Sample 3 architectures: 1..num_features layers of 50..100 units each."""
        num_features = X_train.shape[1]
        hidden_layer_sizes = [
            tuple(rng.integers(50, 101, size=rng.integers(1, num_features + 1)).tolist())
            for _ in range(3)
        ]
        return hidden_layer_sizes

    # Define a function to generate dynamic parameter values
    def generate_param_range(start, end, step):
        return np.arange(start, end + step, step)

    # Define the dynamic parameter ranges
    alpha_range = generate_param_range(0.1, 10, 0.1)

    # Polynomial degrees tried for logistic regression (degree 1 = no interaction terms)
    degree_list = [1]
    model_params = {
        'Logistic Regression': {'C': [10**i for i in range(-3, 4)]},  # C values ranging from 0.001 to 1000,
        'Decision Tree Classifier': {'max_depth': [3, 5, None]},
        'Random Forest Classifier': {'n_estimators': [50, 100, 200]},
        'Gradient Boosting Classifier': {'n_estimators': [50, 100, 200], 'learning_rate': [0.1, 0.5, 1.0]},
        'Support Vector Classifier': {'C': [10**i for i in range(-3, 4)], 'kernel': ['linear', 'rbf']},
        'K-Neighbors Classifier': {'n_neighbors': k_range} ,
        'Neural Network': {'hidden_layer_sizes': generate_hidden_layer_sizes(X_train)} ,
        'Regularized Linear Model': {'C': alpha_range}
    }
    # 5. Train and Evaluate Models
    best_k = None
    classification_pipelines = {}
    for name, model in models.items():
        if name == 'Logistic Regression':
            for degree in degree_list:
                classification_pipelines[f'{name} (Degree {degree})'] = Pipeline([
                    ('preprocessor',preprocessor),
                    ("poly_features", PolynomialFeatures(degree=degree, include_bias=False)),
                    (f"{model_step_name}",  GridSearchCV(model, model_params[name], cv=5))
                ])
        else:
            classification_pipelines[f'{name} (Degree 1)'] = Pipeline([
                ('preprocessor', preprocessor),
                (f"{model_step_name}", GridSearchCV(model, model_params[name], cv=5))
            ])

    metrics_list = []
    for name, pipeline in classification_pipelines.items():
        try:
            print(f"Start training {name}...")
            start_time = time.time()
            pipeline.fit(X_train, y_train)
            print("Training Complete.")
            # Predict on X_test; the reported time covers fitting plus prediction
            y_pred = pipeline.predict(X_test)
            end_time = time.time()
            execution_time = end_time - start_time
            minutes = int(execution_time // 60)
            seconds = int(execution_time % 60)

            print("Execution time:", minutes, "minutes and", seconds, "seconds")

            metrics_list.append(_calculate_classification_metrics(name, y_test, y_pred))
            # Save the pipeline in the disk
            dump(pipeline, f"{name}.joblib")

        except Exception as e:
            # Best effort: a failing candidate is reported and skipped
            print(f"An error occurred while training and evaluating the {name} model: {e}")

    if not metrics_list:
        # Without this guard the lookup of the best model below fails with a bare IndexError
        raise RuntimeError("No classification model could be trained and evaluated; see the errors printed above.")

    # Sort the scores list based on the 'Accuracy' metric in descending order (remaining metrics break ties)
    sorted_scores = sorted(metrics_list, key=lambda x: (x['Accuracy'], x['Precision'], x['Recall'], x['F1 Score'], x['ROC-AUC Score'], x['Specificity'], x['MCC']), reverse=True)
    print(sorted_scores)

    # 6. Return the Best Model and Visualize Performance Metrics

    best_model_info = sorted_scores[0]
    best_model_name = best_model_info['model']
    print(f'Best model found: {best_model_name}')
    print(best_model_info)

    # Get the best value of K from the pipeline
    if  'K-Neighbors Classifier' in best_model_name:
        # Load pipeline before use
        pipeline = load(f'{best_model_name}.joblib')
        best_k = pipeline.named_steps[model_step_name].best_params_['n_neighbors']
        print(f'best-k is {best_k}')

    # Every pipeline name ends in "(Degree N)"; recover N to know whether
    # polynomial feature names have to be generated.
    import re
    match = re.search(r'Degree (\d+)', best_model_name)
    degree = int(match.group(1))
    if degree ==1:
        feature_names =X_train.columns
    else:
        # Load pipeline before use
        pipeline = load(f'{best_model_name}.joblib')
        poly_features = pipeline.named_steps['poly_features']
        column_names = X_train.columns.tolist()
        feature_names = poly_features.get_feature_names_out(input_features=column_names)

    importances = get_importances(best_model_name, model_step_name)
    if importances is not None and len(importances)>0 :
        num_original_features = X_train.shape[1]
        # Plain list (needed for .index() below). The former chained
        # .replace('x0', 'x1').replace('x1', 'x2') rewrote every 'x0'/'x1'
        # substring to 'x2', corrupting real column names.
        feature_names = list(feature_names)
        print(f"Originally, X_train contains {num_original_features} columns.")
        print(f"after fitting the model, features become {len(feature_names)},")
        print(f" and the feature importances contain {len(importances)} values.")

        if len(importances) <= len(feature_names)+1 :
            if len(importances) == len(feature_names)+1:
                print("Removing const importance...")
                importances = importances[1:]
            # Reshape importances array if necessary
            importances = np.ravel(importances)
        else:
            # calculate the interaction terms from the feature_names
            # importances are just importances for the original columns from X_train
            importances_original = importances
            importances_interaction = []

            for name in feature_names[num_original_features:]:
                # Split the term into components if there are spaces (' ')
                components = name.split(' ')
                importance_interaction=1
                for component in components:
                    if '^' in component:
                        base, power =component.split('^')
                    else:
                        base =component
                        power =1

                    base_index= feature_names.index(base)

                    importance_component = importances_original[base_index] ** int(power)

                    importance_interaction *= importance_component
                # Append the importance of the interaction term to the list
                importances_interaction.append(importance_interaction)

            # Combine the importances for original columns and interaction terms
            importances = np.concatenate((importances_original, importances_interaction))

        feature_importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': importances
        }).sort_values(by='importance', ascending=False)
    else:
        feature_importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': 0
        })
        print("The model does not have feature importances or coefficients.")

    return best_model_name, best_model_info, feature_names, feature_importance_df

In [None]:
# 4. Model Evaluation
# 4.1 Subfunction
def get_pipeline_model(model_name, model_step_name):
    """Load ``<model_name>.joblib`` and return ``(pipeline, step)`` for the named step.

    Example: ``get_pipeline_model("Random Forest Classifier (Degree 1)", 'classification_model')``.

    Args:
        model_name: Base name (without ``.joblib``) of the saved pipeline file.
        model_step_name: Name of the pipeline step to return alongside the pipeline.

    Raises:
        KeyError: if the pipeline has no step called ``model_step_name``. The
            function used to print a message and fall through to an implicit
            ``None``, which broke every caller that unpacks two values.
    """
    # NOTE: joblib.load unpickles the file -- only load files written by this notebook.
    pipeline = load(f'{model_name}.joblib')
    if model_step_name not in pipeline.named_steps:
        raise KeyError(f"{model_step_name} is not valid in {pipeline.named_steps.keys()}")
    return pipeline, pipeline.named_steps[model_step_name]
    
# Reload the PCA-augmented pipeline that plot_decision_boundary() dumps to
# 'pipeline_decision.joblib'. The file only exists after that function has run
# at least once, so guard the load to keep a fresh "Restart & Run All" from
# failing in this cell.
if os.path.exists('pipeline_decision.joblib'):
    pipeline_decision, _ = get_pipeline_model('pipeline_decision','classification_model')
    print(len(pipeline_decision.steps))
else:
    print("pipeline_decision.joblib not found; run plot_decision_boundary() first.")

In [None]:
# 4.2 Plot for model performance
def plot_all_metrics(best_model_name, best_model_info, X,y, testsize,model_step_name,feature_importance_df, top_importances, random_seed, categorical_columns ):
    """Plot the decision boundary and the enabled performance charts for the best model.

    Args:
        best_model_name: Base name of the saved pipeline (``<name>.joblib``).
        best_model_info: Metrics record of the best model; passed to the
            'F1 Score, Specificity and MCC' chart.
        X, y: Full feature table and target.
        testsize: ``test_size`` used for both splits below.
        model_step_name: Name of the model step inside the pipeline.
        feature_importance_df, top_importances: Passed to the 'Feature Importances' chart.
        random_seed: ``random_state`` of the first split.
        categorical_columns: Only used by the (currently disabled) class-distribution plot.

    Charts are enabled by un-commenting entries of ``metrics_to_plot``; with no
    entry enabled only the decision boundary is drawn.
    """
    # 1. Split the data
    # NOTE(review): this takes the test part of a first, non-stratified split and
    # splits it again, so these rows are not the train/test rows the model was
    # fitted on in fit_classification_models -- confirm that the sub-sampling is intended.
    try:
        X_train_1, X_test_1, y_train_1, y_test_1 = train_test_split(X, y, test_size=testsize,  random_state=random_seed)
        X_train, X_test, y_train, y_test = train_test_split(X_test_1, y_test_1, test_size=testsize, stratify=y_test_1, random_state=42)

        print(f"X_train contains {len(X_train)} rows.")
        print(f"X_test contains {len(X_test)} rows.")
    except Exception as e:
        print(f"An error occurred while splitting the data: {e}")
        # Re-raise instead of calling exit(): nothing below can run without the split.
        raise

    # (plot function, subplot title) pairs; un-comment the charts to draw
    metrics_to_plot = [
        # (plot_confusion_matrix, 'Multiclass Confusion Matrix'),
        # (plot_roc_curve, 'Receiver Operating Characteristic (ROC) Curve'),
        # (plot_precision_recall_curve, 'Precision-Recall Curve'),
        # (plot_calibration_curve, 'Calibration Curve'),
        # (plot_feature_importances, 'Feature Importances'),
        # (plot_combined_metrics, 'F1 Score, Specificity and MCC')
        # (plot_f1_score, 'F1 Score'),
        # (plot_specificity, 'Specificity'),
        # (plot_mcc, 'Matthews Correlation Coefficient (MCC)')
    ]
    # 2. Load pipeline before plotting
    pipeline, _ = get_pipeline_model(best_model_name, model_step_name)
    # Optional stand-alone plots:
    # plot_class_distribution(categorical_columns, X)
    # plot_classification_report(pipeline,X_test, y_test)
    # plot_decision_boundary(pipeline, X_train,y_train,model_step_name)
    plot_decision_boundary_2(pipeline, X_train,y_train,X_test, y_test,model_step_name)

    # 3. Create the figure and axes
    num_metrics = len(metrics_to_plot)
    if num_metrics == 0:
        # No chart enabled: plt.subplots(0, 3) would raise and the clean-up loop
        # would hit an undefined loop index, so stop after the decision boundary.
        return
    num_cols = 3
    num_rows = (num_metrics + num_cols - 1) // num_cols
    # squeeze=False always yields a 2-D array of axes, whatever the grid shape
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(10*num_cols, 6*num_rows), squeeze=False)

    # Adjust the spacing between subplots
    plt.subplots_adjust(wspace=1, hspace=1)

    # Flatten the axes array
    axes = axes.flatten()

    # Loop over the metrics and titles, plotting each one
    for i, (plot_function, function_title) in enumerate(metrics_to_plot):
        if function_title == 'Feature Importances':
            plot_function(pipeline,X_test, y_test, feature_importance_df, top_importances,ax=axes[i])
        elif function_title =='F1 Score, Specificity and MCC':
            plot_function(best_model_info, ax=axes[i])
        else:
            plot_function(pipeline,X_test, y_test, ax=axes[i])
        # Set title for the subplot
        axes[i].set_title(f'{i+1}. {function_title}\nsk-learn')

    # Hide any empty grid cells
    for unused_ax in axes[num_metrics:]:
        fig.delaxes(unused_ax)

    plt.tight_layout()
    plt.show()

# Plot 4.1 - Class Distribution
def plot_class_distribution(categorical_columns,X):
    """Draw one bar chart per categorical variable showing the share of rows in each class.

    For every prefix in ``categorical_columns`` all columns of ``X`` whose name
    starts with that prefix are collected; the class label is the text after the
    last ``'_'`` of the column name and the bar height is the percentage of rows
    where that column equals 1 (assumes 0/1 indicator columns -- TODO confirm).

    Args:
        categorical_columns: Iterable of column-name prefixes.
        X: DataFrame holding the indicator columns.
    """
    total_rows = len(X)

    for column in categorical_columns:
        # Start from an empty table for every variable: the old shared
        # accumulator made each chart also show the classes of all previously
        # plotted variables.
        records = []
        for column_name in X.columns:
            if not column_name.startswith(column):
                continue
            # Count the rows flagged with 1 for the current column
            value_count = X[column_name].value_counts().get(1, 0)
            percentage = (value_count / total_rows) * 100 if total_rows else 0
            records.append({
                'Column': column_name,
                # Extract the class from the column name
                'Class': column_name.split('_')[-1],
                'Percentage': percentage,
            })
        # Build the frame once instead of concatenating row by row
        class_distribution = pd.DataFrame(records, columns=['Column', 'Class', 'Percentage'])

        # Plot the class distribution
        plt.figure(figsize=(10, 6))
        plt.bar(class_distribution['Class'], class_distribution['Percentage'])

        plt.xlabel(f'Class Label of {column}')
        plt.ylabel('Percentage')
        plt.xticks(rotation=45)  # Rotate x-axis labels for better readability
        plt.show()

# Plot 4.2 - Classification Report
def plot_classification_report(pipeline,X_test, y_test):
    """Print the text classification report for the pipeline's predictions on ``X_test``."""
    predictions = pipeline.predict(X_test)
    print(classification_report(y_test, predictions))

# Plot 4.3 - Multiclass Confusion Matrix
def plot_confusion_matrix(pipeline ,X_test, y_test, ax=None):
    """Draw the confusion matrix of the pipeline's test-set predictions as a heatmap.

    Args:
        pipeline: Fitted estimator/pipeline providing ``predict``.
        X_test, y_test: Held-out features and true labels.
        ax: Axes to draw on; the current Axes is used when omitted or ``None``.
    """
    y_pred = pipeline.predict(X_test)
    # Rows/columns follow the sorted unique labels present in y_test
    class_names = np.unique(y_test)

    def _tick_label(value):
        # Numeric labels are shown as integers; non-numeric labels (which used
        # to crash the int() conversion) are shown as they are.
        try:
            return f'Class {int(value)}'
        except (TypeError, ValueError):
            return f'Class {value}'

    class_names_str = [_tick_label(name) for name in class_names]
    print(class_names)
    # Generate the confusion matrix
    cm = confusion_matrix(y_test, y_pred, labels=class_names)
    print(cm)

    if ax is None:
        ax = plt.gca()  # Get the current Axes instance if none is provided
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names_str, yticklabels=class_names_str, ax=ax)

    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')

# Plot 4.4 - ROC Curve
# def plot_roc_curve(pipeline,X_test,y_test,  plot_color = get_color(0), plot_alpha =1 ,legend_elements = None, ax):
def plot_roc_curve(pipeline,X_test,y_test,ax):
    """Draw one-vs-rest ROC curves (with AUC in the legend) for every class in ``y_test``.

    Args:
        pipeline: Fitted estimator/pipeline with ``predict_proba`` or ``decision_function``.
        X_test, y_test: Held-out features and true labels.
        ax: Axes to draw on.

    Score column ``i`` is matched to the i-th sorted unique label of ``y_test``
    (assumes y_test contains every class the model was trained on -- TODO confirm).
    """
    # Per-class scores: probabilities when available, decision values otherwise
    if hasattr(pipeline, "predict_proba"):  # Check if model has predict_proba method
        y_prob = pipeline.predict_proba(X_test)
    else:
        y_prob = pipeline.decision_function(X_test)
    y_prob = np.asarray(y_prob)

    class_names = np.unique(y_test)
    num_classes = len(class_names)

    # Convert multi-class labels to one indicator column per class
    binary_labels = LabelBinarizer().fit_transform(y_test)
    if num_classes == 2:
        # With two classes LabelBinarizer returns a single column (the positive
        # class) and a decision_function returns a 1-D array; expand both to
        # two columns so the per-class indexing below works.
        if binary_labels.shape[1] == 1:
            binary_labels = np.hstack([1 - binary_labels, binary_labels])
        if y_prob.ndim == 1:
            y_prob = np.column_stack([-y_prob, y_prob])

    # Compute and plot the ROC curve and its area for each class
    for i, class_name in enumerate(class_names):
        fpr, tpr, _ = roc_curve(binary_labels[:, i], y_prob[:, i])
        roc_auc = auc(fpr, tpr)
        # Label with the real class value instead of assuming classes are 1..K
        ax.plot(fpr, tpr, label=f'Class {class_name} (AUC = {roc_auc:.2f})', alpha=1)
        ax.fill_between(fpr, tpr)

    ax.plot([0, 1], [0, 1], 'k--', lw=2)  # Plot diagonal line for reference
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')

    # Name the legend after the estimator actually plotted (it used to be
    # hard-coded to 'Random Forest Classifier' whatever the model was).
    final_step = pipeline.steps[-1][1] if hasattr(pipeline, 'steps') else pipeline
    estimator = getattr(final_step, 'best_estimator_', final_step)
    ax.legend(loc='best', title=type(estimator).__name__)

# Plot 4.5 - Precision-Recall Curve
def plot_precision_recall_curve(pipeline,X_test, y_test, ax):
    """Draw one-vs-rest precision-recall curves for every class in ``y_test``.

    Args:
        pipeline: Fitted estimator/pipeline with ``predict_proba`` or ``decision_function``.
        X_test, y_test: Held-out features and true labels.
        ax: Axes to draw on.

    Score column ``i`` is matched to the i-th sorted unique label of ``y_test``
    (assumes y_test contains every class the model was trained on -- TODO confirm).
    """
    # Per-class scores: probabilities when available, decision values otherwise
    if hasattr(pipeline, "predict_proba"):  # Check if model has predict_proba method
        y_prob = pipeline.predict_proba(X_test)
    else:
        y_prob = pipeline.decision_function(X_test)
    y_prob = np.asarray(y_prob)

    class_names = np.unique(y_test)
    num_classes = len(class_names)

    # One indicator column per class
    binary_labels = LabelBinarizer().fit_transform(y_test)
    if num_classes == 2:
        # With two classes LabelBinarizer returns a single column (the positive
        # class) and a decision_function returns a 1-D array; expand both to
        # two columns so the per-class indexing below works.
        if binary_labels.shape[1] == 1:
            binary_labels = np.hstack([1 - binary_labels, binary_labels])
        if y_prob.ndim == 1:
            y_prob = np.column_stack([-y_prob, y_prob])

    for i, class_name in enumerate(class_names):
        precision, recall, _ = precision_recall_curve(binary_labels[:, i], y_prob[:, i])
        ax.plot(recall, precision, label='Class %s' % class_name)

    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.legend()

# Plot 4.6 - Calibration_curve
def plot_calibration_curve(pipeline,X_test,y_test, ax):
    """Draw a one-vs-rest calibration (reliability) curve for every class in ``y_test``.

    Args:
        pipeline: Fitted estimator/pipeline with ``predict_proba`` or ``decision_function``.
        X_test, y_test: Held-out features and true labels.
        ax: Axes to draw on.

    Score column ``i`` is matched to the i-th sorted unique label of ``y_test``
    (assumes y_test contains every class the model was trained on -- TODO confirm).
    """
    if hasattr(pipeline, "predict_proba"):  # Check if model has predict_proba method
        y_prob = pipeline.predict_proba(X_test)
    else:
        # NOTE(review): decision values are not probabilities; calibration_curve
        # presumably rejects scores outside [0, 1] -- confirm before relying on this path.
        y_prob = pipeline.decision_function(X_test)
    y_prob = np.asarray(y_prob)

    class_names = np.unique(y_test)
    num_classes = len(class_names)

    # One indicator column per class
    binary_labels = LabelBinarizer().fit_transform(y_test)
    if num_classes == 2:
        # With two classes LabelBinarizer returns a single column (the positive
        # class) and a decision_function returns a 1-D array; expand both to
        # two columns so the per-class indexing below works.
        if binary_labels.shape[1] == 1:
            binary_labels = np.hstack([1 - binary_labels, binary_labels])
        if y_prob.ndim == 1:
            y_prob = np.column_stack([-y_prob, y_prob])

    # Calculate and plot the calibration curve of each class (10 bins)
    for i, class_name in enumerate(class_names):
        prob_true, prob_pred = calibration_curve(binary_labels[:, i], y_prob[:, i], n_bins=10)
        ax.plot(prob_pred, prob_true, marker='o', linestyle='-', label=f'Class {class_name} Calibration')

    # Diagonal line for perfect calibration (drawn once rather than once per class)
    ax.plot([0, 1], [0, 1], linestyle='--', color='gray')

    ax.set_xlabel('Mean Predicted Probability')
    ax.set_ylabel('Fraction of Positives')
    ax.legend(loc='upper left')
    ax.grid(True)

# Plot 4.7 Feature Importances or Coefficients
def plot_feature_importances(pipeline, X_test, y_test,feature_importance_df,top_importances, ax):
    """Draw a lollipop chart of the first ``top_importances`` rows of the importance table.

    Args:
        pipeline, X_test, y_test: Unused; accepted so the function shares the
            call signature of the other metric plots.
        feature_importance_df: DataFrame with ``'feature'`` and ``'importance'``
            columns; rows are plotted in the order given.
        top_importances: Number of rows to show. The function used to show a
            fixed 12 rows while labelling the axis "Top {top_importances}".
        ax: Axes to draw on.
    """
    top_feature_importance = feature_importance_df.head(top_importances)
    # Stem from 0 to the importance value, then a marker at the tip
    ax.hlines(y=top_feature_importance['feature'], xmin=0, xmax=top_feature_importance['importance'], color='skyblue')
    ax.plot(top_feature_importance['importance'], top_feature_importance['feature'], 'o', color='skyblue')
    ax.set_xlabel(f'Top {top_importances} Feature')
    ax.set_ylabel('Feature')
    ax.tick_params(axis='x', rotation=45)  # Rotate x-axis labels for better readability
    
# Plot 4.8 Combine below 3 metric into a single chart
# Combine all plots into one
def plot_combined_metrics(best_model_info, ax):
    """Draw F1 Score, Specificity and MCC of the best model as three coloured bars.

    Args:
        best_model_info: Mapping that provides the 'F1 Score', 'Specificity'
            and 'MCC' entries.
        ax: Axes to draw on.
    """
    # Metric name -> bar colour, in plotting order
    bar_colors = {'F1 Score': 'green', 'Specificity': 'orange', 'MCC': 'purple'}
    metrics = {metric_name: best_model_info[metric_name] for metric_name in bar_colors}
    positions = np.arange(len(metrics))

    for position, (metric_name, metric_value) in zip(positions, metrics.items()):
        ax.bar(position, metric_value, color=bar_colors[metric_name], label=metric_name)

    ax.set_ylabel('Score')
    ax.set_xticks(positions)
    ax.set_xticklabels(metrics.keys())
    ax.legend()

# Plot 4.9 - Decision Boundary
def plot_decision_boundary(pipeline, X_train,y_train,model_step_name, ax=None, title=None):
    """Refit a copy of ``pipeline`` with a 2-component PCA step and plot its decision regions.

    A PCA step is inserted just before the last pipeline step, the new
    pipeline is fitted on ``(X_train, y_train)``, and the decision regions of
    the fitted model are drawn in the 2-D PCA space together with the
    projected training points.

    Parameters
    ----------
    pipeline : Pipeline
        Pipeline to copy via ``clone``; its ``steps`` list is read and the last
        entry is treated as the model step.
    X_train, y_train
        Training data. ``y_train`` is also used as the colour value of the
        scatter points.
    model_step_name : str
        Name of the step holding the model. After fitting, that step must
        expose ``best_estimator_`` (presumably a grid-search object - confirm
        against the caller).
    ax : matplotlib Axes, optional
        Axes to draw on; a new 10x8 figure is created when omitted.
    title : str, optional
        Axes title. Defaults to the historical Random Forest / covertype text.

    Side effects: prints progress and timing, writes
    ``pipeline_decision.joblib`` through ``dump``, and calls ``plt.show()``.
    """
    if title is None:
        title = "Decision boundary of Random Forest classifier on covertype dataset (PCA)"

    # The original best model only accepts the full feature matrix as input,
    # so work on a copy and add a PCA step after the preprocessing steps.
    pipeline_clone = clone(pipeline)
    existing_steps = pipeline_clone.steps
    pca = PCA(n_components=2)
    # Insert pca before the last step --> best estimator --> (model + grid search)
    new_steps = existing_steps[:-1] + [('pca', pca)] + existing_steps[-1:]
    pipeline_decision = Pipeline(new_steps)

    print("Starting training pipeline for desicion boundary...")
    start_time = time.time()
    pipeline_decision.fit(X_train, y_train)
    print("Training Complete.")
    execution_time = time.time() - start_time
    minutes, seconds = divmod(execution_time, 60)
    print("Execution time:", int(minutes), "minutes and", int(seconds), "seconds")
    dump(pipeline_decision, "pipeline_decision.joblib")

    # Push the training data through every fitted step except the model so
    # the points live in the same 2-D space the model was trained on.
    X_train_transformed = X_train.copy()  # Make a copy of X_train
    for step_name, transformer in pipeline_decision.steps:
        print(step_name, ":", transformer)
        if step_name == model_step_name:
            continue
        X_train_transformed = transformer.transform(X_train_transformed)

    # Extract the best estimator from the pipeline
    best_estimator_with_grid = pipeline_decision.named_steps[model_step_name].best_estimator_

    if ax is None:
        _, ax = plt.subplots(figsize=(10, 8))

    DecisionBoundaryDisplay.from_estimator(
        best_estimator_with_grid,
        X_train_transformed,
        cmap=plt.cm.Paired,
        ax=ax,
        response_method='predict',
        xlabel="Principal Component 1",
        ylabel="Principal Component 2"
    )

    # Everything below goes through `ax` rather than the pyplot state machine,
    # so the right subplot is decorated even when `ax` is not the current axes.
    ax.axis('tight')
    ax.set_xlim(X_train_transformed[:, 0].min() - 0.5, X_train_transformed[:, 0].max() + 0.5)  # Adjust x-axis limits
    ax.set_ylim(X_train_transformed[:, 1].min() - 0.5, X_train_transformed[:, 1].max() + 0.5)  # Adjust y-axis limits

    # Increase point size and reduce transparency for better visibility
    scatter = ax.scatter(
        X_train_transformed[:, 0],
        X_train_transformed[:, 1],
        c=y_train,
        cmap=plt.cm.Paired,
        edgecolor="black",
        s=40,
        alpha=0.7,
    )

    ax.set_title(title)
    # A bare legend() call finds no labelled artists and draws nothing; build
    # the entries from the colour-mapped scatter instead.
    handles, labels = scatter.legend_elements()
    ax.legend(handles, labels, title="Class")
    plt.show()


In [None]:
# 5. Main Scripts
def main():
    """Run the covertype classification workflow end to end.

    Steps, in order: load the dataset, coerce it to a DataFrame, split the
    columns into numeric / binary / non-binary groups, build the preprocessing
    ``ColumnTransformer``, fit the candidate models, sum the importances of
    features sharing a categorical prefix, plot the metrics, and finally
    reload the persisted best pipeline to predict on the first five rows.

    Returns None. If loading the dataset or converting it to a DataFrame
    fails, the error is printed and the function returns early.
    """
    # 2. Load the Dataset ()
    dataset_name = 'covertype'
    try:
        data = load_dataset(dataset_name)
    except Exception as e:
        print(f"An error occurred while loading the dataset: {e}")
        # Return rather than exit(): inside a notebook exit() takes the whole
        # kernel down, while returning simply stops this run.
        return

    # Specify the target variable
    target_variable = 'Cover_Type'

    # Check if data is already a DataFrame
    try:
        if not isinstance(data, pd.DataFrame):
            print(f"Converting {dataset_name} from {type(data)} to a data frame.")
            if isinstance(data, str):
                data_frame = pd.read_csv(io.StringIO(data))
            elif isinstance(data, Bunch):
                # Convert the data to a DataFrame
                data_frame = pd.DataFrame(data.data, columns=data.feature_names)
                data_frame[target_variable] = data.target
            else:
                data_frame = pd.DataFrame(data)
        else:
            print(f"Data from {dataset_name} is already a data frame.")
            data_frame = data
    except Exception as e:
        print("Error:", e)
        # Nothing below can run without data_frame; stop here instead of
        # failing later on an unbound local.
        return

    transformed_data = data_frame.copy()
    y = transformed_data[target_variable]

    # Identifying the feature types
    categorical_columns = ['Soil_Type_', 'Wilderness_Area_']
    categorical_prefixes = tuple(categorical_columns)
    # Numeric features: int64/float64 columns that are neither prefixed
    # categorical columns nor the target. The target is excluded explicitly
    # so it can never end up inside X, whatever dtype it was loaded with.
    numeric_features = [
        col for col in transformed_data.columns
        if col != target_variable
        and not col.startswith(categorical_prefixes)
        and transformed_data[col].dtype in ['int64', 'float64']
    ]

    # Check and convert Binary columns
    soil_type_columns = [col for col in transformed_data.columns if col.startswith('Soil_Type')]
    wilderness_area_columns = [col for col in transformed_data.columns if col.startswith('Wilderness_Area')]

    # Function to check if columns are binary or non-binary, including handling of null values
    def check_binary_columns(frame, columns):
        binary_columns = []
        non_binary_columns = []
        for col in columns:
            unique_values = frame[col].dropna().unique()
            if set(unique_values) <= {0, 1}:
                binary_columns.append(col)
            else:
                non_binary_columns.append(col)
        return binary_columns, non_binary_columns

    soil_type_binary, soil_type_non_binary = check_binary_columns(transformed_data, soil_type_columns)
    wilderness_area_binary, wilderness_area_non_binary = check_binary_columns(transformed_data, wilderness_area_columns)

    categorical_binary_features = soil_type_binary + wilderness_area_binary
    categorical_non_binary_features = soil_type_non_binary + wilderness_area_non_binary
    print(categorical_non_binary_features)
    # Includes all features
    all_features = numeric_features + categorical_binary_features + categorical_non_binary_features

    # Define X
    X = transformed_data[all_features]

    # Handling missing values and scaling numerical features
    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Transformer for binary categorical features
    binary_categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent'))  # Impute missing values for binary categorical features
    ])

    # Combine imputer and onehot for non-binary categorical features
    non_binary_categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Create preprocessor
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numeric_features),
            ('binary_cat', binary_categorical_transformer, categorical_binary_features),
            ('non_binary_cat', non_binary_categorical_transformer, categorical_non_binary_features)
        ]
    )

    top_importances = 12
    testsize = 0.2
    # random_seed=42
    # Or divide the datasets similar to 'Smarket' by year
    random_seed = 2004
    model_step_name = 'classification_model'
    best_model_name, best_model_info, feature_names, feature_importance_df = fit_classification_models(X,y,testsize,model_step_name, preprocessor, target_variable,random_seed, top_importances)

    # feature_importance_df contains importances for all 40 soil types, aggregate them all
    # NOTE(review): this replaces the frame returned by fit_classification_models
    # just above - confirm that get_feature_importance_df() is the intended source.
    feature_importance_df = get_feature_importance_df()
    # Loop through each prefix in the categorical_columns
    for prefix in categorical_columns:
        # Filter the DataFrame to select features starting with the current prefix
        categorical_features = feature_importance_df[feature_importance_df['feature'].str.startswith(prefix)]
        # Insert the prefix as a new row holding the summed importance
        feature_importance_df.loc[len(feature_importance_df)] = {
            'feature': prefix,
            'importance': categorical_features['importance'].sum()
        }

    # Sort the DataFrame based on the new values
    feature_importance_df = feature_importance_df.sort_values(by='importance', ascending=False)

    # The aggregated prefix rows are not real columns, so they are dropped
    # from the list of top features extracted here.
    top_features = feature_importance_df['feature'].tolist()[:top_importances]
    for prefix in categorical_columns:
        if prefix in top_features:
            top_features.remove(prefix)

    # Display the resulting list
    print(f"top_features are {top_features}")

    # 6. Visualize Model Performance
    if best_model_name is None:
        # NOTE(review): this fallback sets best_model_info to the string "1",
        # which cannot be indexed by metric name the way plot_combined_metrics
        # does - confirm what plot_all_metrics expects here.
        best_model_info = "1"
        best_model_name = "Random Forest Classifier (Degree 1)"
    plot_all_metrics(best_model_name, best_model_info, X,y, testsize,model_step_name,feature_importance_df, top_importances, random_seed, categorical_columns)

    # 7. Predicting with the best model
    if best_model_name:
        # Load pipeline before use
        pipeline = load(f'{best_model_name}.joblib')
        new_data = X[0:5]  # Just an example of new data
        predictions = pipeline.predict(new_data)
        print("Predictions on new data:", predictions)
        # Release the loaded pipeline only on the path that created it; a
        # del after the if/else would fail when the else branch runs.
        del pipeline
    else:
        print("Can't find the best model.")
# Run the workflow only when this file is executed as the main module
# (not when it is imported).
if __name__ == "__main__":
    main()
###### scikit-learn/classification ######