## Training on MNIST with a conventional CNN

In [None]:
# import keras library
import keras

In [None]:
# Training hyper-parameters and input geometry shared by the cells below.

BATCH_SIZE = 128   # samples per gradient update (passed to model.fit)
NUM_CLASSES = 10   # width of the one-hot labels and of the softmax output
EPOCHS = 30        # passes over the training data (passed to model.fit)

# Height and width of one input image, in pixels.
img_rows = img_cols = 28

In [None]:
# Load MNIST through Keras. load_data() returns two (images, labels) pairs:
# one for the training split and one for the test split.

from keras.datasets import mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [None]:
# Add the single grayscale channel axis where the Keras backend expects it:
#   'channels_first' -> (samples, 1, rows, cols)
#   anything else    -> (samples, rows, cols, 1)

from keras import backend as K

input_shape = ((1, img_rows, img_cols)
               if K.image_data_format() == 'channels_first'
               else (img_rows, img_cols, 1))

# Both splits get the same per-sample shape; only the sample count differs.
x_train = x_train.reshape((x_train.shape[0],) + input_shape)
x_test = x_test.reshape((x_test.shape[0],) + input_shape)

In [None]:
# Cast the images to float32 and divide by 255
# (assumes 8-bit pixel intensities, so the result lies in [0, 1]).

x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

# Sanity-check the tensor shape and the size of each split.
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


In [None]:
# One-hot encode the integer labels of both splits (NUM_CLASSES columns each).

y_train, y_test = (keras.utils.to_categorical(labels, NUM_CLASSES)
                   for labels in (y_train, y_test))

In [None]:
# Define the architecture of the model:
# two 3x3 convolutions -> 2x2 max-pool -> dropout -> dense(128) -> dropout -> softmax.

from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D

model = Sequential([
    # Feature extractor.
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),
    # Classifier head.
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(NUM_CLASSES, activation='softmax'),
])

In [None]:
# Configure training: Adam with its default settings, categorical cross-entropy
# on the one-hot labels, and accuracy reported alongside the loss.

model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)

In [None]:
# Train the model. The returned History object is kept so the per-epoch
# loss/accuracy curves can be inspected afterwards (history.history), and so
# the cell does not end by echoing the History repr.
#
# NOTE(review): the test split doubles as validation data here, so the score
# printed by the evaluation cell is not an untouched hold-out estimate.

history = model.fit(x_train, y_train,
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    verbose=1,
                    validation_data=(x_test, y_test))

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100

KeyboardInterrupt: ignored

In [None]:
# Score the trained model on the test split; evaluate() returns the loss
# followed by the compiled metrics (accuracy).

score = model.evaluate(x_test, y_test, verbose=0)
for label, value in zip(('Test loss:', 'Test accuracy:'), score):
    print(label, value)

## Implementing MPSO (multi-level particle swarm optimization)

In [None]:
# import libraries

import tensorflow as tf
import numpy as np
import random

In [None]:
# Candidate values for every hyper-parameter explored by the swarm.
# The first three keys describe the architecture (layer counts); the remaining
# keys describe individual layers.
search_space = {
    'nC': [2, 4, 6],        # number of convolution layers
    'nP': [2, 4, 6],        # number of max-pooling layers
    'nF': [2, 4, 6],        # number of fully connected layers
    'c_nf': [8, 16, 32],    # convolution: number of filters
    'c_fs': [3, 5, 7],      # convolution: filter size
    'c_pp': [1, 2, 3],      # convolution: presumably padding -- TODO confirm
    'c_ss': [1, 2, 3],      # convolution: presumably stride -- TODO confirm
    'p_fs': [2, 3, 4],      # pooling: window size
    'p_ss': [1, 2, 3],      # pooling: presumably stride -- TODO confirm
    'p_pp': [1, 2, 3],      # pooling: presumably padding -- TODO confirm
    'op': [64, 128, 256],   # fully connected: number of units
}

In [None]:
class MPSOCNN:
    """Two-level particle swarm search over CNN hyper-parameters for MNIST.

    Level 1 searches the architecture: the number of convolution ('nC'),
    pooling ('nP') and fully connected ('nF') layers. Every level-1 particle
    owns one level-2 swarm that searches the layer hyper-parameters used with
    that architecture. Both levels are advanced together once per iteration.

    Swarm layout (as returned by ``initialize_swarm``)::

        swarm[0]      -> the level-1 particles (dicts with LEVEL1_KEYS)
        swarm[1 + i]  -> the level-2 swarm of level-1 particle i
                         (dicts with LEVEL2_KEYS)

    Each ``search_space`` entry is treated as an integer range
    ``[min(values), max(values)]``: particles start on one of the listed
    values and may later move to any integer inside that range.

    After ``optimize()`` the best complete configuration seen (architecture
    merged with layer hyper-parameters) is available as ``best_particle`` and
    its validation accuracy as ``best_fitness``.
    """

    # Hyper-parameters handled by the level-1 and level-2 swarms respectively.
    LEVEL1_KEYS = ('nC', 'nP', 'nF')
    LEVEL2_KEYS = ('c_nf', 'c_fs', 'c_pp', 'c_ss', 'p_fs', 'p_ss', 'p_pp', 'op')

    def __init__(self, search_space, n_particles=5, n_levels=2,
                 n_iterations=10, learning_rate=0.001):
        """Store the search configuration.

        Args:
            search_space: dict mapping every key in LEVEL1_KEYS and
                LEVEL2_KEYS to a list of candidate integer values.
            n_particles: particles per swarm (level 1 and each level-2 swarm).
            n_levels: stored for interface compatibility; the search is
                always two-level.
            n_iterations: number of PSO iterations run by ``optimize``.
            learning_rate: learning rate of the Adam optimiser used to train
                every candidate CNN. It is deliberately not used as a PSO
                step scale: positions are rounded to integers, so scaling
                steps by 0.001 would freeze the swarm.
        """
        self.search_space = search_space
        self.n_particles = n_particles
        self.n_levels = n_levels
        self.n_iterations = n_iterations
        self.learning_rate = learning_rate
        self.inertia_weight = 0.9
        self.c1, self.c2 = 2, 2  # cognitive / social acceleration coefficients
        self.n_input = 28        # side length of the square input images
        self.n_classes = 10
        self.best_particle = None
        self.best_fitness = float('-inf')
        self._velocity = {}      # (swarm index, particle index) -> {key: velocity}
        self._data = None        # cached (x_train, y_train, x_test, y_test)

    def _random_particle(self, keys):
        """Return a particle with one randomly chosen candidate value per key."""
        return {key: random.choice(self.search_space[key]) for key in keys}

    def initialize_swarm(self):
        """Create the level-1 swarm followed by one level-2 swarm per level-1 particle.

        Returns:
            A list of ``1 + n_particles`` swarms, each a list of
            ``n_particles`` dict particles (see the class docstring).
        """
        level1 = [self._random_particle(self.LEVEL1_KEYS)
                  for _ in range(self.n_particles)]
        level2 = [[self._random_particle(self.LEVEL2_KEYS)
                   for _ in range(self.n_particles)]
                  for _ in range(self.n_particles)]
        return [level1] + level2

    def evaluate_fitness(self, swarm):
        """Train one CNN per (architecture, layer hyper-parameter) pair.

        Args:
            swarm: swarm layout as returned by ``initialize_swarm``; every
                sub-swarm is expected to hold ``len(swarm[0])`` particles.

        Returns:
            Array of shape ``(len(swarm), len(swarm[0]))``. Row ``1 + i``
            holds the accuracy of each level-2 particle combined with level-1
            particle ``i``; row 0 holds, for each level-1 particle, the best
            accuracy reached by its level-2 swarm.
        """
        if not swarm:
            return np.zeros(shape=(0, 0))
        fitness = np.zeros(shape=(len(swarm), len(swarm[0])))
        for i, architecture in enumerate(swarm[0]):
            row = i + 1
            if row >= len(swarm):
                break  # no level-2 swarm attached to this architecture
            for j, layer_params in enumerate(swarm[row]):
                candidate = {**architecture, **layer_params}
                accuracy = self.train_and_evaluate(self.build_cnn(candidate))
                fitness[row][j] = accuracy
                if accuracy > self.best_fitness:
                    self.best_fitness = accuracy
                    self.best_particle = candidate
            if len(swarm[row]):
                # An architecture is as good as the best layer settings found for it.
                fitness[0][i] = fitness[row].max()
        return fitness

    def update_velocity_and_position(self, swarm, fitness, pbest, gbest):
        """Apply one PSO step to every particle, in place.

        Args:
            swarm: list of swarms of dict particles (modified in place).
            fitness: unused; kept for interface compatibility.
            pbest: personal best positions, same layout as ``swarm``.
            gbest: one best-position dict per swarm (``gbest[i]`` guides
                ``swarm[i]``).

        Returns:
            The same ``swarm`` object, with updated integer positions clipped
            to the range spanned by each key's candidate values.
        """
        velocities = getattr(self, '_velocity', None)
        if velocities is None:
            velocities = self._velocity = {}
        for i, sub_swarm in enumerate(swarm):
            for j, particle in enumerate(sub_swarm):
                velocity = velocities.setdefault((i, j), {})
                for key in particle:
                    choices = self.search_space[key]
                    low, high = min(choices), max(choices)
                    span = high - low
                    r1, r2 = random.uniform(0, 1), random.uniform(0, 1)
                    cognitive = self.c1 * r1 * (pbest[i][j][key] - particle[key])
                    social = self.c2 * r2 * (gbest[i][key] - particle[key])
                    step = self.inertia_weight * velocity.get(key, 0.0) + cognitive + social
                    # Clamp the velocity so one step never exceeds the search range.
                    step = max(-span, min(span, step))
                    velocity[key] = step
                    # Layer counts, sizes and unit counts must stay integers.
                    particle[key] = int(round(max(low, min(high, particle[key] + step))))
        return swarm

    def optimize(self):
        """Run the two-level PSO for ``n_iterations`` iterations.

        Returns:
            ``(pbest_l1, gbest)`` where ``pbest_l1`` lists the personal best
            position of every level-1 particle and ``gbest`` holds one best
            position per swarm (index 0: best architecture; index ``1 + i``:
            best layer hyper-parameters found in the level-2 swarm of
            level-1 particle ``i``).
        """
        swarm = self.initialize_swarm()
        self._velocity = {}
        self.best_particle, self.best_fitness = None, float('-inf')

        # Independent copies: the swarm is updated in place, the bests must not follow it.
        pbest = [[dict(particle) for particle in sub] for sub in swarm]
        pbest_fitness = [[float('-inf')] * len(sub) for sub in swarm]
        gbest = [dict(sub[0]) if sub else {} for sub in swarm]
        gbest_fitness = [float('-inf')] * len(swarm)

        for _ in range(self.n_iterations):
            fitness = self.evaluate_fitness(swarm)
            for i, sub in enumerate(swarm):
                for j, particle in enumerate(sub):
                    score = fitness[i][j]
                    if score > pbest_fitness[i][j]:
                        pbest_fitness[i][j] = score
                        pbest[i][j] = dict(particle)
                    if score > gbest_fitness[i]:
                        gbest_fitness[i] = score
                        gbest[i] = dict(particle)
            swarm = self.update_velocity_and_position(swarm, fitness, pbest, gbest)
        return pbest[0], gbest

    def _load_data(self):
        """Load and preprocess MNIST once; later calls reuse the cached arrays."""
        if getattr(self, '_data', None) is None:
            (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
            x_train = (x_train / 255.0).astype('float32')[..., np.newaxis]
            x_test = (x_test / 255.0).astype('float32')[..., np.newaxis]
            y_train = tf.keras.utils.to_categorical(y_train, self.n_classes)
            y_test = tf.keras.utils.to_categorical(y_test, self.n_classes)
            self._data = (x_train, y_train, x_test, y_test)
        return self._data

    def train_and_evaluate(self, cnn, epochs=8):
        """Build, train and score the CNN described by ``cnn``.

        Args:
            cnn: specification produced by ``build_cnn`` (layer counts plus
                per-layer keys such as ``'c_nf_1'``).
            epochs: number of training epochs per candidate.

        Returns:
            Validation accuracy after the last epoch.

        Only the filter count/size, the pooling window and the dense unit
        counts are applied; the ``c_pp``/``c_ss``/``p_ss``/``p_pp`` entries
        are carried in the specification but not used by the layers.
        'same' padding keeps every candidate buildable: stacked 'valid'
        convolutions and pools can shrink a 28x28 image below one pixel.

        NOTE(review): fitness is measured on the MNIST test split, so the
        selected configuration is tuned on test data.
        """
        # Drop the graph state of previously trained candidates.
        tf.keras.backend.clear_session()

        layers = [tf.keras.layers.Input(shape=(self.n_input, self.n_input, 1))]
        for index in range(1, cnn['nC'] + 1):
            layers.append(tf.keras.layers.Conv2D(cnn[f'c_nf_{index}'],
                                                 cnn[f'c_fs_{index}'],
                                                 padding='same',
                                                 activation='relu'))
        for index in range(1, cnn['nP'] + 1):
            window = cnn[f'p_fs_{index}']
            layers.append(tf.keras.layers.MaxPooling2D(pool_size=window,
                                                       strides=window,
                                                       padding='same'))
        layers.append(tf.keras.layers.Flatten())
        for index in range(1, cnn['nF'] + 1):
            layers.append(tf.keras.layers.Dense(cnn[f'op_{index}'], activation='relu'))
        layers.append(tf.keras.layers.Dense(self.n_classes, activation='softmax'))

        model = tf.keras.Sequential(layers)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
                      loss='categorical_crossentropy', metrics=['accuracy'])

        x_train, y_train, x_test, y_test = self._load_data()
        history = model.fit(x_train, y_train,
                            validation_data=(x_test, y_test), epochs=epochs)
        return history.history['val_accuracy'][-1]

    def build_cnn(self, particle):
        """Expand a particle into a per-layer CNN specification.

        Args:
            particle: dict with 'nC', 'nP', 'nF' and, for every layer
                hyper-parameter, either a per-layer key (e.g. ``'c_nf_2'``)
                or a shared key (``'c_nf'``) applied to all layers of that
                type. A per-layer key takes precedence.

        Returns:
            Dict with the three layer counts plus one ``'<name>_<layer>'``
            entry per layer and hyper-parameter (layers numbered from 1).

        Raises:
            KeyError: if neither the per-layer nor the shared key is present.
        """
        cnn = {name: particle[name] for name in self.LEVEL1_KEYS}
        groups = (('nC', ('c_nf', 'c_fs', 'c_pp', 'c_ss')),
                  ('nP', ('p_fs', 'p_ss', 'p_pp')),
                  ('nF', ('op',)))
        for count_key, names in groups:
            for layer in range(1, cnn[count_key] + 1):
                for name in names:
                    layer_key = f'{name}_{layer}'
                    cnn[layer_key] = (particle[layer_key] if layer_key in particle
                                      else particle[name])
        return cnn

In [None]:
# Seed every random source used by the search (swarm initialisation, PSO
# coefficients, weight initialisation) so a rerun is as repeatable as the
# TensorFlow backend allows.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Run the swarm search; optimize() returns the level-1 personal bests and the
# per-swarm global bests.
mpso_cnn = MPSOCNN(search_space, n_particles=5, n_levels=2, n_iterations=10, learning_rate=0.001)
best_pbest_l1, best_gbest = mpso_cnn.optimize()