### Imports

In [None]:
# various imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# setting seaborn display pattern for matplotlib
sns.set()

# sklearn for preprocessing and model evaluation
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder, LabelEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import classification_report, confusion_matrix, f1_score

# for faster data loading
import shutil
import os

# tensorflow for model structure
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    GlobalAveragePooling2D, Lambda, Dense, BatchNormalization,
    Dropout, Concatenate, Activation
)
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ReduceLROnPlateau

# mount drive
# from google.colab import drive
# drive.mount('/content/drive')

### Manipulating the DataFrame

In [None]:
# load the image index CSV into a DataFrame
imageDataframe = pd.read_csv('../dataset_index/processed_image_data.csv')

# integer-encode the textual 'Cancer Type' labels into a new column
classEncoder = LabelEncoder()
encoded_labels = classEncoder.fit_transform(imageDataframe['Cancer Type'])
imageDataframe['Cancer Type Encoded'] = encoded_labels

# number of rows per original (decoded) class
counts = imageDataframe['Cancer Type'].value_counts()

# overview table: decoded name, encoded id and row count of every class
decoded_names = classEncoder.classes_
class_df = pd.DataFrame({
    'Decoded': decoded_names,
    'Encoded': range(len(decoded_names)),
    'Counts': [counts[name] for name in decoded_names],
})

# order the overview by encoded id and display it as the cell output
class_df = class_df.sort_values(by='Encoded')
class_df

In [None]:
# keep only the image path and the encoded label; every other column
# (e.g. 'Benign or Malignant', 'Magnification') is discarded.
# Selecting the wanted columns instead of dropping the unwanted ones in place
# keeps this cell safe to re-run: a second in-place drop would raise a
# KeyError because the columns are already gone.
imageDataframe = imageDataframe[['rel_path', 'Cancer Type Encoded']].copy()

# independent copy used by the dataset-building cells further down
file_paths = imageDataframe.copy()
file_paths.head(3)

### Preloading the data

In [None]:
# copy the image folder into /tmp, merging into the already existing directory
# (presumably so that later reads hit faster local storage -- confirm)
shutil.copytree(
    src='../PATH/TO/images_plain',
    dst='/tmp',
    dirs_exist_ok=True,
)

### Building the dataflow generator

In [None]:
# parameters
# number of samples per batch; also used further down to derive
# steps_per_epoch and validation_steps
BATCH_SIZE = 128
# target size handed to tf.image.resize when an image is loaded; matches the
# spatial part of the (256, 256, 3) input shape used for the DenseNet121 base
IMG_SIZE = (256, 256)

# split the data into train, validation, and test sets
def split_dataset(filepaths_df):
    """Split *filepaths_df* into stratified train / validation / test frames.

    70% of the rows go to the training set; the remaining 30% are divided
    evenly between test and validation. Both splits are stratified on the
    'Cancer Type Encoded' column and use a fixed random seed (42), so the
    result is reproducible.

    Returns:
        A ``(train_df, val_df, test_df)`` tuple.
    """
    label_column = 'Cancer Type Encoded'
    seed = 42

    # first cut: 70% training, 30% held out
    train_df, holdout_df = train_test_split(
        filepaths_df,
        test_size=0.3,
        stratify=filepaths_df[label_column],
        random_state=seed,
    )

    # second cut: halve the held-out part into test and validation
    test_df, val_df = train_test_split(
        holdout_df,
        test_size=0.5,
        stratify=holdout_df[label_column],
        random_state=seed,
    )

    return train_df, val_df, test_df

# validate file paths
def validate_file_paths(filepaths_df):
    """Check that every entry of the 'rel_path' column exists on disk.

    Returns:
        True when no path is missing.

    Raises:
        FileNotFoundError: listing all missing paths, if there are any.
    """
    invalid_paths = []
    for candidate in filepaths_df['rel_path']:
        if os.path.exists(candidate):
            continue
        invalid_paths.append(candidate)

    if not invalid_paths:
        return True

    raise FileNotFoundError(f"The following files are missing: {invalid_paths}")

# custom real-time loading function
def create_dataset(filepaths_df, img_size, batch_size, augment_labels = None, is_training = True):
    """Build a batched tf.data pipeline that loads images from disk on the fly.

    Args:
        filepaths_df: DataFrame with a 'rel_path' column (image file paths)
            and a 'Cancer Type Encoded' column (labels, cast to int32 here).
        img_size: target size handed to ``tf.image.resize``.
        batch_size: number of samples per emitted batch.
        augment_labels: labels whose samples are emitted twice in training
            mode -- the (rotated) image plus a randomly flipped copy of it.
            ``None`` or an empty list disables the duplication.
        is_training: when true, enables the duplication described above and
            additionally shuffles and repeats the dataset indefinitely.

    Returns:
        A batched, prefetched ``tf.data.Dataset`` of ``(image, label)`` pairs
        whose images are float32 and divided by 255.
    """
    file_paths = filepaths_df['rel_path'].values
    labels = filepaths_df['Cancer Type Encoded'].astype(np.int32).values

    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))

    # read one file, decode it to 3 channels, resize it and divide by 255
    def load_image(file_path):
        image = tf.io.read_file(file_path)
        image = tf.image.decode_jpeg(image, channels = 3)
        image = tf.image.resize(image, img_size)
        image = tf.cast(image, tf.float32) / 255.0
        return image

    # simplified augmentation: random horizontal and random vertical flip
    def augment_image(image):
        image = tf.image.random_flip_left_right(image)
        image = tf.image.random_flip_up_down(image)
        return image

    # turn one (path, label) pair into a dataset of one or two samples
    def process_image(file_path, label):
        image = load_image(file_path)

        # applied to every dataset, training or not: rotate by a random
        # multiple of 90 degrees (the multiple is drawn from 0, 1, 2, 3)
        image = tf.image.rot90(image, tf.random.uniform(shape = [], minval = 0, maxval = 4, dtype = tf.int32))

        # conditionally augment the image, enhancing the minority classes:
        # in training mode, a sample whose label is in augment_labels yields
        # the image followed by a flipped copy of it
        # NOTE(review): the condition mixes Python values with a tensor; this
        # presumably relies on AutoGraph converting the `if` when flat_map
        # traces this function -- confirm
        if is_training and augment_labels and tf.reduce_any(tf.equal(label, augment_labels)):
            augmented_image = augment_image(image)
            return tf.data.Dataset.from_tensors((image, label)).concatenate(
                tf.data.Dataset.from_tensors((augmented_image, label))
            )
        else:
            return tf.data.Dataset.from_tensors((image, label))

    # apply processing to dataset; flat_map flattens the per-sample datasets
    dataset = dataset.flat_map(process_image)

    # shuffle the training set through a 1000-element buffer and repeat it
    # indefinitely, so the consumer has to bound an epoch itself (e.g. with
    # steps_per_epoch); the original note says repeat() was added to eliminate
    # a warning that appeared repeatedly
    if is_training:
        dataset = dataset.shuffle(buffer_size = 1000).repeat()

    dataset = dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

# compute class weights
def calculate_class_weights(train_df, augment_labels=None):
    """Compute balanced class weights that account for duplicated classes.

    The per-class counts of the 'Cancer Type Encoded' column are taken, the
    counts of the augmented classes are doubled (each of their samples is
    emitted twice by the training pipeline), and every class then receives
    ``total / (number_of_classes * class_count)`` based on those effective
    counts.

    Args:
        train_df: DataFrame with a 'Cancer Type Encoded' column.
        augment_labels: iterable of labels whose counts are doubled. When
            ``None`` (the default) every class except label ``1`` is doubled,
            which is the behaviour this function always had. Passing the same
            list that is given to the dataset builder keeps both in sync.

    Returns:
        dict mapping each label present in *train_df* to its weight; empty
        when *train_df* has no rows.
    """
    per_class_count = train_df['Cancer Type Encoded'].value_counts()

    doubled_labels = None if augment_labels is None else set(augment_labels)

    def _is_doubled(label):
        # historical default: everything but class 1 is augmented
        if doubled_labels is None:
            return label != 1
        return label in doubled_labels

    # build the effective counts in a fresh mapping rather than mutating the
    # Series while it is being iterated
    effective_count = {
        label: count * 2 if _is_doubled(label) else count
        for label, count in per_class_count.items()
    }

    # calculate class weights
    total_samples = sum(effective_count.values())
    n_classes = len(effective_count)
    return {
        label: total_samples / (n_classes * count)
        for label, count in effective_count.items()
    }

### Executing the creation of the datasets

In [None]:
# work on a private copy of the path/label table
filepaths = file_paths.copy()

# stratified train / validation / test split
train_df, val_df, test_df = split_dataset(filepaths)

# abort early if any referenced image file is missing
for subset_df in (train_df, val_df, test_df):
    validate_file_paths(subset_df)

# define augmented labels, everything except D-Carcinoma
augment_labels = [0, 2, 3, 4, 5, 6, 7]

# build the three pipelines; only the training one runs in training mode,
# and the validation one is repeated so it can be consumed every epoch
train_ds = create_dataset(train_df, IMG_SIZE, BATCH_SIZE, augment_labels, is_training=True)
val_ds = create_dataset(val_df, IMG_SIZE, BATCH_SIZE, augment_labels, is_training=False)
val_ds = val_ds.repeat()
test_ds = create_dataset(test_df, IMG_SIZE, BATCH_SIZE, augment_labels, is_training=False)

# every training row of an augmented class contributes one extra sample
augmented_count = train_df['Cancer Type Encoded'].isin(augment_labels).sum()
effective_train_size = len(train_df) + augmented_count

# steps per epoch: number of batches, rounding a partial batch up
full_batches, leftover = divmod(effective_train_size, BATCH_SIZE)
steps_per_epoch = (full_batches + 1) if leftover else full_batches

# class weights for the loss
class_weights = calculate_class_weights(train_df)

print("Class Weights")
print(class_weights)

print("\nSteps_per_epoch")
print(steps_per_epoch)

### Defining callbacks

In [None]:
# both callbacks watch the validation loss
monitored_metric = 'val_loss'

# stop once the monitored value has not improved for 7 epochs and restore
# the weights of the best epoch
earlystopping = keras.callbacks.EarlyStopping(
    monitor=monitored_metric,
    min_delta=0,
    patience=7,
    mode='auto',
    restore_best_weights=True,
)

# halve the learning rate after 3 epochs without improvement, down to 1e-6
reduce_lr = ReduceLROnPlateau(
    monitor=monitored_metric,
    factor=0.5,
    patience=3,
    min_lr=1e-6,
)

### Creating the model

In [None]:
# the base model is a pretrained DenseNet121
# the base model is a DenseNet121 with ImageNet weights and without its
# classification head
base_model = DenseNet121(
    weights='imagenet',
    include_top=False,
    input_shape=(256, 256, 3)
)

# unfreeze all the layers
base_model.trainable = True

# branch off at the last concatenation layer of the base model; get_layer
# raises a ValueError when the name does not exist, instead of silently
# leaving `x` undefined (or stale from an earlier notebook run) the way a
# manual search loop would
x = base_model.get_layer('conv5_block16_concat').output

# and change the rest of the model to combat overfitting
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = Dropout(0.5)(x)

# pool the feature maps to one vector per image and L2-normalize it
x = GlobalAveragePooling2D()(x)
x = Lambda(lambda x: tf.math.l2_normalize(x, axis=1))(x)

# final layers
x = Dense(64, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)

# one softmax output per class (8 classes)
predictions = Dense(8, activation='softmax')(x)

# creating the model
model = Model(inputs=base_model.input, outputs=predictions)

# compiling the model; the sparse loss matches the integer-encoded labels
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

In [None]:
# print an overview of the assembled model
model.summary()

### Fitting the model

In [None]:
# one full pass over the validation set per epoch, counting the final partial
# batch (ceiling division). Floor division would never evaluate the remainder
# and would yield 0 steps for a validation set smaller than one batch.
validation_steps = (len(val_df) + BATCH_SIZE - 1) // BATCH_SIZE

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=70,
    verbose=1,

    # we use the calculations made above
    class_weight=class_weights,

    # both datasets repeat indefinitely, so the epoch length must be explicit
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    callbacks=[earlystopping, reduce_lr]
)

### Comparing the training and validation

In [None]:
# training vs. validation curves: loss on the left, accuracy on the right
plt.figure(figsize=(12, 4))

# (training key, validation key, panel title, y-axis label) per panel
panels = [
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
]

for position, (train_key, val_key, panel_title, y_label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(panel_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')

plt.show()

### Testing the model

In [None]:
# get true labels from the test dataset (it is not shuffled, so the order
# matches the predictions below); `_` avoids overwriting the notebook-level `x`
y_test_true = np.concatenate([y.numpy() for _, y in test_ds], axis=0)

# class probabilities for every test image, then the most likely class
test_predictions = model.predict(test_ds)
y_test_pred = np.argmax(test_predictions, axis=1)

# print the F1 score, accuracy, and classification report
test_f1 = f1_score(y_test_true, y_test_pred, average='macro')
test_accuracy = np.mean(y_test_true == y_test_pred)
print("Test Accuracy:", test_accuracy)
print("Test F1 Score:", test_f1)
print(classification_report(y_test_true, y_test_pred))

# compute and display the confusion matrix; the class ids are passed
# explicitly so the matrix always has one row/column per model output and the
# tick labels cannot get out of step with it (without `labels` the matrix axes
# come from the union of true and predicted labels, not from y_test_true alone)
class_ids = np.arange(test_predictions.shape[1])
test_conf_matrix = confusion_matrix(y_test_true, y_test_pred, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(
    test_conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_ids,
    yticklabels=class_ids,
)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix - Test Data")
plt.show()