In [15]:
import os
import glob
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf

import sklearn
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay, roc_auc_score

import wandb
from wandb.keras import WandbCallback

# Dataset and model locations. The defaults are the original workstation paths;
# set SSL_DATA_ROOT / SSL_MODEL_ROOT in the environment to run the notebook on
# another machine without editing this cell.
DATA_ROOT = os.environ.get("SSL_DATA_ROOT", "/home/gpu1/Desktop/kamilio/Pneumothorax")
MODEL_ROOT = os.environ.get("SSL_MODEL_ROOT", "/home/gpu1/Desktop/kamilio/models")
# Side length in pixels of the square model input; images are resized to
# IMG_SIZE x IMG_SIZE and the model input layers are built with this size.
IMG_SIZE = 256

## Load necessary functions

In [16]:
import albumentations as A
from functools import partial

# Passed as `num_parallel_calls` so tf.data picks the parallelism level itself.
AUTOTUNE = tf.data.AUTOTUNE

# Augmentation pipeline applied per image by `aug_fn`:
#   1. affine rescale by a factor drawn from (1.4, 1.8), applied with p=0.95
#   2. grid distortion with distort_limit=0.5, applied with p=0.75
_affine_rescale = A.Affine(scale=(1.4, 1.8), keep_ratio=True, p=0.95)
_grid_distortion = A.GridDistortion(distort_limit=0.5, p=0.75)
transforms = A.Compose([_affine_rescale, _grid_distortion])

def get_classes(dataset_name):
    """Return the class names of a dataset as a numerically sorted list of strings.

    Class names are the names of the sub-directories of
    ``DATA_ROOT/<dataset_name>/train``. They must be integers; they are sorted
    by numeric value (so "10" comes after "9") and returned as ``str(int(name))``.

    Hidden entries (names starting with ".", e.g. ``.ipynb_checkpoints`` or
    ``.DS_Store``) and plain files are ignored, because they are not classes and
    would otherwise make the integer conversion fail.

    Args:
        dataset_name: name of the dataset directory below ``DATA_ROOT``.

    Returns:
        list[str]: class names in ascending numeric order.

    Raises:
        ValueError: if a visible sub-directory name is not an integer.
        OSError: if the train directory cannot be listed.
    """
    #TODO add functionality to pop error if classes in data splits do not match
    train_dir = os.path.join(DATA_ROOT, dataset_name, "train")
    class_ids = sorted(
        int(entry)
        for entry in os.listdir(train_dir)
        if not entry.startswith(".") and os.path.isdir(os.path.join(train_dir, entry))
    )
    return [str(class_id) for class_id in class_ids]

def get_classes_dict(dataset_name):
    """Map every class name of a dataset to a label read from a sample file name.

    For each class returned by `get_classes`, the first path that `glob` yields
    from ``DATA_ROOT/<dataset_name>/train/<class>/`` is taken and the
    second-to-last "_"-separated token of its file name becomes the value.
    Presumably that token is the human-readable class label - verify against
    the dataset's file naming scheme.

    Raises:
        IndexError: if a class directory is empty or the sample file name
            contains no underscore.
    """
    class_ids = get_classes(dataset_name)
    mapping = {}
    for class_id in class_ids:
        key = str(class_id)
        pattern = os.path.join(DATA_ROOT, dataset_name, "train", key, '*')
        first_match = glob.glob(pattern)[0]
        tokens = os.path.basename(first_match).split('_')
        mapping[key] = tokens[-2]
    return mapping

def get_classes_dict_empty_lists(dataset_name):
    """Return a dict with one fresh, empty list per class name of the dataset.

    Keys are the class names from `get_classes` (as strings, in the same
    order); every value is its own list object, so the lists can be filled
    independently.
    """
    return {str(class_name): [] for class_name in get_classes(dataset_name)}

def get_label(file_path):
    """One-hot encode the class of an image from its parent directory name.

    Reads the module-level ``CLASS_NAMES`` list (set by `load_dataset`).

    Args:
        file_path: string tensor holding the path of one image file; the
            second-to-last path component must be the class directory.

    Returns:
        int32 tensor with one entry per class name: 1 where the class
        directory equals the class name, 0 elsewhere.
    """
    # Convert the path to a list of path components
    parts = tf.strings.split(file_path, os.path.sep)
    # The second to last is the class-directory; comparing it with the list of
    # class names yields a boolean vector with one entry per class.
    # TODO: pass CLASS_NAMES as function parameter
    one_hot = parts[-2] == CLASS_NAMES
    # Cast explicitly instead of calling int(): the builtin only works on a
    # vector tensor when AutoGraph rewrites it (to this same int32 cast) and
    # fails when the function is called eagerly.
    return tf.cast(one_hot, tf.int32)

def decode_img(img):
    """Decode a compressed (JPEG) byte string into a 3-channel image tensor."""
    return tf.io.decode_jpeg(img, channels=3)

def aug_fn(image):
    """Run the module-level albumentations pipeline `transforms` on one image.

    The image is passed under the "image" key and the augmented result is read
    back from the same key of the pipeline output.
    """
    return transforms(image=image)["image"]

def apply_transformations(image):
    """Apply `aug_fn` to an image tensor from inside a TensorFlow graph.

    Args:
        image: float32 image tensor.

    Returns:
        float32 tensor with the augmented image and the same static shape as
        ``image``.
    """
    aug_img = tf.numpy_function(func=aug_fn, inp=[image], Tout=tf.float32)
    # tf.numpy_function returns a tensor of unknown static shape, which then
    # propagates into the batched dataset. Restore the input's shape; this
    # relies on the augmentation pipeline not changing the image size, and
    # ensure_shape fails loudly at runtime if that ever stops being true.
    aug_img = tf.ensure_shape(aug_img, image.shape)
    return aug_img

def process_path(file_path, use_augmentations: bool):
    """Turn an image file path into an ``(image, label)`` training pair.

    Args:
        file_path: string tensor with the path of one image file.
        use_augmentations: when true, the image is additionally passed through
            `apply_transformations`.

    Returns:
        Tuple of the float32 image, resized to IMG_SIZE x IMG_SIZE and divided
        by 255, and the label produced by `get_label`.
    """
    label = get_label(file_path)
    # Read the file as raw bytes, then decode them into an image tensor
    raw_bytes = tf.io.read_file(file_path)
    decoded = decode_img(raw_bytes)
    # Bring the image to the model input size and scale the pixel values
    resized = tf.image.resize(decoded, size=[IMG_SIZE, IMG_SIZE])
    image = tf.cast(resized/255, tf.float32)
    if use_augmentations:
        image = apply_transformations(image)
    return image, label

def load_dataset(dataset_name:str, split_type: str, use_transformations: bool, mini_batch_size: int, shuffle: bool,
                 reshuffle_each_iteration: bool = False):
    """Build a batched ``tf.data`` pipeline of ``(image, one-hot label)`` pairs.

    Files are taken from ``DATA_ROOT/<dataset_name>/<split_type>/<class>/<file>``.
    As a side effect the module-level ``CLASS_NAMES`` (read by `get_label`) is
    set to the classes of ``dataset_name``.

    Args:
        dataset_name: name of the dataset directory below ``DATA_ROOT``.
        split_type: name of the split sub-directory (e.g. "train", "val").
        use_transformations: whether to run the augmentation pipeline on every image.
        mini_batch_size: number of samples per batch.
        shuffle: whether to shuffle the file list (buffer covers all files).
        reshuffle_each_iteration: whether a shuffled dataset is re-shuffled on
            every pass. Defaults to False (one fixed random order, the
            previous behaviour); pass True to get a new order each epoch.

    Returns:
        tf.data.Dataset yielding batches of images and labels.
    """
    global CLASS_NAMES
    CLASS_NAMES = get_classes(dataset_name)
    dataset = tf.data.Dataset.list_files(f"{DATA_ROOT}/{dataset_name}/{split_type}/*/*", shuffle=False)
    if shuffle:
        dataset = dataset.shuffle(len(dataset), reshuffle_each_iteration=reshuffle_each_iteration)
    dataset = dataset.map(partial(process_path, use_augmentations=use_transformations), num_parallel_calls=AUTOTUNE)
    # Prefetch so that loading/augmenting the next batches overlaps with the
    # consumer's work on the current one; the yielded elements are unchanged.
    return dataset.batch(mini_batch_size).prefetch(AUTOTUNE)

In [17]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras.applications import EfficientNetB5
from tensorflow.keras import layers

def get_initial_weights_type(imagenet_pretrained_backbone: bool):
    """Translate the "use ImageNet weights" flag into the Keras `weights` argument.

    Returns 'imagenet' when the flag is truthy and None (random initialisation)
    otherwise.
    """
    return 'imagenet' if imagenet_pretrained_backbone else None

def rebuild_top(model, class_count: int, top_dropout_rate: float = 0):
    """Attach a fresh classification head to a headless backbone.

    The head is: global average pooling -> batch normalisation -> optional
    dropout -> dense softmax layer with ``class_count`` units.

    Args:
        model: backbone whose ``output`` tensor feeds the head.
        class_count: number of output classes.
        top_dropout_rate: dropout rate before the dense layer; the dropout
            layer is omitted entirely when this is 0.

    Returns:
        The output tensor of the softmax layer.
    """
    features = layers.GlobalAveragePooling2D(name="avg_pool")(model.output)
    features = layers.BatchNormalization(name='batch_normalization')(features)
    use_dropout = top_dropout_rate != 0
    if use_dropout:
        dropout = layers.Dropout(top_dropout_rate, name="top_dropout")
        features = dropout(features)
    classifier = layers.Dense(class_count, activation="softmax", name="pred")
    return classifier(features)

def _build_efficientnet(backbone_cls, model_name: str, class_count: int,
                        imagenet_pretrained_backbone: bool, top_dropout_rate: float = 0):
    """Build an EfficientNet classifier: headless backbone plus the head from `rebuild_top`.

    Args:
        backbone_cls: Keras application constructor (e.g. ``EfficientNetB0``).
        model_name: name given to the resulting ``tf.keras.Model``.
        class_count: number of output classes.
        imagenet_pretrained_backbone: initialise the backbone with ImageNet
            weights (True) or randomly (False).
        top_dropout_rate: dropout rate of the classification head (0 disables it).

    Returns:
        Uncompiled ``tf.keras.Model`` taking IMG_SIZE x IMG_SIZE x 3 inputs.
    """
    weights_type = get_initial_weights_type(imagenet_pretrained_backbone)

    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
    backbone = backbone_cls(include_top=False, input_tensor=inputs, weights=weights_type)
    outputs = rebuild_top(backbone, class_count, top_dropout_rate)

    return tf.keras.Model(inputs, outputs, name=model_name)


def build_EffNetB0(class_count: int, imagenet_pretrained_backbone: bool, top_dropout_rate: float = 0):
    """Build an EfficientNetB0 classifier with ``class_count`` outputs."""
    model = _build_efficientnet(EfficientNetB0, 'EfficientNetB0', class_count,
                                imagenet_pretrained_backbone, top_dropout_rate)
    print("EffNet0 model build successfull")
    return model


def build_EffNetB3(class_count: int, imagenet_pretrained_backbone: bool, top_dropout_rate: float = 0):
    """Build an EfficientNetB3 classifier with ``class_count`` outputs."""
    model = _build_efficientnet(EfficientNetB3, 'EfficientNetB3', class_count,
                                imagenet_pretrained_backbone, top_dropout_rate)
    print("EffNet3 model build successfull")
    return model


def build_EffNetB5(class_count: int, imagenet_pretrained_backbone: bool, top_dropout_rate: float = 0):
    """Build an EfficientNetB5 classifier with ``class_count`` outputs."""
    model = _build_efficientnet(EfficientNetB5, 'EfficientNetB5', class_count,
                                imagenet_pretrained_backbone, top_dropout_rate)
    print("EffNet5 model build successfull")
    return model


def build_model(class_count: int, imagenet_pretrained_backbone: bool, architecture: str,  top_dropout_rate: float = 0):
    """Build the classifier selected by ``architecture``.

    Args:
        class_count: number of output classes.
        imagenet_pretrained_backbone: initialise the backbone with ImageNet
            weights (True) or randomly (False).
        architecture: one of "EffNetB0", "EffNetB3", "EffNetB5".
        top_dropout_rate: dropout rate of the classification head (0 disables it).

    Returns:
        Uncompiled ``tf.keras.Model``.

    Raises:
        ValueError: if ``architecture`` is not one of the supported names
            (ValueError is an Exception, so existing ``except Exception``
            handlers keep working).
    """
    # Resolved at call time so the table always points at the current builders.
    builders = {
        "EffNetB0": build_EffNetB0,
        "EffNetB3": build_EffNetB3,
        "EffNetB5": build_EffNetB5,
    }
    builder = builders.get(architecture) if isinstance(architecture, str) else None
    if builder is None:
        raise ValueError(
            f"Specified model architecture is not available: {architecture!r}. "
            f"Available architectures: {', '.join(builders)}"
        )
    return builder(class_count, imagenet_pretrained_backbone, top_dropout_rate)
        

## Visualize samples from downstream task dataset

In [None]:
dataset_name = "data_v10.0.0"
# Upper bound on how many (augmented) training samples are drawn for the preview.
SAMPLES_TO_DRAW = 200

# class name -> label token parsed from a sample file name
map_dict = get_classes_dict(dataset_name)
# class name -> list of collected example images
sample_dict = get_classes_dict_empty_lists(dataset_name)
# Class names in the same order as the entries of the one-hot labels: both
# come from get_classes(dataset_name).
class_keys = list(sample_dict)

rot_train_dataset_eg = load_dataset(
                              dataset_name = dataset_name, 
                              split_type = "train", 
                              use_transformations = True, 
                              mini_batch_size = 1,
                              shuffle=True)

# take() stops cleanly when the split holds fewer than SAMPLES_TO_DRAW samples,
# instead of raising StopIteration from next().
iterator=iter(rot_train_dataset_eg.take(SAMPLES_TO_DRAW))

for sample in iterator:
    # Batches have size 1: drop the batch axis of the image ...
    image = sample[0].numpy().reshape(IMG_SIZE,IMG_SIZE,3)
    # ... and turn the one-hot label into its class index.
    label = tf.math.argmax(sample[1], axis=1).numpy()[0]
    # Look the class name up by index rather than assuming name == str(index).
    sample_dict[class_keys[label]].append(image)

In [None]:
# Number of example images shown per class (one class per row).
SAMPLES_PER_CLASS = 7

class_count = len(sample_dict)
plt.figure(figsize=(14,2*class_count))
# Iterate over the collected samples themselves: this does not assume that the
# class keys are "0".."N-1", and a class with fewer than SAMPLES_PER_CLASS
# collected images simply leaves the rest of its row empty instead of raising
# IndexError.
for row, (class_key, images) in enumerate(sample_dict.items()):
    for column, image in enumerate(images[:SAMPLES_PER_CLASS]):
        plt.subplot(class_count, SAMPLES_PER_CLASS, row * SAMPLES_PER_CLASS + column + 1)
        plt.imshow(image)
        plt.title(f'{map_dict[class_key]}')
        plt.axis('off')

## Train model on the SSL pretext task (rotation angle classification, Pneumothorax dataset)

In [20]:
# Version of this experiment; used to derive the W&B project and model names.
major_version = 10
minor_version = 3

# Run configuration handed to wandb.init and read back through wandb.config.
config_dict = {
    # --- experiment description ---
    "type": "SSL",
    "task": "Rotation angle classification",
    "dataset_origin": "Pneumothorax",
    "dataset_name": "data_v10.0.0",
    # --- model ---
    "architecture": "EffNetB3",
    "continue_training_from": "",  # empty string: build a new model
    "imagenet_pretrained_backbone": False,
    # --- data pipeline ---
    "use_transformations": True,
    # --- optimisation ---
    "loss_function": "categorical_crossentropy",
    "top_dropout_rate": 0.2,
    "metrics": ['acc'],
    "learning_rate": 0.0005,
    "batch_size": 32,
    "epochs": 100,
}


In [None]:
model_name = f"model_v{str(major_version)}.{str(minor_version)}.0"

run = wandb.init(entity = 'ssl_bakalauras',
                 project=f"SSL_v{str(major_version)}",
                 name = model_name,
                 config = config_dict)

# Everything after wandb.init runs inside try/except so that a failure (for
# example os.makedirs raising because this model version already exists) marks
# the W&B run as failed and closes it instead of leaving it open.
try:
    config = wandb.config
    project_name = run.project

    print(config)
    print(f"\nModel version: {model_name}\n")
    print(f"{config.architecture} architecture will be used")

    # No exist_ok: an existing directory for this model version raises, so a
    # previously saved model is not overwritten.
    path_to_model = os.path.join(MODEL_ROOT, project_name, model_name)
    os.makedirs(path_to_model)
    tf.keras.backend.clear_session()

    class_names = get_classes_dict(config.dataset_name)
    print(f"\nClasses: {class_names}\n")
    if config.use_transformations:
        transformations = str(transforms)
        print(f"Using transfromations for train dataset:{transformations}")
    else:
        transformations = None
        print(f"No transformations will be used")


    wandb.log({
        "class_names": str(class_names),
        "transformations": transformations
    })

    train_ds = load_dataset(
                          dataset_name = config.dataset_name, 
                          split_type = "train", 
                          use_transformations = config.use_transformations, 
                          mini_batch_size = config.batch_size,
                          shuffle=True)

    # NOTE(review): the validation split also goes through the random
    # augmentations and is shuffled, so validation metrics vary between runs;
    # confirm this is intended (the message above mentions the train dataset only).
    val_ds = load_dataset(
                          dataset_name = config.dataset_name, 
                          split_type = "val", 
                          use_transformations = config.use_transformations, 
                          mini_batch_size = config.batch_size,
                          shuffle=True)

    # Initialize model
    if config.continue_training_from:
        print(f"Continue training from: {config.continue_training_from}")
        model = tf.keras.models.load_model(os.path.join(MODEL_ROOT, f"SSL_v{str(major_version)}", config.continue_training_from))
    else:                   
        model = build_model(
                        class_count = len(class_names), 
                        imagenet_pretrained_backbone = config.imagenet_pretrained_backbone,
                        architecture = config.architecture,
                        top_dropout_rate = config.top_dropout_rate)
        print(f"Successfully initialized {config.architecture} model from scratch")


    # Compile model (a loaded model is re-compiled too, with a fresh optimizer)
    optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate)
    model.compile(optimizer=optimizer, loss=config.loss_function, metrics = config.metrics)

    model.summary()
    # An integer save_freq makes the checkpoint fire every that many batches.
    # NOTE(review): 191 presumably equals the number of batches per epoch for
    # this dataset and batch size - confirm, or derive it from len(train_ds).
    mc = tf.keras.callbacks.ModelCheckpoint(os.path.join(path_to_model, "checkpoints", "weights{epoch:08d}.ckpt"), 
                                         save_weights_only=True, save_freq=191, verbose=1)
    model.fit(train_ds,
              epochs=config.epochs,
              validation_data=val_ds,
              callbacks=[WandbCallback(save_model=(False)), mc])

    model.save(path_to_model)
except BaseException:
    # Includes KeyboardInterrupt: an interrupted run is recorded as failed.
    run.finish(exit_code=1)
    raise
else:
    run.finish()
