This code was used to optimise our cnn with optuna, resulting in 2 high performing models:

1. cnn_optimised_adam.keras
- 4 Conv2D Layers 32->64->64->128 with batch normalisation and maxpool between layers
- dropout of around 0.39 after last Conv2D layer
- Global Maxpool
- Dense(64) with Dropout of aroung 0.29
- optimizer = adam
- learning rate of 0.000724

2. cnn_optimised_sgd.keras
- 3 Conv2D Layers 32->128->128 with batch normalisation and maxpool between layers
- dropout of around 0.42 after last Conv2D layer
- Global Maxpool
- Dense(256) with Dropout of aroung 0.25
- optimizer = sgd
- learning rate of 0.000139

In [None]:
from pathlib import Path
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Dense, Dropout, BatchNormalization, GlobalAveragePooling2D, GlobalMaxPooling2D, Input
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix
import optuna
import optuna.visualization as vis
from datetime import datetime

# Parameters
N_TRIALS = 25
EPOCHS = 30
KFOLD_SEED = 42
N_SPLITS = 5
#BATCH_SIZE = 16
LABELS = ['no', 'yes']
IMG_SIZE = 250

np.random.seed(42)
tf.random.set_seed(42)


def get_data(data_dir):
    X, y = [], []
    data_dir = Path(data_dir)
    for label in LABELS:
        path = data_dir / label
        class_num = LABELS.index(label)
        for img_file in path.iterdir():
            try:
                img_arr = cv2.imread(str(img_file))[..., ::-1]
                resized_arr = cv2.resize(img_arr, (IMG_SIZE, IMG_SIZE))
                X.append(resized_arr)
                y.append(class_num)
            except Exception as e:
                print(f"Error reading {img_file.name}: {e}")
    X = np.array(X)
    y = np.array(y)
    X = X / 255.0
    y = y.astype('float32')

    return X, y


def augment_images(x, y):
    x_aug, y_aug = [], []
    for img, label in zip(x, y):
        # 0¬∞ rotation (original)
        x_aug.append(img)
        y_aug.append(label)
        x_aug.append(np.fliplr(img))  # horizontal flip
        y_aug.append(label)
        x_aug.append(np.flipud(img))  # vertical flip
        y_aug.append(label)

        # 90¬∞ rotation + flips
        rot_90 = np.rot90(img, 1)
        x_aug.append(rot_90)
        y_aug.append(label)
        x_aug.append(np.fliplr(rot_90))  # horizontal flip of 90¬∞
        y_aug.append(label)
        x_aug.append(np.flipud(rot_90))  # vertical flip of 90¬∞
        y_aug.append(label)

        # 180¬∞ rotation (no flips)
        rot_180 = np.rot90(img, 2)
        x_aug.append(rot_180)
        y_aug.append(label)

        # 270¬∞ rotation (no flips)
        rot_270 = np.rot90(img, 3)
        x_aug.append(rot_270)
        y_aug.append(label)

    return np.array(x_aug), np.array(y_aug)



def create_model(params):
    model = Sequential()
    model.add(Input(shape=(IMG_SIZE, IMG_SIZE, 3)))

    # Convert conv_filters string to tuple of ints
    conv_filters = tuple(map(int, params['conv_filters'].split(',')))

    for filters in conv_filters:
        model.add(Conv2D(filters, (3, 3), padding="same", activation="relu"))
        model.add(BatchNormalization())
        model.add(MaxPool2D())

    model.add(Dropout(params['dropout_conv']))

    if params['pooling_type'] == 'avg':
        model.add(GlobalAveragePooling2D())
    else:
        model.add(GlobalMaxPooling2D())

    model.add(Dense(params['dense_units'], activation='relu'))
    model.add(Dropout(params['dropout_dense']))
    model.add(Dense(1, activation='sigmoid'))

    optimizer = {
        'adam': Adam,
        'rmsprop': RMSprop,
        'sgd': SGD
    }[params['optimizer_name']](learning_rate=params['learning_rate'])

    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model

def train_model(model, x_train, y_train, x_val, y_val, params, trial_number=None, fold_number=None):
    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-5)
    # get date and time
    now = datetime.now()
    date_time = now.strftime("%Y%m%d")
    # Create checkpoint filepath with trial and fold info
    if trial_number is not None and fold_number is not None:
        checkpoint_filepath = f'Model_{date_time}_trial_{trial_number}.h5'
    else:
        checkpoint_filepath = f'{date_time}_model_checkpoint.h5'

    checkpoint_cb = ModelCheckpoint(checkpoint_filepath, save_best_only=True, monitor='val_loss', mode='min', verbose=1)

    history = model.fit(x_train, y_train,
                        batch_size=params['batch_size'],
                        epochs=EPOCHS,
                        validation_data=(x_val, y_val),
                        callbacks=[early_stop, lr_scheduler, checkpoint_cb],
                        verbose=1,
                        shuffle=True)
    return model


def objective(trial, x_data=None, y_data=None):
    print(f"\nüîç Starting trial {trial.number}...")

    x_train, _, y_train, _ = train_test_split(x_data, y_data, test_size=0.2, stratify=y_data, random_state=42)
    print(f"  Split data: {x_train.shape[0]} training samples")
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=KFOLD_SEED)

    params = {
        'optimizer_name': trial.suggest_categorical('optimizer_name', ('adam', 'rmsprop', 'sgd')),
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True),
        'conv_filters': trial.suggest_categorical('conv_filters', [
                '32,64,128',
                '32,128,128',
                '64,128,256',
                '32,64,64,128',
                '32,64,128,128',
                '64,64,128,256',
            ]),
        'dense_units': trial.suggest_categorical('dense_units', (64, 128, 256)),
        'dropout_conv': trial.suggest_float('dropout_conv', 0.25, 0.5),
        'dropout_dense': trial.suggest_float('dropout_dense', 0.25, 0.5),
        'pooling_type': trial.suggest_categorical('pooling_type', ('avg', 'max')),
        'batch_size': trial.suggest_categorical('batch_size', (8, 16, 32)),
    }

    print(f"  Hyperparameters: {params}")
    f1_scores = []
    all_y_val = []
    all_y_pred = []

    for fold_num, (train_idx, val_idx) in enumerate(kf.split(x_train), 1):
        print(f"  ‚û°Ô∏è Fold {fold_num}/{N_SPLITS}")
        x_tr, x_val = x_train[train_idx], x_train[val_idx]
        y_tr, y_val = y_train[train_idx], y_train[val_idx]
        x_tr, y_tr = augment_images(x_tr, y_tr)
        print(f"    Augmented training data: {x_tr.shape[0]} samples")

        model = create_model(params)
        print(f"    Model created. Starting training...")
        model = train_model(model, x_tr, y_tr, x_val, y_val, params, trial_number=trial.number, fold_number=fold_num)
        print(f"    Training completed. Evaluating model...")

        y_pred_prob = model.predict(x_val)
        y_pred = (y_pred_prob > 0.5).astype(int).flatten()
        fold_f1 = f1_score(y_val, y_pred)
        print(f"    Fold {fold_num} F1 score: {fold_f1:.4f}")
        f1_scores.append(fold_f1)

        # Collect all validation labels and predictions
        all_y_val.extend(y_val)
        all_y_pred.extend(y_pred)

    # After all folds, print confusion matrix and classification report
    print("\nConfusion Matrix:")
    print(confusion_matrix(all_y_val, all_y_pred))

    print("\nClassification Report:")
    print(classification_report(all_y_val, all_y_pred, target_names=LABELS))

    avg_f1 = np.mean(f1_scores)
    print(f"  üèÅ Trial {trial.number} average F1 score: {avg_f1:.4f}")
    return avg_f1



def run_optuna_pipeline(x_data=None, y_data=None):
    study = optuna.create_study(direction='maximize')
    study.optimize(lambda trial: objective(trial, x_data, y_data), n_trials=N_TRIALS)
    print("\nüéâ Optimization finished. Best trial:")
    trial = study.best_trial
    print(f"  F1 Score: {trial.value:.4f}")
    print("  Params:")
    for key, value in trial.params.items():
        print(f"    {key}: {value}")

    # After your study is done:
    fig = vis.plot_parallel_coordinate(study, target_name="F1 Score")
    fig.show()

    fig = vis.plot_param_importances(study)
    fig.show()

    fig = vis.plot_slice(study, target_name="F1 Score")
    fig.show()

    fig = vis.plot_optimization_history(study)
    fig.show()

    fig = vis.plot_intermediate_values(study)
    fig.show()

    fig = vis.plot_edf(study)
    fig.show()

    fig = vis.plot_contour(study, target_name="F1 Score")
    fig.show()

def main():
    print("üîç Starting data loading...")
    x_data, y_data = get_data('data')
    print(f"  Loaded data: {x_data.shape[0]} samples")
    run_optuna_pipeline(x_data=x_data, y_data=y_data)

if __name__ == '__main__':
    main()