In [2]:
import math
import random
import numpy as np
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.datasets import cifar10, cifar100, mnist
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dropout
import pywt
from datetime import datetime

# Fetch the MNIST train/test splits through the Keras dataset helper.
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Give every 28x28 image an explicit single-channel axis and divide by 255.
train_images, test_images = (
    train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255,
    test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255,
)

# One-hot encode the class labels of both splits.
train_labels, test_labels = to_categorical(train_labels), to_categorical(test_labels)

# Hold out 20% of the training data for validation.
# No random_state is passed, so the split differs from run to run.
train_images, val_images, train_labels, val_labels = train_test_split(
    train_images, train_labels, test_size=0.2
)

# Module-level log of every improved global-best fitness found by
# hybrid_MPSO_CNN_level2 (appended to across all of its invocations).
gbests_fitnesses_l2 = []

def sigmoid(t):
    """Return the decreasing logistic weight ``1 / (1 + exp(10*t - 2))``.

    Despite the name, this curve *decreases* as ``t`` grows: it equals 0.5 at
    ``t = 0.2`` and tends to 0 for large ``t``. In this file it is used by
    ``hybrid_MPSO_CNN_level1`` to compute the inertia weight ``omega`` from
    the normalised iteration index.

    Args:
        t: Real number (the caller passes ``i / t_max1``).

    Returns:
        A float in ``[0, 1]``.
    """
    try:
        return 1 / (1 + math.exp(10 * t - 2))
    except OverflowError:
        # math.exp overflows once 10*t - 2 exceeds ~709; the mathematical
        # limit of the expression there is 0, so return it instead of raising.
        return 0.0

def wavelet_mutation(particle):
    """Perturb a particle by adding noise in the wavelet domain.

    The particle is decomposed with a single-level 'db2' discrete wavelet
    transform, Gaussian noise (mean 0, standard deviation 0.1) is added to the
    approximation coefficients, and the signal is reconstructed.

    Args:
        particle: 1-D array-like of numbers.

    Returns:
        A new array with the same length as ``particle``. The input is not
        modified. The result is neither rounded nor clipped; callers must
        re-apply any bounds they rely on.
    """
    # 'db2' is used here, but different wavelets can be experimented with.
    approx, detail = pywt.dwt(particle, 'db2')
    # Only the approximation (low-frequency) coefficients are perturbed.
    approx = approx + np.random.normal(0, 0.1, len(approx))
    mutated = pywt.idwt(approx, detail, 'db2')
    # For odd-length input the dwt/idwt round trip yields one extra trailing
    # sample; trim so the particle keeps its original dimensionality.
    return mutated[:len(particle)]

def create_model(params, conv_layers, pool_layers, dense_layers, input_size=28, num_classes=10):
    """Build and compile a CNN classifier from a hyperparameter vector.

    Args:
        params: Indexable with at least 8 numbers, each truncated with ``int``:
            ``[conv stride, conv kernel size, pool stride, pool size,
            conv padding flag, pool padding flag, conv filters, dense units]``.
            A padding flag equal to 1 selects ``'same'``, anything else
            ``'valid'``.
        conv_layers: Requested number of Conv2D (+ 20% Dropout) layers.
        pool_layers: Requested number of MaxPooling2D layers; they are all
            added after the convolutional layers.
        dense_layers: Number of hidden Dense (+ 20% Dropout) layers.
        input_size: Side length of the square input images. Only used to
            track the feature-map size; defaults to 28.
        num_classes: Number of units in the final softmax layer.

    Returns:
        A compiled ``Sequential`` model, or ``None`` when the hyperparameters
        cannot describe a buildable network (a stride, window size, filter
        count or unit count below 1 for a layer type that is requested).
        With ``'valid'`` padding, layers that no longer fit the remaining
        feature map are silently dropped.
    """
    stride_size_conv = int(params[0])
    filter_size_conv = int(params[1])
    stride_size_pool = int(params[2])
    filter_size_pool = int(params[3])
    padding_pixels_conv = 'same' if int(params[4]) == 1 else 'valid'
    padding_pixels_pool = 'same' if int(params[5]) == 1 else 'valid'
    num_filters = int(params[6])
    num_neurons = int(params[7])

    # Reject impossible settings up front so callers get the documented None
    # instead of a ZeroDivisionError or a Keras error.
    if conv_layers > 0 and min(stride_size_conv, filter_size_conv, num_filters) < 1:
        return None
    if pool_layers > 0 and min(stride_size_pool, filter_size_pool) < 1:
        return None
    if dense_layers > 0 and num_neurons < 1:
        return None

    def output_size(size, window, stride, padding):
        # Keras output-size rules: 'same' -> ceil(size / stride),
        # 'valid' -> floor((size - window) / stride) + 1.
        if padding == 'same':
            return -(-size // stride)
        return (size - window) // stride + 1

    model = Sequential()

    # Add convolutional layers
    for _ in range(conv_layers):
        # Only 'valid' padding can run out of room; 'same' pads as needed.
        if padding_pixels_conv == 'valid' and input_size < filter_size_conv:
            break
        model.add(Conv2D(filters=num_filters, kernel_size=(filter_size_conv, filter_size_conv), strides=(stride_size_conv, stride_size_conv), padding=padding_pixels_conv, activation='relu'))
        model.add(Dropout(0.2))  # 20% Dropout
        input_size = output_size(input_size, filter_size_conv, stride_size_conv, padding_pixels_conv)

    # Add pooling layers
    for _ in range(pool_layers):
        if padding_pixels_pool == 'valid' and input_size < filter_size_pool:
            break
        model.add(MaxPooling2D(pool_size=(filter_size_pool, filter_size_pool), strides=(stride_size_pool, stride_size_pool), padding=padding_pixels_pool))
        input_size = output_size(input_size, filter_size_pool, stride_size_pool, padding_pixels_pool)

    # Defensive guard kept from the original: never flatten an empty map.
    if input_size <= 0:
        return None

    model.add(Flatten())

    # Fully Connected Layers
    for _ in range(dense_layers):
        model.add(Dense(units=num_neurons, activation='relu'))
        model.add(Dropout(0.2))  # 20% Dropout

    model.add(Dense(num_classes, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

def compute_fitness(model, it, par, t, j):
    """Train ``model`` for one epoch and return its validation accuracy.

    The module-level ``train_images``/``train_labels`` are used for training
    and ``val_images``/``val_labels`` for validation, with a mini-batch size
    of 128.

    Args:
        model: Compiled model exposing a Keras-style ``fit`` method.
        it: Level-1 iteration index (only printed).
        par: Level-1 particle index (only printed).
        t: Level-2 iteration index (only printed).
        j: Level-2 particle index (only printed).

    Returns:
        The ``val_accuracy`` recorded for the last (here: only) epoch.
    """
    print("iteration1:",it," particle1:",par,"iteration2:",t," particle2:",j)
    training_log = model.fit(
        train_images,
        train_labels,
        epochs=1,
        batch_size=128,
        validation_data=(val_images, val_labels),
    )
    return training_log.history['val_accuracy'][-1]

def update_velocity_position(particle, pbest, gbest, omega, fitness, fitness_values):
    """Move one level-2 particle and return its new, rounded position.

    The particle layout is ``[conv stride, conv kernel, pool stride,
    pool size, conv padding flag, pool padding flag, conv filters,
    dense units]``.

    Args:
        particle: Current position (NumPy array with at least 8 entries).
        pbest: This particle's personal-best position.
        gbest: The swarm's global-best position.
        omega: Inertia weight.
        fitness: Current fitness of this particle.
        fitness_values: Fitness values of the whole swarm; a particle whose
            fitness is below their 25th percentile is additionally mutated
            with ``wavelet_mutation``.

    Returns:
        A new array holding the clipped and rounded position; ``particle``
        itself is not modified.

    NOTE(review): the inertia term is ``omega * particle`` (the position),
    whereas canonical PSO uses ``omega * previous_velocity``. No velocity
    state is kept by this function's interface, so this is left as is --
    confirm whether it is intentional.
    """
    lower = np.array([1, 1, 1, 1, 0, 0, 1, 1], dtype=float)
    upper = np.array([5, 13, 5, 13, 1, 1, 64, 1024], dtype=float)

    def clip_to_bounds(values):
        # Only the 8 hyperparameter slots are bounded.
        values[:8] = np.clip(values[:8], lower, upper)
        return values

    r_p, r_g = np.random.rand(len(particle)), np.random.rand(len(particle))
    velocity = omega * particle + 2 * r_p * (pbest - particle) + 2 * r_g * (gbest - particle)

    # Ensure the particle parameters remain within valid bounds
    new_particle = clip_to_bounds(particle + velocity)

    if fitness < np.percentile(fitness_values, 25):
        # The mutation adds unbounded Gaussian noise, so the bounds must be
        # enforced again afterwards; otherwise e.g. a stride could round to 0.
        new_particle = clip_to_bounds(wavelet_mutation(new_particle))

    return np.round(new_particle)

def update_personal_best(particles, fitness_values, personal_best_positions, personal_best_fitnesses):
    """Refresh personal bests wherever the current fitness is strictly higher.

    Both ``personal_best_positions`` and ``personal_best_fitnesses`` are
    updated in place and the very same array objects are returned.

    Args:
        particles: Current positions, one row per particle.
        fitness_values: Current fitness per particle.
        personal_best_positions: Best position seen so far per particle.
        personal_best_fitnesses: Best fitness seen so far per particle.

    Returns:
        Tuple ``(personal_best_positions, personal_best_fitnesses)``.
    """
    # Compute the mask once, before either array is touched.
    improved = personal_best_fitnesses < fitness_values
    personal_best_fitnesses[improved] = fitness_values[improved]
    personal_best_positions[improved] = particles[improved]
    return personal_best_positions, personal_best_fitnesses

def hybrid_MPSO_CNN_level2(particle, t_max2, omega, conv_layers, pool_layers, dense_layers, it, par):
    """Run the inner (level-2) swarm that tunes per-layer hyperparameters.

    A swarm of 4 particles searches over ``[conv stride, conv kernel,
    pool stride, pool size, conv padding flag, pool padding flag,
    conv filters, dense units]`` for a fixed number of conv/pool/dense
    layers. Every evaluated particle is turned into a model with
    ``create_model`` and scored with ``compute_fitness``.

    Args:
        particle: The level-1 particle being evaluated. Currently unused;
            the layer counts arrive through the explicit arguments below.
        t_max2: Number of level-2 iterations.
        omega: Inertia weight forwarded to ``update_velocity_position``.
        conv_layers: Number of convolutional layers to request.
        pool_layers: Number of pooling layers to request.
        dense_layers: Number of hidden dense layers to request.
        it: Level-1 iteration index (forwarded for progress printing).
        par: Level-1 particle index (forwarded for progress printing).

    Returns:
        Tuple ``(gbest, gbest_fitness)``. If nothing ever scores above 0,
        ``gbest`` is a randomly chosen initial particle and the fitness is 0.

    Side effects:
        Appends each improved global-best fitness to the module-level list
        ``gbests_fitnesses_l2``.
    """
    n = 4
    low = [1, 1, 1, 1, 0, 0, 1, 1]
    high = [5, 13, 5, 13, 1, 1, 64, 1024]
    particles = np.random.uniform(low=low, high=high, size=(n, len(low)))
    fitness_values = np.zeros(n)
    personal_best_positions = particles.copy()
    personal_best_fitnesses = fitness_values.copy()

    # Start from a randomly chosen particle as the provisional global best.
    gbest = particles[np.random.randint(n)].copy()
    gbest_fitness = 0

    for t in range(t_max2):
        for j in range(n):
            # update_velocity_position already returns a rounded position.
            particles[j] = update_velocity_position(particles[j], personal_best_positions[j], gbest, omega, fitness_values[j], fitness_values)
            CNN_model = create_model(particles[j], conv_layers, pool_layers, dense_layers)
            if CNN_model is None:
                # An unbuildable position must not keep the fitness that was
                # measured at the particle's previous position.
                fitness_values[j] = 0
                continue
            fitness_values[j] = compute_fitness(CNN_model, it, par, t, j)
            personal_best_positions, personal_best_fitnesses = update_personal_best(particles, fitness_values, personal_best_positions, personal_best_fitnesses)
            best_index = np.argmax(personal_best_fitnesses)
            if personal_best_fitnesses[best_index] > gbest_fitness:
                # Copy: a plain row index would be a view that silently
                # changes when personal_best_positions is updated in place.
                gbest = personal_best_positions[best_index].copy()
                gbest_fitness = personal_best_fitnesses[best_index]
                gbests_fitnesses_l2.append(gbest_fitness)

    return gbest, gbest_fitness

def hybrid_MPSO_CNN_level1(t_max1, m, t_max2=4):
    """Run the outer (level-1) swarm that tunes the CNN's layer counts.

    Each particle is ``[conv layers, pool layers, dense layers]``, every
    entry kept in ``[1, 5]``. A particle's fitness is the best fitness found
    by a full ``hybrid_MPSO_CNN_level2`` run for those layer counts.

    The inertia weight is 0.9 for the first half of the iterations and
    ``sigmoid(i / t_max1)`` afterwards.

    Args:
        t_max1: Number of level-1 iterations.
        m: Number of level-1 particles (must be at least 1).
        t_max2: Number of iterations of each nested level-2 run.

    Returns:
        Tuple ``(gbest, gbest_l2, gbest_fitness, gbests_fitnesses_l1)``:
        the best layer-count particle, the level-2 particle that achieved
        its fitness (``None`` if no evaluation ever scored above 0), the
        best fitness, and the list of successive global-best fitnesses.

    NOTE(review): the inertia term below is ``omega * particles[j]`` (the
    position); canonical PSO uses ``omega * previous_velocity``. Left
    unchanged here -- confirm whether this is intentional.
    """
    alpha = 0.5
    low = [1, 1, 1]
    high = [5, 5, 5]
    particles = np.random.uniform(low=low, high=high, size=(m, len(low)))  # Initialize particle positions in specified range
    fitness_values = np.zeros(m)
    personal_best_positions = particles.copy()
    personal_best_fitnesses = fitness_values.copy()

    # Start from a randomly chosen particle as the provisional global best.
    gbest = particles[np.random.randint(m)].copy()
    gbest_fitness = 0
    # Defined up front so the return statement cannot hit an unbound name
    # when no particle ever improves on a fitness of 0.
    gbest_l2 = None

    # Successive global-best fitness values, in order of discovery
    gbests_fitnesses_l1 = []

    for i in range(t_max1):
        omega = 0.9 if i < alpha * t_max1 else sigmoid(i / t_max1)
        for j in range(m):
            particles = np.round(particles)
            num_conv_layers = int(particles[j][0])
            num_pool_layers = int(particles[j][1])
            num_dense_layers = int(particles[j][2])
            gbestl2, temp_fitness = hybrid_MPSO_CNN_level2(particles[j], t_max2=t_max2, omega=omega, conv_layers=num_conv_layers, pool_layers=num_pool_layers, dense_layers=num_dense_layers, it=i, par=j)
            fitness_values[j] = temp_fitness
            personal_best_positions, personal_best_fitnesses = update_personal_best(particles, fitness_values, personal_best_positions, personal_best_fitnesses)
            best_index = np.argmax(personal_best_fitnesses)
            if personal_best_fitnesses[best_index] > gbest_fitness:
                # Copy: a plain row index would be a view that silently
                # changes when personal_best_positions is updated in place.
                gbest = personal_best_positions[best_index].copy()
                gbest_fitness = personal_best_fitnesses[best_index]
                gbests_fitnesses_l1.append(gbest_fitness)
                gbest_l2 = gbestl2

            r_p, r_g = np.random.rand(len(particles[j])), np.random.rand(len(particles[j]))
            velocity = omega * particles[j] + 2 * r_p * (personal_best_positions[j] - particles[j]) + 2 * r_g * (gbest - particles[j])
            # Move, snap to whole layer counts, then keep within [1, 5].
            particles[j] = np.clip(np.round(particles[j] + velocity), 1, 5)

    return gbest, gbest_l2, gbest_fitness, gbests_fitnesses_l1

In [3]:
# Run the two-level PSO-CNN search on the currently loaded dataset and time it.
start_time = datetime.now()
gbest, gbest_l2, gbest_fitness, gbests_fitnesses_l1 = hybrid_MPSO_CNN_level1(m=4, t_max1=4)
end_time = datetime.now()

# Report the best layer counts, the matching level-2 particle and its accuracy.
print(f"Global Best Particle: {gbest} {gbest_l2}\nGlobal Best Fitness (Accuracy): {gbest_fitness}")
print("total time is %s" % (str(end_time - start_time)))


iteration1: 0  particle1: 0 iteration2: 0  particle2: 0
iteration1: 0  particle1: 0 iteration2: 0  particle2: 1
iteration1: 0  particle1: 0 iteration2: 0  particle2: 2
iteration1: 0  particle1: 0 iteration2: 0  particle2: 3
iteration1: 0  particle1: 0 iteration2: 1  particle2: 0
iteration1: 0  particle1: 0 iteration2: 1  particle2: 1
iteration1: 0  particle1: 0 iteration2: 1  particle2: 2
iteration1: 0  particle1: 0 iteration2: 1  particle2: 3
iteration1: 0  particle1: 0 iteration2: 2  particle2: 0
iteration1: 0  particle1: 0 iteration2: 2  particle2: 1
iteration1: 0  particle1: 0 iteration2: 2  particle2: 2
iteration1: 0  particle1: 0 iteration2: 2  particle2: 3
iteration1: 0  particle1: 0 iteration2: 3  particle2: 0
iteration1: 0  particle1: 0 iteration2: 3  particle2: 1
iteration1: 0  particle1: 0 iteration2: 3  particle2: 2
iteration1: 0  particle1: 0 iteration2: 3  particle2: 3
iteration1: 0  particle1: 1 iteration2: 0  particle2: 0
iteration1: 0  particle1: 1 iteration2: 0  parti

In [4]:
# Fetch the CIFAR-10 train/test splits through the Keras dataset helper.
# This rebinds the module-level dataset variables that compute_fitness reads.
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

# Fix the 32x32x3 image layout and divide by 255.
train_images, test_images = (
    train_images.reshape((50000, 32, 32, 3)).astype('float32') / 255,
    test_images.reshape((10000, 32, 32, 3)).astype('float32') / 255,
)

# One-hot encode the class labels of both splits.
train_labels, test_labels = to_categorical(train_labels), to_categorical(test_labels)

# Hold out 20% of the training data for validation.
# No random_state is passed, so the split differs from run to run.
train_images, val_images, train_labels, val_labels = train_test_split(
    train_images, train_labels, test_size=0.2
)
def create_model(params, conv_layers, pool_layers, dense_layers, input_size=32, num_classes=10):
    """Build and compile a CNN classifier from a hyperparameter vector.

    This redefinition differs from the earlier one in this file only in the
    default image side length (32 instead of 28).

    Args:
        params: Indexable with at least 8 numbers, each truncated with ``int``:
            ``[conv stride, conv kernel size, pool stride, pool size,
            conv padding flag, pool padding flag, conv filters, dense units]``.
            A padding flag equal to 1 selects ``'same'``, anything else
            ``'valid'``.
        conv_layers: Requested number of Conv2D (+ 20% Dropout) layers.
        pool_layers: Requested number of MaxPooling2D layers; they are all
            added after the convolutional layers.
        dense_layers: Number of hidden Dense (+ 20% Dropout) layers.
        input_size: Side length of the square input images. Only used to
            track the feature-map size; defaults to 32.
        num_classes: Number of units in the final softmax layer.

    Returns:
        A compiled ``Sequential`` model, or ``None`` when the hyperparameters
        cannot describe a buildable network (a stride, window size, filter
        count or unit count below 1 for a layer type that is requested).
        With ``'valid'`` padding, layers that no longer fit the remaining
        feature map are silently dropped.
    """
    stride_size_conv = int(params[0])
    filter_size_conv = int(params[1])
    stride_size_pool = int(params[2])
    filter_size_pool = int(params[3])
    padding_pixels_conv = 'same' if int(params[4]) == 1 else 'valid'
    padding_pixels_pool = 'same' if int(params[5]) == 1 else 'valid'
    num_filters = int(params[6])
    num_neurons = int(params[7])

    # Reject impossible settings up front so callers get the documented None
    # instead of a ZeroDivisionError or a Keras error.
    if conv_layers > 0 and min(stride_size_conv, filter_size_conv, num_filters) < 1:
        return None
    if pool_layers > 0 and min(stride_size_pool, filter_size_pool) < 1:
        return None
    if dense_layers > 0 and num_neurons < 1:
        return None

    def output_size(size, window, stride, padding):
        # Keras output-size rules: 'same' -> ceil(size / stride),
        # 'valid' -> floor((size - window) / stride) + 1.
        if padding == 'same':
            return -(-size // stride)
        return (size - window) // stride + 1

    model = Sequential()

    # Add convolutional layers
    for _ in range(conv_layers):
        # Only 'valid' padding can run out of room; 'same' pads as needed.
        if padding_pixels_conv == 'valid' and input_size < filter_size_conv:
            break
        model.add(Conv2D(filters=num_filters, kernel_size=(filter_size_conv, filter_size_conv), strides=(stride_size_conv, stride_size_conv), padding=padding_pixels_conv, activation='relu'))
        model.add(Dropout(0.2))  # 20% Dropout
        input_size = output_size(input_size, filter_size_conv, stride_size_conv, padding_pixels_conv)

    # Add pooling layers
    for _ in range(pool_layers):
        if padding_pixels_pool == 'valid' and input_size < filter_size_pool:
            break
        model.add(MaxPooling2D(pool_size=(filter_size_pool, filter_size_pool), strides=(stride_size_pool, stride_size_pool), padding=padding_pixels_pool))
        input_size = output_size(input_size, filter_size_pool, stride_size_pool, padding_pixels_pool)

    # Defensive guard kept from the original: never flatten an empty map.
    if input_size <= 0:
        return None

    model.add(Flatten())

    # Fully Connected Layers
    for _ in range(dense_layers):
        model.add(Dense(units=num_neurons, activation='relu'))
        model.add(Dropout(0.2))  # 20% Dropout

    model.add(Dense(num_classes, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model
# Run the two-level PSO-CNN search on the currently loaded dataset and time it.
start_time = datetime.now()
gbest, gbest_l2, gbest_fitness, gbests_fitnesses_l1 = hybrid_MPSO_CNN_level1(m=4, t_max1=4)
end_time = datetime.now()

# Report the best layer counts, the matching level-2 particle and its accuracy.
print(f"Global Best Particle: {gbest} {gbest_l2}\nGlobal Best Fitness (Accuracy): {gbest_fitness}")
print("total time is %s" % (str(end_time - start_time)))

iteration1: 0  particle1: 0 iteration2: 0  particle2: 0
iteration1: 0  particle1: 0 iteration2: 0  particle2: 1
iteration1: 0  particle1: 0 iteration2: 0  particle2: 2
iteration1: 0  particle1: 0 iteration2: 0  particle2: 3
iteration1: 0  particle1: 0 iteration2: 1  particle2: 0
iteration1: 0  particle1: 0 iteration2: 1  particle2: 1
iteration1: 0  particle1: 0 iteration2: 1  particle2: 2
iteration1: 0  particle1: 0 iteration2: 1  particle2: 3
iteration1: 0  particle1: 0 iteration2: 2  particle2: 0
iteration1: 0  particle1: 0 iteration2: 2  particle2: 1
iteration1: 0  particle1: 0 iteration2: 2  particle2: 2
iteration1: 0  particle1: 0 iteration2: 2  particle2: 3
iteration1: 0  particle1: 0 iteration2: 3  particle2: 0
iteration1: 0  particle1: 0 iteration2: 3  particle2: 1
iteration1: 0  particle1: 0 iteration2: 3  particle2: 2
iteration1: 0  particle1: 0 iteration2: 3  particle2: 3
iteration1: 0  particle1: 1 iteration2: 0  particle2: 0
iteration1: 0  particle1: 1 iteration2: 0  parti