# Libraries

In [1]:
! pip install imutils

import tensorflow as tf
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import matplotlib.pyplot as plt
import os
import cv2
import imutils
import pathlib
import time
import PIL as pil
import shutil

IMAGE_SIZE = (256, 256)
BASE_LR = 1e-3
EPOCH = 150



# Preprocessing datasets

for this dataset
https://www.kaggle.com/datasets/masoudnickparvar/brain-tumor-mri-dataset

In [2]:
def crop_img(img, image_size=(256, 256)):
    """
    Finds the extreme points on the image and crops the rectangular out of them
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    gray = cv2.GaussianBlur(gray, (3, 3), 0)

    # threshold the image, then perform a series of erosions +
    # dilations to remove any small regions of noise
    thresh = cv2.threshold(gray, 45, 255, cv2.THRESH_BINARY)[1]
    thresh = cv2.erode(thresh, None, iterations=2)
    thresh = cv2.dilate(thresh, None, iterations=2)

    # find contours in thresholded image, then grab the largest one
    cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = imutils.grab_contours(cnts)
    c = max(cnts, key=cv2.contourArea)

    # find the extreme points
    extLeft = tuple(c[c[:, :, 0].argmin()][0])
    extRight = tuple(c[c[:, :, 0].argmax()][0])
    extTop = tuple(c[c[:, :, 1].argmin()][0])
    extBot = tuple(c[c[:, :, 1].argmax()][0])
    ADD_PIXELS = 0
    new_img = img[extTop[1]-ADD_PIXELS:extBot[1]+ADD_PIXELS, extLeft[0]-ADD_PIXELS:extRight[0]+ADD_PIXELS].copy()
    
    # resize image
    new_img = cv2.resize(new_img, image_size)
    
    return new_img

In [3]:
def image_preprocessing(source_dir, saved_root_dir, image_size=(256,256), channels=3):
    if not os.path.exists(source_dir):
        raise Exception(f"source directory: {source_dir} does not exists")
    else:
        if not os.path.isdir(source_dir):
            raise Exception(f"source directory: {source_dir} is not a directory")
            
    if not os.path.exists(saved_root_dir):
        os.makedirs(saved_root_dir)
        
    source_dir_path = pathlib.Path(source_dir)
    
    # transform and save
    for p in source_dir_path.iterdir():
        dir_name = str(p).split("/")[-1]
        for fp in p.iterdir():
            filename = str(fp).split("/")[-1]
            img = tf.io.read_file(str(fp))
            img = tf.image.decode_jpeg(img, channels=channels)
            img = crop_img(img.numpy(), image_size)
            img = pil.Image.fromarray(img)
            saved_dist_dir = os.path.join(saved_root_dir, dir_name)
            if not os.path.exists(saved_dist_dir):
                os.makedirs(saved_dist_dir)
            img_dist_path = saved_dist_dir+"/"+f"{filename}"
            img.save(img_dist_path)

In [5]:
image_preprocessing("/input/Training",
                   "/working/Training",
                   image_size=IMAGE_SIZE)

image_preprocessing("/input/Testing",
                   "/working/Testing",
                   image_size=IMAGE_SIZE)

Exception: source directory: /input/Training does not exists

# Datasets

In [None]:
root_dir_path = "/kaggle/working/brain-tumor-mri-dataset"

In [None]:
train_ds = tf.keras.utils.image_dataset_from_directory(os.path.join(root_dir_path, "Training"),
                                                               label_mode="categorical",
                                                                batch_size=16,
                                                                image_size=IMAGE_SIZE,
                                                                seed=42)

test_ds, val_ds = tf.keras.utils.image_dataset_from_directory(os.path.join(root_dir_path, "Testing"),
                                                               label_mode="categorical",
                                                                image_size=IMAGE_SIZE,
                                                                seed=42,
                                                                validation_split=0.2,
                                                                subset="both")

In [None]:
print(train_ds.class_names)
print(test_ds.class_names)

cls_to_id = {c:i for i, c in enumerate(train_ds.class_names)}
print(cls_to_id)
id_to_cls = {i:c for i, c in enumerate(train_ds.class_names)}
print(id_to_cls)

In [None]:
with open("class_to_id.txt", "w") as f:
    for k, v in cls_to_id.items():
        f.write(f"{k}\t{v}\n")

with open("id_to_class.txt", "w") as f:
    for k, v in id_to_cls.items():
        f.write(f"{k}\t{v}\n")

In [None]:
with open("class_to_id.txt", "r") as f:
    for line in f.readlines():
        cls, label = line.replace("\n","").split("\t")
        print(cls, int(label))
print("\n")
with open("id_to_class.txt", "r") as f:
    for line in f.readlines():
        label, cls = line.replace("\n","").split("\t")
        print(int(label), cls)

In [None]:
def class_weight_from_one_hot(ds):
    class_labels = []
    if ds.__class__.__name__ == "_BatchDataset":
        ds = ds.unbatch()
    
    for _, onehot in ds:
        class_labels.append(tf.argmax(onehot).numpy())
    
    unique_classes = np.unique(class_labels)
    class_weights = compute_class_weight(class_weight="balanced", 
                                         classes=unique_classes,
                                         y=class_labels)
    return {i:w for i, w in enumerate(class_weights)}

    
class_weights = class_weight_from_one_hot(train_ds)
print(class_weights)

In [None]:
for images, labels in train_ds.take(1):
    image, label = images[0], labels[0]
    plt.figure()
    plt.imshow(tf.cast(image, tf.uint8))
    plt.title(train_ds.class_names[tf.argmax(label).numpy()])
    plt.show()

# Model 

EfficientNetV2

In [None]:
def create_model():
    input_shape = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3)
    input = tf.keras.layers.Input(shape=input_shape, name="input")
    x = tf.keras.layers.RandomFlip("horizontal_and_vertical")(input)
    x = tf.keras.layers.RandomBrightness(0.2)(x)
    x = tf.keras.layers.RandomZoom(0.2, 0.2)(x)
    x = tf.keras.layers.RandomTranslation(0.2, 0.2)(x)
    x = tf.keras.layers.RandomRotation(0.2)(x)
    x = tf.keras.applications.efficientnet_v2.EfficientNetV2B0(input_shape=input_shape,
                                                               include_top=False)(x)
    x = tf.keras.layers.GlobalMaxPooling2D(name="max_pooling")(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    output = tf.keras.layers.Dense(4, "softmax", name="output")(x)

    return tf.keras.Model(input, output)

In [None]:
model = create_model()
model.summary(show_trainable=True)

In [None]:
pretrain_model_layer = 6

# freeze pretrain model
model.layers[pretrain_model_layer].trainable = False
model.summary(show_trainable=True)

# Training

In [None]:
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
             optimizer=tf.keras.optimizers.Adam(BASE_LR),
             metrics=["categorical_accuracy"])

In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

cbs = [
    ReduceLROnPlateau(patience=3, min_lr=1e-6),
    EarlyStopping(patience=8, restore_best_weights=True)
]

history = model.fit(train_ds,
                   epochs=EPOCH,
                   validation_data=val_ds,
                   class_weight=class_weights,
                   callbacks=cbs)

# Fine-Tune

In [None]:
# unfreeze top layers for training on pretrain model
fine_tune_at = 100
pretrain_model = model.layers[pretrain_model_layer]
pretrain_model.trainable = True
for layer in pretrain_model.layers[:fine_tune_at]:
    layer.trainable = False
for layer in pretrain_model.layers:    
    print(layer.name, layer.output.shape, layer.trainable)

In [None]:
model.compile(loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
             optimizer=tf.keras.optimizers.Adam(BASE_LR/10),
             metrics=["categorical_accuracy"])

In [None]:
last_epoch = history.epoch[-1]

history_fine = model.fit(train_ds,
                           epochs=last_epoch+100,
                           initial_epoch=last_epoch,
                           validation_data=val_ds,
                           class_weight=class_weights,
                           callbacks=cbs)

# Metrics

In [None]:
model.evaluate(test_ds)

In [None]:
plt.figure()
epochs = list(range(len(history.history["loss"])))
plt.plot(epochs, history.history["loss"], label="train_loss")
plt.plot(epochs, history.history["val_loss"], label="val_loss")
plt.xticks(ticks=epochs)
plt.title("Loss")
plt.legend()
plt.show()

In [None]:
plt.figure()
epochs = list(range(len(history.history["categorical_accuracy"])))
plt.plot(epochs, history.history["categorical_accuracy"], label="train_accuracy")
plt.plot(epochs, history.history["val_categorical_accuracy"], label="val_accuracy")
plt.xticks(ticks=epochs)
plt.title("Accuracy")
plt.legend()
plt.show()

In [None]:
plt.figure()
epochs = list(range(len(history_fine.history["loss"])))
plt.plot(epochs, history_fine.history["loss"], label="train_loss")
plt.plot(epochs, history_fine.history["val_loss"], label="val_loss")
plt.xticks(ticks=epochs)
plt.title("Fine-Tune Loss")
plt.legend()
plt.show()

In [None]:
plt.figure()
epochs = list(range(len(history_fine.history["categorical_accuracy"])))
plt.plot(epochs, history_fine.history["categorical_accuracy"], label="train_accuracy")
plt.plot(epochs, history_fine.history["val_categorical_accuracy"], label="val_accuracy")
plt.xticks(ticks=epochs)
plt.title("Fine-Tune Accuracy")
plt.legend()
plt.show()

In [None]:
# inspect model performance
from sklearn.metrics import classification_report

def classi_report(test_ds):
    if test_ds.__class__.__name__ == "_BatchDataset":
        test_ds = test_ds.unbatch()
    
    y_true = []
    y_pred = []
    pred_times = []
    
    for img, one_hot_label in test_ds:
        label = tf.argmax(one_hot_label).numpy()
        t1 = time.time()
        output = model.predict(tf.expand_dims(img, axis=0), verbose=0)
        t2 = time.time()
        pred = tf.squeeze(output)
        pred = tf.argmax(pred).numpy()
        y_true.append(label)
        y_pred.append(pred)
        pred_times.append(t2-t1)
    
    print(f"average prediction time: {np.mean(pred_times)} seconds")
    return classification_report(y_true, y_pred)

print(classi_report(test_ds))

# Save Model

In [None]:
model.save("brain_tumor_detector.keras")
model.save("brain_tumor_detector.h5")

# Optimize model for tflite

In [None]:
# for float fallback quantization
def representative_data_gen():
    rep_ds = train_ds
    if rep_ds.__class__.__name__ == "_BatchDataset":
        rep_ds = rep_ds.unbatch()
        rep_ds = rep_ds.batch(1)
        
    for input_value, _ in rep_ds.take(100):
        # Model has only one input so each data point has one element.
        yield [input_value]

In [None]:
TFLITE_FILE_PATH = "brain_tumor_detector.tflite"

# optimize model and convert to tflite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# converter.representative_dataset = representative_data_gen
tflite_quant_model = converter.convert()

# save model as tflite
with open(TFLITE_FILE_PATH, 'wb') as f:
  f.write(tflite_quant_model)

In [None]:
# analyze tflite model
# tf.lite.experimental.Analyzer.analyze(model_content=tflite_quant_model, gpu_compatibility=True)

In [None]:
# inspect tflite model 
interpreter = interpreter = tf.lite.Interpreter(TFLITE_FILE_PATH)
input_details = interpreter.get_input_details()
signatures = interpreter.get_signature_list()
print(signatures)
print(input_details)

In [None]:
signature = interpreter.get_signature_runner()
print(signature)

In [None]:
# test our tflite model
pred_times = []

if test_ds.__class__.__name__ == "_BatchDataset":
    test_ds = test_ds.unbatch()

y_true = []
y_pred = []

for img, one_hot_label in test_ds:
    label = tf.argmax(one_hot_label).numpy()
    y_true.append(label)
    t1 = time.time()
    output = signature(input=tf.expand_dims(img, axis=0))
    t2 = time.time()
    pred = tf.squeeze(output["output"], axis=0)
    y_pred.append(tf.argmax(pred, axis=0).numpy())
    pred_times.append(t2-t1)

print(f"average prediction time: {np.mean(pred_times)} seconds")    
print(classification_report(y_true, y_pred))

# Clean up

In [None]:
shutil.rmtree('/kaggle/working/brain-tumor-mri-dataset')