In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import os
np.random.seed(1234)

from keras.callbacks import EarlyStopping, ModelCheckpoint
# from keras.utils.vis_utils import plot_model
# from scikeras.wrappers import KerasClassifier, KerasRegressor
from sklearn.model_selection import RandomizedSearchCV,KFold

## File processing

In [None]:
directory = "../CNN/Dataset for CA1 part A/"

In [None]:
BATCH_SIZE = 32
IMG_SIZE = (224, 224)
# Load train data
train_dataset = tf.keras.utils.image_dataset_from_directory(directory=f'{directory}/train',
                                                            shuffle=True,
                                                            batch_size=BATCH_SIZE,
                                                            image_size=IMG_SIZE)

# Load validation data
validation_dataset = tf.keras.utils.image_dataset_from_directory(directory=f'{directory}/validation',
                                                                 shuffle=True,    
                                                                 batch_size=BATCH_SIZE,
                                                                 image_size=IMG_SIZE)

# Load test data
test_dataset = tf.keras.utils.image_dataset_from_directory(directory=f'{directory}/test',
                                                                 shuffle=True,    
                                                                 batch_size=BATCH_SIZE,
                                                                 image_size=IMG_SIZE)                                                                 

In [None]:
# List vegetable classes
class_names = train_dataset.class_names
class_names

In [None]:
# Print total no. of training batches
print('Number of training batches: %d' % tf.data.experimental.cardinality(train_dataset).numpy())

## EDA

In [None]:
# Split datasets 
X_train = []
y_train = []

for images, labels in train_dataset:
    X_train.extend(images.numpy())
    y_train.extend(labels.numpy())

X_train = np.array(X_train)
y_train = np.array(y_train)

X_test = []
y_test = []

for images, labels in test_dataset:
    X_test.extend(images.numpy())
    y_test.extend(labels.numpy())

X_test = np.array(X_test)
y_test = np.array(y_test)


In [None]:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(3, 5, figsize=(20, 10))

for idx, subplot in enumerate(ax.ravel()):
    avg_image = np.mean(X_train[np.squeeze(y_train == idx)], axis=0) / 255
    subplot.imshow(avg_image, cmap="Greys")
    subplot.set_title(f"{class_names[idx]}")
    subplot.axis("off")

In [None]:
# Find value counts
labels, counts = np.unique(y_train, return_counts=True)
for label, count in zip(labels, counts):
    print(f"{class_names[label]}: {count}")

In [None]:
# Plot value counts
plt.barh(labels, counts, tick_label=list(class_names))
plt.show()


Radishes, Potatoes, Carrots, Capsicums, Cabbages, Papayas and Bottle Gourds are especially undersampled, perform augmentation to get more of such images

In [None]:
# Map iamge to label
train_dict = {tf.Tensor.ref(img): label for img, label in train_dataset.unbatch()}

In [None]:
# Subset undersampled classes
radishes = [img for img, label in train_dict.items() if label == 13]
radish_labels = [label for img, label in train_dict.items() if label == 13]


potatoes = [img for img, label in train_dict.items() if label == 11]
potato_labels = [label for img, label in train_dict.items() if label == 11]
carrots = [img for img, label in train_dict.items() if label == 7]
carrot_labels = [label for img, label in train_dict.items() if label == 7]
capsicums = [img for img, label in train_dict.items() if label == 6]
capsicum_labels = [label for img, label in train_dict.items() if label == 6]
cabbages = [img for img, label in train_dict.items() if label == 5]
cabbage_labels = [label for img, label in train_dict.items() if label == 5]
papayas = [img for img, label in train_dict.items() if label == 10]
papaya_labels = [label for img, label in train_dict.items() if label == 10]
bottle_gourds = [img for img, label in train_dict.items() if label == 2]
bottle_gourd_labels = [label for img, label in train_dict.items() if label == 2]

In [None]:
radishes = [radish.deref() for radish in radishes]
potatoes = [potato.deref() for potato in potatoes]
carrots = [carrot.deref() for carrot in carrots]
capsicums = [capsicum.deref() for capsicum in capsicums]
cabbages = [cabbage.deref() for cabbage in cabbages]
papayas = [papaya.deref() for papaya in papayas]
bottle_gourds = [bottle_gourd.deref() for bottle_gourd in bottle_gourds]

In [None]:
undersampled_vegs = np.concatenate((radishes, potatoes, carrots, capsicums, cabbages, papayas, bottle_gourds), axis=0)
undersampled_labels = np.concatenate((radish_labels, potato_labels, carrot_labels, capsicum_labels, cabbage_labels, papaya_labels, bottle_gourd_labels))
def data_augmentation(data):
    imageArr = []
    for images in data:
        image = tf.image.random_flip_left_right(images)
        image = tf.image.random_crop(
            image, size=(224,224,3)
        )
        imageArr.append(tf.reshape(image, (224, 224, 3)))
    return np.array(imageArr)
veg_train_aug = np.copy(undersampled_vegs)
veg_train_aug = data_augmentation(veg_train_aug)

In [None]:
X_train_aug = np.concatenate((X_train , veg_train_aug), axis=0)
y_train_aug = np.concatenate((y_train, undersampled_labels), axis=0)

In [None]:
with tf.device('/device:CPU:0'):
    train_ds = tf.data.Dataset.from_tensor_slices((X_train_aug, y_train_aug))
    train_ds = train_ds.shuffle(buffer_size=len(X_train_aug))  # Shuffle the data
    train_ds = train_ds.batch(32)

In [None]:
grayscale_train_ds = train_ds.map(lambda x, y: (tf.image.rgb_to_grayscale(x), y))  
train_ds = grayscale_train_ds.map(lambda x, y: (tf.image.resize(x, [31, 31]), y))
grayscale_val_ds = validation_dataset.map(lambda x, y: (tf.image.rgb_to_grayscale(x), y))
grayscale_test_ds = test_dataset.map(lambda x, y: (tf.image.rgb_to_grayscale(x), y))

val_ds_31 = grayscale_val_ds.map(lambda x, y: (tf.image.resize(x, [31, 31]), y))
test_ds_31 = grayscale_test_ds.map(lambda x, y: (tf.image.resize(x, [31, 31]), y))

## Model Improvements

In [None]:
epochs = 50

Optimising batchsize

In [None]:
# Run parametric experiment to tune dropout percentage
def createAndTrain_model_BatchSize(batchsize):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (7, 7), input_shape=(31, 31, 1), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(3, 3)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dense(len(class_names), activation='softmax'))

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    history = model.fit(train_ds, 
                        validation_data=val_ds_31, 
                        epochs=epochs, 
                        batch_size=batchsize, 
                        shuffle=True,
                        callbacks=[EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)])

    # Evaluate model on unseen data
    scores = model.evaluate(test_ds_31)
    testError = 100-scores[1]*100

    return history, scores[1], testError

In [None]:
# range of batch sizes
batchsizes = 2**np.arange(4, 9)

# initialize output results matrices
accuracyResultsTrain = np.zeros((epochs, len(batchsizes)))
accuracyResultsVal = np.zeros((epochs, len(batchsizes)))
accuracyResultsTest  = np.zeros((epochs, len(batchsizes)))

# loop over batch sizes
for bi in range(len(batchsizes)):
    history, score, testError = createAndTrain_model_BatchSize(batchsizes[bi])

    # store results
    accuracyResultsTrain[:,bi] = history.history['accuracy']
    accuracyResultsVal[:,bi] = history.history['val_accuracy']
    accuracyResultsTest[:,bi] = score

In [None]:
# plot accuracy results
fig,ax = plt.subplots(1,3,figsize=(17,7))

ax[0].plot(accuracyResultsTrain)
ax[0].set_title('Train accuracy')
ax[1].plot(accuracyResultsVal)
ax[1].set_title('Validation accuracy')
ax[2].plot(accuracyResultsTest)
ax[2].set_title('Test accuracy')

# common features
for i in range(3):
  ax[i].legend(batchsizes)
  ax[i].set_xlabel('Epoch')
  ax[i].set_ylabel('Accuracy (%)')
  ax[i].grid()

plt.show()

Tuning dropout

In [None]:
# Run parametric experiment to tune dropout percentage
def createAndTrain_model_Dropout(dropout):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (7, 7), input_shape=(31, 31, 1), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(3, 3)))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dense(len(class_names), activation='softmax'))

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    history = model.fit(train_ds, 
                        validation_data=val_ds_31, 
                        epochs=epochs, 
                        batch_size=16,
                        shuffle=True,
                        callbacks=[EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)])

    # Evaluate model on unseen data
    scores = model.evaluate(test_ds_31)
    testError = 100-scores[1]*100

    return history, scores, testError

In [None]:
# specify number of hidden units
dropout_percentages = np.arange(0, 1, 0.1)

# initialize results matrix
results = np.zeros((len(dropout_percentages),4))

for i,npercent in enumerate(dropout_percentages):
  history, scores, testError = createAndTrain_model_Dropout(int(npercent))
  results[i,:] = [history.history['accuracy'][-1], history.history['val_accuracy'][-1], scores[1], testError ]


In [None]:
# Plot parametric experiment results
fig,ax = plt.subplots(1,2,figsize=(16,5))

ax[0].plot(dropout_percentages,results[:,:2],'s-')
ax[0].set_xlabel('Dropout percentage')
ax[0].set_ylabel('Accuracy (%)')
ax[0].set_title('Final model accuracy')
ax[0].legend(['Train','Val'])

ax[1].plot(dropout_percentages,results[:,2],'s-')
ax[1].set_xlabel('Dropout percentage')
ax[1].set_ylabel('Accuracy (%)')
ax[1].set_title('Final model scoring metrics')
ax[1].legend(['Test_Accuracy'])
# ax[1].set_ylim([0.87, 0.9])

plt.show()

Dropout of 0.3 seems to work best on the dataset

Tuning number of hidden units

In [None]:
# Run parametric experiment to tune number of hidden units on Dense layer
def createAndTrain_model_NumUnits(numUnits):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (7, 7), input_shape=(31, 31, 1), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(3, 3)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(numUnits, activation='relu'))
    model.add(tf.keras.layers.Dense(len(class_names), activation='softmax'))

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    history = model.fit(train_ds, 
                        validation_data=val_ds_31, 
                        epochs=epochs, 
                        batch_size=16, 
                        shuffle=True,
                        callbacks=[EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)])

    # Evaluate model on unseen data
    scores = model.evaluate(test_ds_31)
    testError = 100-scores[1]*100

    return history, scores, testError

In [None]:
# specify number of hidden units
numberOfLinearUnits = np.round(np.linspace(5,500,20))

# initialize results matrix
results = np.zeros((len(numberOfLinearUnits),4))

for i,nunits in enumerate(numberOfLinearUnits):
  history, scores, testError = createAndTrain_model_NumUnits(int(nunits))
  results[i,:] = [ history.history['accuracy'][-1], history.history['val_accuracy'][-1], scores[1], testError ]

In [None]:
# Plot parametric experiment results
fig,ax = plt.subplots(1,2,figsize=(16,5))

ax[0].plot(numberOfLinearUnits,results[:,:2],'s-')
ax[0].set_xlabel('Number of units in final linear layer')
ax[0].set_ylabel('Accuracy (%)')
ax[0].set_title('Final model accuracy')
ax[0].legend(['Train','Val'])

ax[1].plot(numberOfLinearUnits,results[:,2],'s-')
ax[1].set_xlabel('Number of units in final linear layer')
ax[1].set_ylabel('Accuracy (%)')
ax[1].set_title('Final model scoring metrics')
ax[1].legend(['Test_Accuracy'])
# ax[1].set_ylim([0.87, 0.9])

plt.show()

In [None]:
numberOfLinearUnits

Seems like 396 units in the hidden layer yields the greatest test accuracy (of around 87%)

But, after playing around a little further with other hyperparameters, we get an even higher test accuracy of:

In [None]:
monitor_val_acc = EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True)
model_checkpoint_31 = ModelCheckpoint('best_31_CNN_version2.h5', save_best_only = True)

def make_finalModel():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (7, 7), input_shape=(31, 31, 1), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.1))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(3, 3)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.1))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.1))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Conv2D(64, (5, 5), activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.1))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(500, activation='relu'))
    model.add(tf.keras.layers.Dense(len(class_names), activation='softmax'))

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

model = make_finalModel()
history = model.fit(train_ds, 
                    validation_data=val_ds_31, 
                    epochs=epochs, 
                    batch_size=16, 
                    shuffle=True, 
                    callbacks=[monitor_val_acc, model_checkpoint_31])
# Evaluate model on unseen data
scores = model.evaluate(test_ds_31)
testError = 100-scores[1]*100
print(f'Model test accuracy on 31x31 images: {round(scores[1]*100,2)}%\nModel test error on 31x31 images: {round(testError,2)}%')

In [None]:
final_model = make_finalModel()
final_model.load_weights('best_31_CNN_version2.h5')
# Re-evaluate the model
loss, acc = final_model.evaluate(test_ds_31, verbose=2)
print("Restored model, accuracy: {:5.2f}%".format(100 * acc))

In [None]:
from sklearn.metrics import classification_report
y_pred = final_model.predict(X_test)

report = classification_report(
    np.argmax(tf.keras.utils.to_categorical(y_test), axis=1), np.argmax(y_pred, axis=1), target_names=list(class_names)
)

In [None]:
print(report)

In [None]:
import seaborn as sns
plt.figure(1, figsize=(10, 10))
plt.title("Confusion Matrix")
sns.heatmap(tf.math.confusion_matrix(
    np.argmax(tf.keras.utils.to_categorical(y_test), axis=1),
    np.argmax(y_pred, axis=1),
    num_classes=15,
    dtype=tf.dtypes.int32,
    name=None
), annot=True, fmt="", cbar=False, cmap="YlOrRd", yticklabels=list(class_names), xticklabels=list(class_names))
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.show()

In [None]:
# summarize filter shapes
for layer in final_model.layers:
    # check for convolutional layer
    if 'conv' not in layer.name:
        continue
    # get filter weights
    filters, biases = layer.get_weights()
    print(layer.name, filters.shape)

In [None]:
# plot first few filters
n_filters, ix = 2, 1
plt.suptitle(f'Feature map examples from convolution layers')
for i in range(n_filters):
    # get the filter
    f = filters[:, :, :, i]
    # plot each channel separately
    for j in range(3):
        # specify subplot and turn of axis
        ax = plt.subplot(n_filters, 3, ix)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_title(f'Kernel {j+1}, Conv {i+1}')
        # plot filter channel in grayscale
        plt.imshow(f[:, :, j], cmap='gray')
        ix += 1
    # show the figure
    plt.show()