In [None]:
# Mount Google Drive at /content/drive; the path constants defined later in this
# notebook all point below that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import glob
import os
import random

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab.patches import cv2_imshow
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import class_weight, shuffle
from tensorflow import keras
from tensorflow.keras import regularizers, optimizers
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization, GlobalAveragePooling2D, Conv2D, MaxPooling2D
from tensorflow.keras.models import Model
# ImageDataGenerator is exported from the `image` submodule, not from
# `tensorflow.keras.preprocessing` itself.
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from tqdm import tqdm
%matplotlib inline

In [None]:
# Path
# Every location is derived from a single Drive root so the mount point is
# spelled one way only (previously 'My Drive' for data and 'MyDrive' for output).
drive_root = '/content/drive/MyDrive'
project_path = os.path.join(drive_root, 'fyp')

data_path = os.path.join(project_path, 'data')
input_csv = os.path.join(data_path, 'classification_data/algae_classification.csv')
image_path = os.path.join(data_path, 'images')
test_data = os.path.join(data_path, 'test_data')
# The empty last component keeps the trailing separator, so code that appends a
# file name to output_path by string concatenation keeps working.
output_path = os.path.join(project_path, 'model', '')
model_path = os.path.join(output_path, 'algae_classification_mobilenetv2_fine_tuned_output')

# Train Model

In [None]:
# Load the labelled image index. Later cells read its 'filename' column (image
# file name) and its 'status' column (class label).
all_data = pd.read_csv(input_csv)

In [None]:
# 70 % of the rows become the training split; the held-out 30 % is then halved
# into validation and test. Fixed seeds make the partition repeatable.
train, testval = train_test_split(
    all_data,
    random_state=22,
    test_size=0.3,
)
val, test = train_test_split(
    testval,
    random_state=23,
    test_size=0.5,
)

In [None]:
# Drop the shuffled row labels inherited from all_data so that every split is
# indexed 0..n-1 again.
train, val, test = (
    split.reset_index(drop=True) for split in (train, val, test)
)

In [None]:
# One-hot encode the 'status' label of each split.
# The encoder is fitted on the training labels ONLY and then reused with
# transform(): this guarantees valY/testY have the same columns, in the same
# order, as trainY. Re-fitting on each split would silently produce a different
# layout whenever a split is missing a class. A label that never occurs in the
# training split now raises in transform() instead of being mis-encoded.
onehotencoder = OneHotEncoder()
trainY = np.array(train.status.tolist()).reshape(-1, 1)
trainY = onehotencoder.fit_transform(trainY).toarray()
valY = np.array(val.status.tolist()).reshape(-1, 1)
valY = onehotencoder.transform(valY).toarray()
testY = np.array(test.status.tolist()).reshape(-1, 1)
testY = onehotencoder.transform(testY).toarray()

In [None]:
# Data augmentation functions

def fill(img, h, w):
    """Resize ``img`` to height ``h`` and width ``w`` with bicubic interpolation.

    Args:
        img: Image array accepted by ``cv2.resize``.
        h: Target height in pixels.
        w: Target width in pixels.

    Returns:
        The resized image.
    """
    # cv2.resize expects dsize as (width, height), and its third positional
    # parameter is ``dst`` -- the interpolation flag has to be passed by keyword
    # or it is not used as the interpolation mode.
    return cv2.resize(img, (w, h), interpolation=cv2.INTER_CUBIC)

def horizontal_flip(img, flag):
    """Return ``img`` mirrored with ``cv2.flip(img, 1)`` when ``flag`` is truthy.

    When ``flag`` is falsy the input image object is returned unchanged.
    """
    return cv2.flip(img, 1) if flag else img

def random_rotate(img, min_angle, max_angle):
    """Rotate ``img`` by an angle drawn uniformly from [min_angle, max_angle].

    The angle is handed to ``ImageDataGenerator.apply_transform`` as ``theta``
    (degrees, per the Keras documentation).

    Returns:
        The transformed image as returned by ``apply_transform``.
    """
    theta = random.uniform(min_angle, max_angle)
    transform = {'theta': theta}
    augmenter = ImageDataGenerator()
    return augmenter.apply_transform(x=img, transform_parameters=transform)

In [None]:
def generator(data, Y, batch_size = 32, flip = False, rotation = False):
    """Yield ``(images, labels)`` batches forever, for Keras ``fit``/``predict``.

    Images are read from the module-level ``image_path`` directory and resized
    to 224x224. Each enabled augmentation appends one extra sample per source
    image, so a batch can hold up to ``3 * batch_size`` samples.

    Args:
        data: DataFrame with a ``filename`` column.
        Y: Labels, indexable by row position and aligned with ``data``.
        batch_size: Number of source rows consumed per yielded batch.
        flip: Also emit a horizontally flipped copy of every image.
        rotation: Also emit a copy rotated by a random angle in [-90, 90].

    Yields:
        Tuple ``(x_batch, y_batch)`` of NumPy arrays, shuffled in unison.

    Raises:
        ValueError: If ``data`` has no rows (the endless loop would otherwise
            spin without ever yielding).
        FileNotFoundError: If an image file cannot be read.
    """
    # Snapshot the column as a plain list so rows are addressed by position,
    # independent of whatever index labels the caller's DataFrame carries.
    filenames = data['filename'].tolist()
    if not filenames:
        raise ValueError("generator() needs at least one row in `data`")
    total = len(filenames)

    while True:
        for start in range(0, total, batch_size):
            x_batch = []
            y_batch = []
            end = min(start + batch_size, total)
            for i in range(start, end):
                img_file = os.path.join(image_path, filenames[i])
                img = cv2.imread(img_file)
                if img is None:
                    # cv2.imread reports failure by returning None, not by raising.
                    raise FileNotFoundError("Could not read image: " + img_file)
                img = cv2.resize(img, (224, 224))
                x_batch.append(img)
                y_batch.append(Y[i])
                if flip:
                    x_batch.append(horizontal_flip(img, True))
                    y_batch.append(Y[i])
                if rotation:
                    x_batch.append(random_rotate(img, -90.0, 90.0))
                    y_batch.append(Y[i])
            # Shuffle so originals and their augmented copies are interleaved.
            x_batch, y_batch = shuffle(x_batch, y_batch)
            yield np.array(x_batch), np.array(y_batch)

In [None]:
# Balanced class weights computed from the training labels, passed to
# model.fit() to counter class imbalance.
train_label_list = train["status"].tolist()
class_labels = np.unique(train_label_list)
# `classes` and `y` are keyword-only in current scikit-learn releases; passing
# them by keyword works on older releases as well.
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=class_labels,
    y=train_label_list,
)
# Keras expects {class index: weight}. np.unique returns the labels sorted, so
# position i in class_weights belongs to the i-th sorted label.
calculated_weights = dict(enumerate(class_weights))

In [None]:
# Transfer-learning model: ImageNet-pretrained MobileNetV2 backbone (without its
# classifier) followed by a new 2-class softmax head.
base_model = MobileNetV2(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)

# Stage 1 trains only the new head, so the backbone weights are frozen.
base_model.trainable = False

inputs = keras.Input(shape=(224, 224, 3))

# The data pipeline feeds raw 0-255 pixels, while the pretrained MobileNetV2
# weights expect inputs scaled to [-1, 1]. Doing the scaling inside the model
# keeps training, evaluation and inference on the saved model consistent.
x = keras.applications.mobilenet_v2.preprocess_input(inputs)
# training=False keeps the backbone's BatchNorm layers in inference mode, also
# later when the backbone is unfrozen for fine-tuning.
x = base_model(x, training=False)
x = GlobalAveragePooling2D()(x)
x = Dropout(0.2)(x)
outputs = Dense(2, activation='softmax')(x)
model = Model(inputs, outputs)

model.compile(
    optimizer=keras.optimizers.Adam(),
    # The head ends in softmax, i.e. it outputs probabilities, so the loss must
    # NOT treat them as logits (from_logits=True would apply softmax twice).
    loss=keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[keras.metrics.CategoricalAccuracy()],
)

model.summary()

In [None]:
# Stage 1: fit the model for 20 epochs of 10 batches each. Training batches add
# flipped and rotated copies; validation batches add rotated copies.
hist = model.fit(
    generator(train, trainY, 16, flip=True, rotation=True),
    validation_data=generator(val, valY, 16, rotation=True),
    class_weight=calculated_weights,
    epochs=20,
    steps_per_epoch=10,
    validation_steps=3,
).history

In [None]:
# Learning curves for the fit above. Keras records one value per epoch in the
# history dict, so the x-axis is labelled in epochs (not training steps).
fig, ax = plt.subplots()
ax.plot(hist["loss"], label='train')
ax.plot(hist["val_loss"], label='val')
ax.set(xlabel="Epoch", ylabel="Loss (training and validation)", ylim=(0, 2))
ax.legend()

fig, ax = plt.subplots()
ax.plot(hist["categorical_accuracy"], label='train')
ax.plot(hist["val_categorical_accuracy"], label='val')
ax.set(xlabel="Epoch", ylabel="Accuracy (training and validation)", ylim=(0, 1))
ax.legend();

In [None]:
# Evaluate on the test split. Batch size 1 with one step per row makes the
# generator emit every test image exactly once, in row order, so the
# predictions line up with testY.
pred = model.predict(generator(test, testY, 1), steps=len(test), verbose=1)
test_preds = pred.argmax(axis=1)
test_trues = testY.argmax(axis=-1)
print('Confusion matrix:\n', confusion_matrix(test_trues, test_preds))
print('F1 score:\n', classification_report(test_trues, test_preds))

In [None]:
# Stage 2 (fine-tuning): unfreeze the backbone so all of its weights are trained.
base_model.trainable = True
model.summary()

# Re-compile so the change of `trainable` is picked up by the training step.
model.compile(
    optimizer=keras.optimizers.Adam(1e-5),  # Very low learning rate
    # The model ends in softmax (probabilities), so the loss must not expect
    # logits; from_logits=True would apply softmax a second time.
    loss=keras.losses.CategoricalCrossentropy(from_logits=False),
    metrics=[keras.metrics.CategoricalAccuracy()],
)

In [None]:
# Stage 2 fit: 10 epochs of 10 batches each.
# generator() only accepts `flip` and `rotation`; the shift/channel/zoom/
# brightness keywords it was called with here do not exist in its signature and
# raised TypeError. They were all False, so dropping them keeps the same
# augmentation (flip + rotation).
hist = model.fit(
    x=generator(train, trainY, 16, flip=True, rotation=True),
    epochs=10,
    steps_per_epoch=10,
    class_weight=calculated_weights,
    validation_data=generator(val, valY, 16, rotation=True),
    validation_steps=3,
).history

In [None]:
# Learning curves for the fine-tuning fit. Keras records one value per epoch in
# the history dict, so the x-axis is labelled in epochs (not training steps).
fig, ax = plt.subplots()
ax.plot(hist["loss"], label='train')
ax.plot(hist["val_loss"], label='val')
ax.set(xlabel="Epoch", ylabel="Loss (training and validation)", ylim=(0, 2))
ax.legend()

fig, ax = plt.subplots()
ax.plot(hist["categorical_accuracy"], label='train')
ax.plot(hist["val_categorical_accuracy"], label='val')
ax.set(xlabel="Epoch", ylabel="Accuracy (training and validation)", ylim=(0, 1))
ax.legend();

In [None]:
# Re-evaluate on the test split after fine-tuning. Batch size 1 with one step
# per row makes the generator emit every test image exactly once, in row order,
# so the predictions line up with testY.
pred = model.predict(generator(test, testY, 1), steps=len(test), verbose=1)
test_preds = pred.argmax(axis=1)
test_trues = testY.argmax(axis=-1)
print('Confusion matrix:\n', confusion_matrix(test_trues, test_preds))
print('F1 score:\n', classification_report(test_trues, test_preds))

In [None]:
# Save model checkpoint
# Writes the current (fine-tuned) model to model_path on the mounted Drive.
model.save(model_path)

# Load saved model and test

In [None]:
# Rebind `model` to the copy read back from model_path, so the cells below
# exercise the saved artefact rather than the in-memory model.
model = tf.keras.models.load_model(model_path)

In [None]:
# Predict a class index for every readable image in the `test_data` directory.
# `test_data` must stay the directory path: the images are collected in their
# own list so the glob pattern is built from a string and the cell can be re-run.
test_files = []
test_images = []
for image_file in sorted(glob.glob(os.path.join(test_data, "*"))):
    img = cv2.imread(image_file)
    if img is None:
        # cv2.imread returns None for anything it cannot decode (sub-folders,
        # non-image files); skip those instead of crashing inside cv2.resize.
        print("Skipping unreadable file:", image_file)
        continue
    test_files.append(image_file)  # kept aligned with test_images / test_preds
    test_images.append(cv2.resize(img, (224, 224)))

if not test_images:
    raise FileNotFoundError("No readable images found in " + test_data)

# No `steps` argument: for an in-memory array Keras derives the number of
# batches itself; one step per image would ask for more batches than exist.
test_preds = model.predict(np.array(test_images))
test_preds = np.argmax(test_preds, axis=1)