In [None]:
# ANN_classification_with_random_search_modified_updated.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import L1L2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

def create_model_classification(n_features, unique_classes, dropout_rate=0.2, activation_function='relu',
                 optimizer='adam', num_hidden_layers=2, num_neurons_per_layer=64,
                 kernel_regularizer=None, early_stop_patience=None, verbose=None):
    """
    Build and compile a fully connected (dense) classification network.

    The network is: an input Dense layer with ``n_features`` units, followed by
    ``num_hidden_layers`` Dense layers of ``num_neurons_per_layer`` units, each
    Dense layer being followed by a Dropout layer, and finally an output layer.

    Args:
    n_features (int): Number of input features.
    unique_classes (int): Number of unique target classes. Exactly 2 builds a single
        sigmoid output trained with binary cross-entropy; any other value builds a
        softmax output with ``unique_classes`` units trained with categorical
        cross-entropy (targets must then be one-hot encoded).
    dropout_rate (float): Dropout rate applied after every Dense layer except the output.
    activation_function (str): Activation function of the input and hidden layers.
    optimizer (str): Deep learning optimizer passed to ``model.compile``.
    num_hidden_layers (int): Number of hidden layers.
    num_neurons_per_layer (int): Number of neurons in hidden layers.
    kernel_regularizer (Optional[Keras Regularizer]): Regularization function for the
        input and hidden layers (not applied to the output layer).
    early_stop_patience (Optional[int]): Accepted but not used by this function; it is
        part of the signature so callers can pass it alongside the other settings.
    verbose (Optional[int]): Accepted but not used by this function.

    Returns:
    Keras model: A compiled deep learning model with the specified architecture.
    """

    # Pick the loss / output head that matches the number of classes.
    if unique_classes == 2:
        loss = 'binary_crossentropy'
        output_activation = 'sigmoid'
        n_output_neurons = 1
    else:
        loss = 'categorical_crossentropy'
        output_activation = 'softmax'
        n_output_neurons = unique_classes

    model = Sequential()

    # Input layer: as wide as the feature vector.
    model.add(Dense(n_features, activation=activation_function, input_dim=n_features, kernel_regularizer=kernel_regularizer))
    model.add(Dropout(dropout_rate))

    for _ in range(num_hidden_layers):
        model.add(Dense(num_neurons_per_layer, activation=activation_function, kernel_regularizer=kernel_regularizer))
        model.add(Dropout(dropout_rate))

    model.add(Dense(n_output_neurons, activation=output_activation))
    # `metrics` is documented as a list; a bare string is rejected by newer Keras releases.
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

    return model

def ANN_classification_with_random_search(data, dep_var, drop_columns=[], n_iter=50, cv=3, verbose=1,
                                                    activation_functions=["relu", "tanh"],
                                                    dropout_rates=[0.1, 0.2, 0.3, 0.4],
                                                    optimizers=["adam", "rmsprop", "adagrad"],
                                                    batch_sizes=[16, 32, 64, 128], epochs=[50, 100, 200],
                                                    early_stop_patience=[5, 10, 15],
                                                    num_hidden_layers_range = range(1, 4),
                                                    neurons_per_layer_range = [16, 32, 48, 64],
                                                    learning_rate_range = [0.001, 0.01, 0.1],
                                                    model_save=True, save_directory=None, plot_loss=True,
                                                    test_size=0.2, random_state=42, new_data=None):
    """
    Runs a deep learning classification process with Randomized Search for hyperparameter optimization.

    The data is split into train/test sets, features are standardized with statistics
    learned on the training split, a RandomizedSearchCV is run over the given
    hyperparameter candidates, and a final model is re-trained with the best
    parameters, evaluated, optionally saved, and optionally applied to ``new_data``.

    Args:
    Required:
    data (pd.DataFrame): Data to be used for model training.
    dep_var (str): Target variable. For more than two classes the labels are passed to
        ``to_categorical``, so they are expected to be integer class indices.

    Optional:
    drop_columns (List[str]): Columns to be dropped from dataframe.
    n_iter (int): Number of iterations for RandomizedSearchCV.
    cv (int): Number of cross validation folds.
    verbose (int): Print log level.
    activation_functions (List[str]): Activation functions to be used.
    dropout_rates (List[float]): Dropout rates.
    optimizers (List[str]): Deep learning optimizers to be used.
    batch_sizes (List[int]): Batch sizes to be used.
    epochs (List[int]): Number of epochs to be used.
    early_stop_patience (List[int]): Early stopping patience values. The sampled value is
        only applied when the final model is trained; during the search a single
        callback with the largest candidate is shared by every fit.
    num_hidden_layers_range (List[int]): Number of hidden layer values to search.
    neurons_per_layer_range (List[int]): Number of neurons per layer values.
    learning_rate_range (List[float]): Despite the name, these values are used as L2
        kernel-regularization strengths, not as optimizer learning rates.
    model_save (bool): If True, save the final model as an .h5 file.
    save_directory (Optional[str]): Path to the directory to save the model. Created if
        missing. If None, the model is saved in the current working directory.
    plot_loss (bool): If True, plot training and validation loss of the final model.
    test_size (float): Test set proportion.
    random_state (int): Random state for train_test_split.
    new_data (pd.DataFrame): If provided, predict target values for this data. It must
        contain the training feature columns; ``dep_var`` is optional and, when present,
        is used to compute residuals.

    Returns:
    pd.DataFrame: A one-row dataframe with model results (train/test accuracy, confusion
    matrix, summary classification report, best hyperparameters, and predictions /
    residuals for ``new_data`` or None).
    """
    # Work on a private copy so the (shared) default list is never aliased.
    drop_columns = list(drop_columns)

    # drop columns; remember the feature columns so new_data can be aligned to them later
    feature_frame = data.drop([dep_var] + drop_columns, axis=1)
    feature_columns = feature_frame.columns
    X = feature_frame.values
    y = data[dep_var].values
    # split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    # one hot encode target variable if more than 2 classes
    unique_classes = len(np.unique(y))
    if unique_classes > 2:
        y_train = to_categorical(y_train, num_classes=unique_classes)
        y_test = to_categorical(y_test, num_classes=unique_classes)

    # scale data (statistics come from the training split only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # get number of features
    n_features = X_train_scaled.shape[1]

    # create model
    deep_learning_model = KerasClassifier(model=create_model_classification, model__n_features=n_features,
                                          model__unique_classes=unique_classes, verbose=verbose)

    # Define the random search parameters
    param_dist = {
        "model__activation_function": activation_functions,
        "model__dropout_rate": dropout_rates,
        "model__optimizer": optimizers,
        "model__num_hidden_layers": num_hidden_layers_range,
        "model__num_neurons_per_layer": neurons_per_layer_range,
        "model__kernel_regularizer": [L1L2(l1=0, l2=regularization_strength) for regularization_strength in learning_rate_range],
        "batch_size": batch_sizes,
        "epochs": epochs,
        "model__early_stop_patience": early_stop_patience
    }

    # Create the RandomizedSearchCV object
    random_search = RandomizedSearchCV(deep_learning_model, param_distributions=param_dist, n_iter=n_iter,
                                       cv=cv, error_score=np.nan, verbose=verbose)

    # Create EarlyStopping and ModelCheckpoint callbacks.
    # EarlyStopping needs a single integer patience; passing the whole candidate list
    # breaks the callback's integer comparison. The same callback instance is shared by
    # every search fit, so the most permissive candidate is used here.
    search_patience = max(early_stop_patience)
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=verbose, patience=search_patience)
    mc = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', verbose=verbose, save_best_only=True)

    # Fit to the training data
    # NOTE(review): the held-out test split doubles as the validation set for early
    # stopping, so the test metrics reported below are optimistic.
    random_search.fit(X_train_scaled, y_train, validation_data=(X_test_scaled, y_test),callbacks=[es, mc])

    # Get the best parameters from the RandomizedSearchCV
    best_params = random_search.best_params_

    # create and fit final model with best parameters
    final_model = create_model_classification(n_features=n_features, unique_classes=unique_classes,
                                            dropout_rate=best_params['model__dropout_rate'],
                                           activation_function=best_params['model__activation_function'],
                                           optimizer=best_params['model__optimizer'],
                                           num_hidden_layers=best_params['model__num_hidden_layers'],
                                           num_neurons_per_layer=best_params['model__num_neurons_per_layer'],
                                           kernel_regularizer=best_params['model__kernel_regularizer'],
                                           early_stop_patience=best_params['model__early_stop_patience'])
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=verbose, patience=best_params['model__early_stop_patience'])
    mc = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', verbose=verbose, save_best_only=True)
    history = final_model.fit(X_train_scaled, y_train, validation_data=(X_test_scaled, y_test),
                              epochs=best_params['epochs'], batch_size=best_params['batch_size'], callbacks=[es, mc])

    # Plotting the training and validation loss
    if plot_loss:
        plt.figure(figsize=(10, 6))
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.title('Loss Plot')
        plt.show()

    # Save the model
    if model_save:
        model_file_name = 'best_classification_ANN_model_RandomSearch.h5'
        if save_directory:
            # Make sure the target directory exists before Keras tries to write into it.
            os.makedirs(save_directory, exist_ok=True)
            model_save_path = os.path.join(save_directory, model_file_name)
        else:
            model_save_path = model_file_name
        final_model.save(model_save_path)

    # Predict target values for training and test data
    if unique_classes > 2:
        y_pred_train = np.argmax(final_model.predict(X_train_scaled), axis=1)
        y_pred_test = np.argmax(final_model.predict(X_test_scaled), axis=1)
        # Targets were one-hot encoded above; convert them back to class indices
        y_train_original = np.argmax(y_train, axis=1)
        y_test_original = np.argmax(y_test, axis=1)
    else:
        y_pred_train = (final_model.predict(X_train_scaled) > 0.5).astype('int32')
        y_pred_test = (final_model.predict(X_test_scaled) > 0.5).astype('int32')
        # Binary targets were never encoded, use original y_train and y_test
        y_train_original = y_train
        y_test_original = y_test

    # Calculate metrics
    train_accuracy = accuracy_score(y_train_original, y_pred_train)
    test_accuracy = accuracy_score(y_test_original, y_pred_test)
    conf_mat = confusion_matrix(y_test_original, y_pred_test)
    class_report = classification_report(y_test_original, y_pred_test, output_dict=True, zero_division=1)

    # Predict values for new data
    new_y_pred = None
    new_y_residual = None
    if new_data is not None:
        # Select the training feature columns by name so column order always matches.
        new_X = new_data[feature_columns].values

        # Reuse the scaler fitted on the training split; refitting on new_data would
        # standardize it with different statistics than the model was trained on.
        new_X_scaled = scaler.transform(new_X)
        # Predict target values for new data
        if unique_classes > 2:
            new_y_pred = np.argmax(final_model.predict(new_X_scaled), axis=1)
        else:
            new_y_pred = (final_model.predict(new_X_scaled) > 0.5).astype('int32')

        # Residuals are only defined when the true labels are available.
        if dep_var in new_data.columns:
            new_y_residual = new_data[dep_var].values - new_y_pred.flatten()

    # Create a simple report (key spellings are kept as-is for existing consumers)
    simple_report = {
        'Accuracy': round(class_report['accuracy'], 3),
        'Precision': round(class_report['macro avg']['precision'], 3),
        'Recall': round(class_report['macro avg']['recall'], 3),
        'F1-score': round(class_report['macro avg']['f1-score'], 3),
        'Supoort': round(class_report['macro avg']['support'], 3)
    }

    # Create a dataframe with results
    results = {
        "Model": ["ANN Classification with Random Search(Best Model)"],
        "Train Accuracy": [train_accuracy],
        "Test Accuracy": [test_accuracy],
        "Confusion Matrix": [conf_mat.tolist()],
        "Classification Report": [simple_report],
        "Hyperparameters": [best_params],
        "New Predicted": [new_y_pred],
        "New Residual": [new_y_residual]
    }

    return pd.DataFrame(results)


In [None]:
# Grid search best deep learning classification model

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import L1L2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

def create_model_classification(n_features, unique_classes, dropout_rate=0.2, activation_function='relu',
                 optimizer='adam', num_hidden_layers=2, num_neurons_per_layer=64,
                 kernel_regularizer=None, early_stop_patience=None, verbose=None):
    """
    Build and compile a fully connected (dense) classification network.

    The network is: an input Dense layer with ``n_features`` units, followed by
    ``num_hidden_layers`` Dense layers of ``num_neurons_per_layer`` units, each
    Dense layer being followed by a Dropout layer, and finally an output layer.

    Args:
    n_features (int): Number of input features.
    unique_classes (int): Number of unique target classes. Exactly 2 builds a single
        sigmoid output trained with binary cross-entropy; any other value builds a
        softmax output with ``unique_classes`` units trained with categorical
        cross-entropy (targets must then be one-hot encoded).
    dropout_rate (float): Dropout rate applied after every Dense layer except the output.
    activation_function (str): Activation function of the input and hidden layers.
    optimizer (str): Deep learning optimizer passed to ``model.compile``.
    num_hidden_layers (int): Number of hidden layers.
    num_neurons_per_layer (int): Number of neurons in hidden layers.
    kernel_regularizer (Optional[Keras Regularizer]): Regularization function for the
        input and hidden layers (not applied to the output layer).
    early_stop_patience (Optional[int]): Accepted but not used by this function; it is
        part of the signature so callers can pass it alongside the other settings.
    verbose (Optional[int]): Accepted but not used by this function.

    Returns:
    Keras model: A compiled deep learning model with the specified architecture.
    """

    # Pick the loss / output head that matches the number of classes.
    if unique_classes == 2:
        loss = 'binary_crossentropy'
        output_activation = 'sigmoid'
        n_output_neurons = 1
    else:
        loss = 'categorical_crossentropy'
        output_activation = 'softmax'
        n_output_neurons = unique_classes

    model = Sequential()

    # Input layer: as wide as the feature vector.
    model.add(Dense(n_features, activation=activation_function, input_dim=n_features, kernel_regularizer=kernel_regularizer))
    model.add(Dropout(dropout_rate))

    for _ in range(num_hidden_layers):
        model.add(Dense(num_neurons_per_layer, activation=activation_function, kernel_regularizer=kernel_regularizer))
        model.add(Dropout(dropout_rate))

    model.add(Dense(n_output_neurons, activation=output_activation))
    # `metrics` is documented as a list; a bare string is rejected by newer Keras releases.
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

    return model

def ANN_classification_with_grid_search(data, dep_var, drop_columns=[], n_iter=50, cv=3, verbose=1,
                                                    activation_functions=["relu", "tanh"],
                                                    dropout_rates=[0.1, 0.2, 0.3, 0.4],
                                                    optimizers=["adam", "rmsprop", "adagrad"],
                                                    batch_sizes=[16, 32, 64, 128], epochs=[50, 100, 200],
                                                    early_stop_patience=[5, 10, 15],
                                                    num_hidden_layers_range = range(1, 4),
                                                    neurons_per_layer_range = [16, 32, 48, 64],
                                                    learning_rate_range = [0.001, 0.01, 0.1],
                                                    model_save=True, save_directory=None, plot_loss=True,
                                                    test_size=0.2, random_state=42, new_data=None):
    """
    Runs a deep learning classification process with Grid Search for hyperparameter optimization.

    The data is split into train/test sets, features are standardized with statistics
    learned on the training split, a GridSearchCV is run over every combination of the
    given hyperparameter candidates, and a final model is re-trained with the best
    parameters, evaluated, optionally saved, and optionally applied to ``new_data``.

    The search is exhaustive: with the default candidate lists the grid contains
    31,104 combinations, each fitted ``cv`` times. Narrow the lists for practical use.

    Args:
    Required:
    data (pd.DataFrame): Data to be used for model training.
    dep_var (str): Target variable. For more than two classes the labels are passed to
        ``to_categorical``, so they are expected to be integer class indices.

    Optional:
    drop_columns (List[str]): Columns to be dropped from dataframe.
    n_iter (int): Unused by the grid search; kept so the signature matches the
        random-search variant.
    cv (int): Number of cross validation folds.
    verbose (int): Print log level.
    activation_functions (List[str]): Activation functions to be used.
    dropout_rates (List[float]): Dropout rates.
    optimizers (List[str]): Deep learning optimizers to be used.
    batch_sizes (List[int]): Batch sizes to be used.
    epochs (List[int]): Number of epochs to be used.
    early_stop_patience (List[int]): Early stopping patience values. The selected value is
        only applied when the final model is trained; during the search a single
        callback with the largest candidate is shared by every fit.
    num_hidden_layers_range (List[int]): Number of hidden layer values to search.
    neurons_per_layer_range (List[int]): Number of neurons per layer values.
    learning_rate_range (List[float]): Despite the name, these values are used as L2
        kernel-regularization strengths, not as optimizer learning rates.
    model_save (bool): If True, save the final model as an .h5 file.
    save_directory (Optional[str]): Path to the directory to save the model. Created if
        missing. If None, the model is saved in the current working directory.
    plot_loss (bool): If True, plot training and validation loss of the final model.
    test_size (float): Test set proportion.
    random_state (int): Random state for train_test_split.
    new_data (pd.DataFrame): If provided, predict target values for this data. It must
        contain the training feature columns; ``dep_var`` is optional and, when present,
        is used to compute residuals.

    Returns:
    pd.DataFrame: A one-row dataframe with model results (train/test accuracy, confusion
    matrix, summary classification report, best hyperparameters, and predictions /
    residuals for ``new_data`` or None).
    """
    # Work on a private copy so the (shared) default list is never aliased.
    drop_columns = list(drop_columns)

    # drop columns; remember the feature columns so new_data can be aligned to them later
    feature_frame = data.drop([dep_var] + drop_columns, axis=1)
    feature_columns = feature_frame.columns
    X = feature_frame.values
    y = data[dep_var].values
    # split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    # one hot encode target variable if more than 2 classes
    unique_classes = len(np.unique(y))
    if unique_classes > 2:
        y_train = to_categorical(y_train, num_classes=unique_classes)
        y_test = to_categorical(y_test, num_classes=unique_classes)

    # scale data (statistics come from the training split only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # get number of features
    n_features = X_train_scaled.shape[1]

    # create model
    deep_learning_model = KerasClassifier(model=create_model_classification, model__n_features=n_features,
                                          model__unique_classes=unique_classes, verbose=verbose)

    # Define the grid search parameters
    param_grid = {
        "model__activation_function": activation_functions,
        "model__dropout_rate": dropout_rates,
        "model__optimizer": optimizers,
        "model__num_hidden_layers": num_hidden_layers_range,
        "model__num_neurons_per_layer": neurons_per_layer_range,
        "model__kernel_regularizer": [L1L2(l1=0, l2=regularization_strength) for regularization_strength in learning_rate_range],
        "batch_size": batch_sizes,
        "epochs": epochs,
        "model__early_stop_patience": early_stop_patience
    }

    # Create the GridSearchCV object
    grid_search = GridSearchCV(deep_learning_model, param_grid=param_grid,
                               cv=cv, error_score=np.nan, verbose=verbose)

    # Create EarlyStopping and ModelCheckpoint callbacks.
    # EarlyStopping needs a single integer patience; passing the whole candidate list
    # breaks the callback's integer comparison. The same callback instance is shared by
    # every search fit, so the most permissive candidate is used here.
    search_patience = max(early_stop_patience)
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=verbose, patience=search_patience)
    mc = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', verbose=verbose, save_best_only=True)

    # Fit to the training data
    # NOTE(review): the held-out test split doubles as the validation set for early
    # stopping, so the test metrics reported below are optimistic.
    grid_search.fit(X_train_scaled, y_train, validation_data=(X_test_scaled, y_test),callbacks=[es, mc])

    # Get the best parameters from the GridSearchCV
    best_params = grid_search.best_params_

    # create and fit final model with best parameters
    final_model = create_model_classification(n_features=n_features, unique_classes=unique_classes,
                                            dropout_rate=best_params['model__dropout_rate'],
                                           activation_function=best_params['model__activation_function'],
                                           optimizer=best_params['model__optimizer'],
                                           num_hidden_layers=best_params['model__num_hidden_layers'],
                                           num_neurons_per_layer=best_params['model__num_neurons_per_layer'],
                                           kernel_regularizer=best_params['model__kernel_regularizer'],
                                           early_stop_patience=best_params['model__early_stop_patience'])
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=verbose, patience=best_params['model__early_stop_patience'])
    mc = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', verbose=verbose, save_best_only=True)
    history = final_model.fit(X_train_scaled, y_train, validation_data=(X_test_scaled, y_test),
                              epochs=best_params['epochs'], batch_size=best_params['batch_size'], callbacks=[es, mc])

    # Plotting the training and validation loss
    if plot_loss:
        plt.figure(figsize=(10, 6))
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.title('Loss Plot')
        plt.show()

    # Save the model
    # NOTE(review): the file name differs depending on whether save_directory is given;
    # both names are kept unchanged so existing consumers still find the file.
    if model_save:
        if save_directory:
            # Make sure the target directory exists before Keras tries to write into it.
            os.makedirs(save_directory, exist_ok=True)
            model_save_path = os.path.join(save_directory, 'best_classification_ANN_model_with_GridSearch.h5')
        else:
            model_save_path = 'best_classification_ANN_model_GridSearch.h5'
        final_model.save(model_save_path)

    # Predict target values for training and test data
    if unique_classes > 2:
        y_pred_train = np.argmax(final_model.predict(X_train_scaled), axis=1)
        y_pred_test = np.argmax(final_model.predict(X_test_scaled), axis=1)
        # Targets were one-hot encoded above; convert them back to class indices
        y_train_original = np.argmax(y_train, axis=1)
        y_test_original = np.argmax(y_test, axis=1)
    else:
        y_pred_train = (final_model.predict(X_train_scaled) > 0.5).astype('int32')
        y_pred_test = (final_model.predict(X_test_scaled) > 0.5).astype('int32')
        # Binary targets were never encoded, use original y_train and y_test
        y_train_original = y_train
        y_test_original = y_test

    # Calculate metrics
    train_accuracy = accuracy_score(y_train_original, y_pred_train)
    test_accuracy = accuracy_score(y_test_original, y_pred_test)
    conf_mat = confusion_matrix(y_test_original, y_pred_test)
    class_report = classification_report(y_test_original, y_pred_test, output_dict=True, zero_division=1)

    # Predict values for new data
    new_y_pred = None
    new_y_residual = None
    if new_data is not None:
        # Select the training feature columns by name so column order always matches.
        new_X = new_data[feature_columns].values

        # Reuse the scaler fitted on the training split; refitting on new_data would
        # standardize it with different statistics than the model was trained on.
        new_X_scaled = scaler.transform(new_X)
        # Predict target values for new data
        if unique_classes > 2:
            new_y_pred = np.argmax(final_model.predict(new_X_scaled), axis=1)
        else:
            new_y_pred = (final_model.predict(new_X_scaled) > 0.5).astype('int32')

        # Residuals are only defined when the true labels are available.
        if dep_var in new_data.columns:
            new_y_residual = new_data[dep_var].values - new_y_pred.flatten()

    # Create a simple report (key spellings are kept as-is for existing consumers)
    simple_report = {
        'Accuracy': round(class_report['accuracy'], 3),
        'Precision': round(class_report['macro avg']['precision'], 3),
        'Recall': round(class_report['macro avg']['recall'], 3),
        'F1-score': round(class_report['macro avg']['f1-score'], 3),
        'Supoort': round(class_report['macro avg']['support'], 3)
    }

    # Create a dataframe with results
    results = {
        "Model": ["ANN Classification with Grid Search(Best Model)"],
        "Train Accuracy": [train_accuracy],
        "Test Accuracy": [test_accuracy],
        "Confusion Matrix": [conf_mat.tolist()],
        "Classification Report": [simple_report],
        "Hyperparameters": [best_params],
        "New Predicted": [new_y_pred],
        "New Residual": [new_y_residual]
    }

    return pd.DataFrame(results)



In [None]:
# Demo: train the random-search ANN classifier on the iris data set.
df = pd.read_csv('/home/young78703/Data_Science_Project/data/iris.csv')

# Preprocess a categorical column (ordinal variable) using mapping
# Define a dictionary to map the categorical values to integers
mapping = {'setosa': 0, 'versicolor': 1, 'virginica': 2}

# Use the map method to apply the dictionary to the column
df['species'] = df['species'].map(mapping)

# Sample AFTER encoding: the sample must carry the integer labels, otherwise the
# residual computation (true label - predicted class index) fails on strings.
new_data = df.sample(5)

result=ANN_classification_with_random_search(df, 'species', new_data=new_data)