<a href="https://colab.research.google.com/github/jazbengu/COS711-ASSIGNMENT-TWO/blob/main/Joy_Bengu_25000307_COS711_Assignment_Two_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install scikeras
#!pip install --force-reinstall tensorflow



Collecting scikeras
  Downloading scikeras-0.13.0-py3-none-any.whl.metadata (3.1 kB)
Downloading scikeras-0.13.0-py3-none-any.whl (26 kB)
Installing collected packages: scikeras
Successfully installed scikeras-0.13.0


In [4]:
import numpy as np
import pandas as pd
import seaborn as sns

from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.optimizers import Optimizer
from tensorflow.keras.optimizers import RMSprop
from scikeras.wrappers import KerasClassifier
import matplotlib.pyplot as plt




In [5]:

def load_data(file_path):
    """Load a CSV, mean-impute its feature columns and one-hot encode its labels.

    The last column of the file is treated as the class label; every other
    column is treated as a numeric feature.

    Args:
        file_path: Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        tuple: ``(X, y)`` where ``X`` is a DataFrame of the imputed feature
        columns (same column names and row index as the file) and ``y`` is a
        float32 NumPy array with one one-hot column per distinct label.
    """
    data = pd.read_csv(file_path)
    features = data.iloc[:, :-1]  # Assuming last column is labels
    labels = data.iloc[:, -1]

    # Build a new frame from the imputed values instead of writing them back
    # into `data` through iloc; this leaves the loaded frame untouched and
    # avoids dtype coercion of float results into the original columns.
    # NOTE(review): the imputer is fitted on the full dataset, i.e. before the
    # caller's train/test split -- confirm this is acceptable for the study.
    imputer = SimpleImputer(strategy='mean')
    X = pd.DataFrame(
        imputer.fit_transform(features),
        columns=features.columns,
        index=features.index,
    )

    # get_dummies yields uint8 or bool depending on the pandas version; cast
    # explicitly so the targets always have a floating-point dtype.
    y = pd.get_dummies(labels).values.astype('float32')

    return X, y

def preprocess_data(X_train, X_test):
    """Standardise both splits using statistics learned from the training split.

    Args:
        X_train: Training feature matrix; the scaler is fitted on this only.
        X_test: Held-out feature matrix; transformed with the fitted scaler.

    Returns:
        tuple: ``(X_train_scaled, X_test_scaled)``.
    """
    # Fit once on the training data, then apply the same transform to both.
    fitted_scaler = StandardScaler().fit(X_train)
    return fitted_scaler.transform(X_train), fitted_scaler.transform(X_test)


In [6]:
class RProp(Optimizer):
    """Sign-based resilient backpropagation (Rprop) optimizer.

    Every trainable weight owns an individual step size. On each update the
    step size grows by ``eta_plus`` when the current and previous gradient of
    that weight share a sign, shrinks by ``eta_minus`` when the sign flipped,
    and is left unchanged when their product is zero (e.g. on the very first
    update). The weight is then moved against the gradient sign by that step
    size; the gradient magnitude is not used.

    This implementation targets the Keras 3 optimizer API (``build`` +
    in-place ``update_step``). The legacy ``get_updates`` hook is not provided.

    Args:
        learning_rate: Accepted for interface compatibility with other
            optimizers and forwarded to the base class; the Rprop update
            itself does not use it.
        eta_plus: Growth factor applied to the step size on sign agreement.
        eta_minus: Shrink factor applied to the step size on a sign flip.
        delta_min: Lower bound for the per-weight step size.
        delta_max: Upper bound for the per-weight step size.
        delta_zero: Initial per-weight step size.
        **kwargs: Forwarded unchanged to the base ``Optimizer`` constructor.
    """

    def __init__(self, learning_rate=0.01, eta_plus=1.2, eta_minus=0.5,
                 delta_min=1e-6, delta_max=50.0, delta_zero=0.1, **kwargs):
        super().__init__(learning_rate=learning_rate, **kwargs)
        self.eta_plus = eta_plus
        self.eta_minus = eta_minus
        self.delta_min = delta_min
        self.delta_max = delta_max
        self.delta_zero = delta_zero

    def build(self, var_list):
        """Create the persistent per-variable state (step sizes, last gradients).

        Args:
            var_list: The model variables this optimizer will update.
        """
        if self.built:
            return
        super().build(var_list)
        self._step_sizes = []
        self._prev_grads = []
        step_init = tf.keras.initializers.Constant(self.delta_zero)
        for var in var_list:
            # One step size per weight element, starting at delta_zero.
            self._step_sizes.append(
                self.add_variable(
                    shape=var.shape,
                    initializer=step_init,
                    dtype=var.dtype,
                    name="rprop_step_size",
                )
            )
            # Previous gradient starts at zero, so the first update neither
            # grows nor shrinks the step size.
            self._prev_grads.append(
                self.add_variable_from_reference(
                    reference_variable=var, name="rprop_prev_grad"
                )
            )

    def update_step(self, gradient, variable, learning_rate):
        """Apply one Rprop update to ``variable`` in place.

        Args:
            gradient: Gradient of the loss with respect to ``variable``.
            variable: The model variable to update.
            learning_rate: Supplied by the base class; unused by Rprop.
        """
        ops = tf.keras.ops
        gradient = ops.cast(gradient, variable.dtype)

        index = self._get_variable_index(variable)
        step_size = self._step_sizes[index]
        prev_grad = self._prev_grads[index]

        # Positive product: same sign as last time; negative: sign flipped.
        sign_product = ops.multiply(gradient, prev_grad)
        grown = ops.minimum(ops.multiply(step_size, self.eta_plus), self.delta_max)
        shrunk = ops.maximum(ops.multiply(step_size, self.eta_minus), self.delta_min)
        new_step = ops.where(
            ops.greater(sign_product, 0),
            grown,
            ops.where(ops.less(sign_product, 0), shrunk, step_size),
        )

        # Persist the state so the next call sees this step's values, then
        # move the weight against the gradient sign.
        self.assign(step_size, new_step)
        self.assign(prev_grad, gradient)
        self.assign_sub(variable, ops.multiply(ops.sign(gradient), new_step))

    def get_config(self):
        """Return the serialisable configuration, including Rprop hyper-parameters."""
        # The base config already contains a serialisable learning_rate.
        config = super().get_config()
        config.update({
            'eta_plus': self.eta_plus,
            'eta_minus': self.eta_minus,
            'delta_min': self.delta_min,
            'delta_max': self.delta_max,
            'delta_zero': self.delta_zero,
        })
        return config

In [7]:
def create_model(optimizer='adam', learning_rate=0.001, activation='relu',
                 input_dim=13, num_classes=3):
    """Build and compile a two-hidden-layer softmax classifier.

    Args:
        optimizer: One of ``'adam'``, ``'sgd'`` or ``'rprop'``.
        learning_rate: Learning rate handed to the optimizer; a falsy value
            (``None`` or ``0``) falls back to ``0.001`` for every optimizer.
        activation: Activation used by both hidden layers.
        input_dim: Number of feature columns fed to the network. Defaults to
            13, the value that was previously hard-coded.
            NOTE(review): the old inline comment said "12 input features"
            while the code used 13 -- confirm against the dataset columns.
        num_classes: Number of softmax outputs. Defaults to 3.

    Returns:
        A compiled ``Sequential`` model using categorical cross-entropy loss
        and reporting accuracy.

    Raises:
        ValueError: If ``optimizer`` is not one of the supported names.
    """
    # Same fallback for every optimizer, so a missing rate never reaches one.
    learning_rate = learning_rate or 0.001

    if optimizer == 'adam':
        opt = Adam(learning_rate=learning_rate)
    elif optimizer == 'sgd':
        opt = SGD(learning_rate=learning_rate)
    elif optimizer == 'rprop':  # Use RProp Optimizer
        opt = RProp(learning_rate=learning_rate)
    else:
        raise ValueError("Unsupported optimizer type")

    model = Sequential()
    # Declare the input shape with an explicit Input object rather than the
    # deprecated `input_dim=` keyword on the first Dense layer.
    model.add(tf.keras.Input(shape=(input_dim,)))
    model.add(Dense(128, activation=activation))
    model.add(Dense(64, activation=activation))
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model


In [8]:
from scikeras.wrappers import KerasClassifier
def perform_grid_search(X_train, y_train):
    """Grid-search training hyper-parameters for ``create_model`` with 3-fold CV.

    The search covers batch size, epoch count, optimizer, learning rate and
    activation, prints the best score and parameters, and shows a heatmap of
    the mean test score by batch size and epochs.

    Args:
        X_train: Training feature matrix.
        y_train: Training targets matching ``X_train``.

    Returns:
        The best estimator found by the search (``best_estimator_``).
    """
    # `model=` is the current name of the deprecated `build_fn=` argument.
    model = KerasClassifier(model=create_model, verbose=0)
    param_grid = {
        'batch_size': [10, 20],
        'epochs': [5, 10],
        # Route the optimizer to create_model via the `model__` prefix; the
        # bare `optimizer` key sets the wrapper's own compile parameter and
        # is not forwarded to the model-building function.
        'model__optimizer': ['adam', 'sgd', 'rprop'],
        'model__learning_rate': [0.001, 0.01],  # Note: RProp doesn't use learning rate
        'model__activation': ['relu']
    }

    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, cv=3)
    grid_result = grid.fit(X_train, y_train)

    print(f"Best: {grid_result.best_score_} using {grid_result.best_params_}")

    # Plotting results: average the mean test score over the remaining
    # parameters for every (batch size, epochs) pair.
    results_df = pd.DataFrame(grid_result.cv_results_)
    results_pivot = results_df.pivot_table(index='param_batch_size', columns='param_epochs', values='mean_test_score')

    plt.figure(figsize=(8, 6))
    sns.heatmap(results_pivot, annot=True, fmt=".3f", cmap="YlGnBu")
    plt.title("Hyperparameter Tuning: Batch Size vs Epochs")
    plt.xlabel("Epochs")
    plt.ylabel("Batch Size")
    plt.show()

    return grid_result.best_estimator_

In [None]:
def train_and_compare_algorithms(X_train, y_train, X_test, y_test):
    """Train one network per optimizer and plot their validation-loss curves.

    Each model is built by ``create_model`` with a learning rate of 0.001 and
    trained for 20 epochs with batch size 20, using the test split as the
    validation data.

    Args:
        X_train: Training feature matrix.
        y_train: Training targets.
        X_test: Feature matrix used for validation during training.
        y_test: Targets used for validation during training.
    """
    # (optimizer key, legend label) in training and plotting order.
    contenders = [('adam', 'Adam'), ('sgd', 'SGD'), ('rprop', 'Rprop')]

    fit_logs = []
    for optimizer_key, legend_label in contenders:
        network = create_model(optimizer=optimizer_key, learning_rate=0.001)
        fit_log = network.fit(
            X_train,
            y_train,
            validation_data=(X_test, y_test),
            epochs=20,
            batch_size=20,
        )
        fit_logs.append((legend_label, fit_log))

    # Draw every curve only after all models have finished training.
    for legend_label, fit_log in fit_logs:
        plt.plot(fit_log.history['val_loss'], label=legend_label)

    plt.title('Validation Loss Comparison')
    plt.xlabel('Epochs')
    plt.ylabel('Validation Loss')
    plt.legend()
    plt.show()



In [9]:
def hybrid_learning(X_train, y_train, X_test, y_test):
    """Train Adam, SGD and Rprop networks and score their averaged predictions.

    The three models are trained independently (20 epochs, batch size 20),
    their predicted class probabilities on ``X_test`` are averaged, and the
    accuracy of the arg-max of that average is printed.

    Args:
        X_train: Training feature matrix.
        y_train: One-hot training targets.
        X_test: Test feature matrix.
        y_test: One-hot test targets.
    """
    # Keyword arguments for create_model, one entry per ensemble member.
    member_settings = [
        {'optimizer': 'adam', 'learning_rate': 0.001},
        {'optimizer': 'sgd', 'learning_rate': 0.001},
        {'optimizer': 'rprop'},
    ]

    members = []
    for settings in member_settings:
        network = create_model(**settings)
        network.fit(X_train, y_train, epochs=20, batch_size=20, verbose=0)
        members.append(network)

    # Predict only after every member has been trained.
    member_probabilities = [network.predict(X_test) for network in members]

    # Unweighted average of the three probability matrices, then pick the
    # most probable class per sample.
    averaged = sum(member_probabilities) / 3
    predicted_labels = np.argmax(averaged, axis=1)

    true_labels = np.argmax(y_test, axis=1)
    accuracy = np.mean(predicted_labels == true_labels)

    print(f"Ensemble model accuracy: {accuracy}")


In [None]:
def main(data_path='Almond.csv', test_size=0.2, seed=42):
    """Run the full experiment: load, split, scale, tune, compare and ensemble.

    Args:
        data_path: CSV file handed to ``load_data``.
        test_size: Fraction of the rows held out as the test split.
        seed: ``random_state`` used for the train/test split.

    Returns:
        The best estimator returned by ``perform_grid_search``.
    """
    X, y = load_data(data_path)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=seed
    )
    # Scaling statistics come from the training split only.
    X_train, X_test = preprocess_data(X_train, X_test)

    best_model = perform_grid_search(X_train, y_train)
    train_and_compare_algorithms(X_train, y_train, X_test, y_test)
    hybrid_learning(X_train, y_train, X_test, y_test)

    # Hand the tuned estimator back instead of discarding it.
    return best_model

if __name__ == "__main__":
    main()