In [None]:
%load_ext nb_black
%load_ext lab_black

This is my project for computational intelligence based on kaggle dataset of monkeys.

Import packages

In [None]:
import os
from pathlib import Path

import cv2
import keras
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras.utils import np_utils
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras.callbacks import ReduceLROnPlateau
from tensorflow.python.keras.layers import (
    Conv2D,
    Activation,
    MaxPooling2D,
    Dropout,
    Flatten,
    Dense,
)
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator

%matplotlib inline

Set data folders

In [None]:
data_folder = "data"
training_directory = os.path.join(data_folder, "training", "training")
test_directory = os.path.join(data_folder, "validation", "validation")
labels_file = os.path.join(data_folder, "monkey_labels.txt")

Read labels

In [None]:
labels_df = pd.read_csv(labels_file)
labels_df = labels_df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
labels_df.columns = labels_df.columns.str.strip()
labels = labels_df["Common Name"]

Read images from files

In [None]:
def convert_image_to_vector(input_image, size=(32, 32)):
    resized_image = cv2.resize(input_image, size)
    return resized_image


def convert_image_to_vector_rgb(input_image, size=(32, 32)):
    resized_image = cv2.resize(input_image, size)
    img_rgb = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
    return img_rgb


def convert_image_to_vector_cubic(input_image, size=(32, 32)):
    resized_image = cv2.resize(input_image, size, interpolation=cv2.INTER_CUBIC)
    return resized_image


def convert_image_to_vector_both(input_image, size=(32, 32)):
    resized_image = cv2.resize(input_image, size, interpolation=cv2.INTER_CUBIC)
    img_rgb_cubic = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
    return img_rgb_cubic


def normalize(input_image):
    mean, std = input_image.mean(), input_image.std()
    input_image = (input_image - mean) / std
    return input_image


def process_image(file):
    image_file = cv2.imread(file)
    image_pixels = convert_image_to_vector_both(image_file, size=(32, 32))
    image_pixels = normalize(image_pixels)
    image_label = file.split(os.path.sep)[-2]
    return image_pixels, image_label

In [None]:
training_images = []
training_images_knn = []
training_labels = []

for path in Path(training_directory).rglob("*.jpg"):
     image, label = process_image(str(path))
     training_images.append(image)
     training_images_knn.append(image.flatten())

     training_labels.append(label)

test_images = []
test_images_knn = []
test_labels = []

for path in Path(test_directory).rglob("*.jpg"):
     image, label = process_image(str(path))
     test_images.append(image)
     test_labels.append(label)
     test_images_knn.append(image.flatten())


In [None]:
df = pd.DataFrame()
df["labels"]=training_labels
lab = df["labels"]
counts = lab.value_counts()
sns.countplot(counts)
print(labels)

In [None]:
test_images = np.array(test_images)
test_labels = np.array(test_labels)
test_images_knn = np.array(test_images_knn)

training_images = np.array(training_images)
training_labels = np.array(training_labels)
training_images_knn = np.array(training_images_knn)

num_classes = len(np.unique(training_labels))
label_encoder = LabelEncoder()
training_labels = label_encoder.fit_transform(training_labels)
test_labels = label_encoder.fit_transform(test_labels)
test_labels = np_utils.to_categorical(test_labels, num_classes)
training_labels = np_utils.to_categorical(training_labels, num_classes)

In [None]:
model = KNeighborsClassifier(n_neighbors=7)
model.fit(training_images_knn, training_labels)
acc = model.score(test_images_knn, test_labels)
print("[INFO] histogram accuracy: {:.2f}%".format(acc * 100))


Prepare data

In [None]:
# Only needed when flattening the image before
# training_images = training_images.reshape(-1, 32, 32, 3)
# test_images = test_images.reshape(-1, 32, 32, 3)
# normalizing the data to help with the training

In [None]:
model = Sequential()
model.add(Conv2D(32, (3, 3), input_shape=(32, 32, 3)))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(32, (3, 3)))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(64, (3, 3), padding="same"))
model.add(Activation("relu"))
model.add(Conv2D(64, (3, 3)))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(512))
model.add(Activation("relu"))
model.add(Dropout(0.5))
model.add(Dense(10))
model.add(Activation("softmax"))

model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"]
)

# model.compile(
#     loss="categorical_crossentropy",
#     optimizer="rmsprop",
#     metrics=["accuracy"]
# )
model.summary()


In [None]:
reduce_learning_rate = ReduceLROnPlateau(monitor="loss",
                                         factor=0.1,
                                         patience=2,
                                         cooldown=1,
                                         min_lr=0.00001,
                                         verbose=1)

checkpoint_filepath = "checkpoint.h5"
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor="accuracy",
    mode="max",
    save_best_only=True)

# model.fit(
#     training_images,
#     training_labels,
#     epochs=50,
#     callbacks=[reduce_learning_rate],
#     steps_per_epoch= training_images.size
# )
model.fit(training_images, training_labels, epochs=30, callbacks=[model_checkpoint_callback, ])
model.load_weights(checkpoint_filepath)

Test accuracy

In [None]:
test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)
print("\nTest accuracy:", test_acc)

Test on one image

In [None]:
n = 200
test_image = test_images[n]
test_image = np.expand_dims(test_image, axis=0)
prediction = model.predict(test_image, batch_size=1)
print(labels[np.argmax(prediction)])
print(labels)

In [None]:

names = [
    "Alouatta Palliata - Mantled Howler",
    "Trythrocebus Patas - Patas Monkey",
    "Cacajao Calvus - Bald Uakari",
    "Macaca Fuscata - Japanese Macaque",
    "Cebuella Pygmea - Pygmy Marmoset",
    "Cebus Capucinus - White Headed Capuchin",
    "Mico Argentatus - Silvery Marmoset",
    "Saimiri Sciureus - Common Squirrel Monkey",
    "Aotus Nigriceps - Black Headed Night Monkey",
    "Trachypithecus Johnii - Nilgiri Langur"
]


IMG_SIZE = 32
size = (IMG_SIZE,IMG_SIZE)
n_CLASS = 10

datagen = ImageDataGenerator(
    preprocessing_function = tf.keras.applications.efficientnet.preprocess_input,
    rotation_range = 40,
    width_shift_range = 0.2,
    height_shift_range = 0.2,
    shear_range = 0.2,
    zoom_range = 0.2,
    horizontal_flip = True,
    vertical_flip = True,
    fill_mode = "nearest",
    validation_split=0.2
)

train_set = datagen.flow_from_directory(
    training_directory,
    target_size = size,
    batch_size=32,
    seed = 42,
    subset="training",
    shuffle = True,
    class_mode="categorical"
)

val_set = datagen.flow_from_directory(
    training_directory,
    target_size = size,
    batch_size=32,
    seed = 42,
    subset="validation",
    shuffle = True,
    class_mode="categorical"
)


test_set = datagen.flow_from_directory(
    test_directory,
    target_size = size,
    batch_size = 32,
    seed = 42,
    class_mode = "categorical",
    shuffle = False
)

from keras.layers import GlobalAveragePooling2D
from keras.optimizers import  Adam
from keras.callbacks import  ReduceLROnPlateau
from tensorflow.keras.applications import EfficientNetB3


In [None]:
def create_model():
    model = Sequential()
    model.add(EfficientNetB3(input_shape = (IMG_SIZE, IMG_SIZE, 3), include_top = False, weights = "imagenet"))
    model.add(GlobalAveragePooling2D())
    model.add(Flatten())
    model.add(Dense(512, activation = "relu", bias_regularizer=tf.keras.regularizers.L1L2(l1=0.01, l2=0.001)))
    model.add(Dropout(0.7))
    model.add(Dense(n_CLASS, activation = "softmax"))

    return model

monkey_model = create_model()
monkey_model.summary()

In [None]:
keras.utils.plot_model(monkey_model)

In [None]:
EPOCHS = 50
STEP_SIZE_TRAIN = train_set.n//train_set.batch_size
STEP_SIZE_VALID = val_set.n//val_set.batch_size


In [None]:
def Model_fit():

    monkey_model = create_model()

    """Compiling the model"""

    monkey_model.compile(
        optimizer = Adam(learning_rate = 1e-3),
                        loss ="categorical_crossentropy",
                        metrics = ["acc"])
    es = EarlyStopping(monitor="val_loss", mode="min", patience=5,
                       restore_best_weights=True, verbose=1)

    checkpoint_cb = ModelCheckpoint("Cassava_best_model.h5",
                                    save_best_only=True,
                                    monitor = "val_loss",
                                    mode="min")

    reduce_lr = ReduceLROnPlateau(monitor = "val_loss",
                                  factor = 0.3,
                                  patience = 3,
                                  min_lr = 1e-5,
                                  mode = "min",
                                  verbose = 1)

    history = monkey_model.fit(train_set,
                             validation_data = val_set,
                             epochs= EPOCHS,
                             batch_size = 32,
                             steps_per_epoch = STEP_SIZE_TRAIN,
                             validation_steps = STEP_SIZE_VALID,
                             callbacks=[es, checkpoint_cb, reduce_lr])

    monkey_model.save("Cassava_model"+".h5")

    return history

results = Model_fit()

In [None]:
print("Train_Cat-Acc: ", max(results.history["acc"]))
print("Val_Cat-Acc: ", max(results.history["val_acc"]))

In [None]:
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Activation, Dropout, Flatten, Dense
from keras.callbacks import ModelCheckpoint, EarlyStopping

train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=40,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode="nearest")

train_generator = train_datagen.flow_from_directory(training_directory,
                                                    target_size=(32,32),
                                                    batch_size= 64,
                                                    seed=1,
                                                    shuffle=True,
                                                    class_mode="categorical")

# Test generator
test_datagen = ImageDataGenerator(rescale=1./255)
validation_generator = test_datagen.flow_from_directory(test_directory,
                                                  target_size=(32,32),
                                                  batch_size=64,
                                                  seed=1,
                                                  shuffle=False,
                                                  class_mode="categorical")

train_num = train_generator.samples
validation_num = validation_generator.samples

In [None]:
num_classes = 10

monkey_model = Sequential()
monkey_model.add(Conv2D(32,(3,3), input_shape=(32,32,3), activation="relu"))
monkey_model.add(MaxPooling2D(pool_size=(2,2)))

monkey_model.add(Conv2D(32,(3,3), activation="relu"))
monkey_model.add(MaxPooling2D(pool_size=(2,2)))

monkey_model.add(Conv2D(64,(3,3), padding="same", activation="relu"))
monkey_model.add(Conv2D(64,(3,3), activation="relu"))
monkey_model.add(MaxPooling2D(pool_size=(2,2)))
monkey_model.add(Dropout(0.25))

monkey_model.add(Flatten())
monkey_model.add(Dense(512))
monkey_model.add(Activation("relu"))
monkey_model.add(Dropout(0.5))
monkey_model.add(Dense(num_classes))
monkey_model.add(Activation("softmax"))

In [None]:
monkey_model.compile(optimizer="adam",
                    loss="categorical_crossentropy",
                    metrics=["acc"])
monkey_model.summary()

In [None]:
filepath=str(os.getcwd()+"/model.h5f")
checkpoint = ModelCheckpoint(
    filepath,
    monitor="val_acc",
    verbose=1,
    save_best_only=True, mode="max"
)
callbacks_list = [checkpoint]
batch_size = 64

monkey_generator = monkey_model.fit(
    train_generator,
    steps_per_epoch = train_num // batch_size,
    epochs = 100,
    validation_data = train_generator,
    validation_steps = validation_num // batch_size,
    callbacks = callbacks_list,
    verbose = 1
)