In [None]:
import u_net
import os
import cv2
import numpy as np
import tensorflow as tf
import random
import pandas as pd
from tqdm import tqdm
print(tf.config.list_physical_devices('GPU'))

X_TRAIN_PATH = '../datasets/Dataset_UHCSDB/Patched/images_patched/'
Y_TRAIN_PATH = '../datasets/Dataset_UHCSDB/Patched/labels_png_patched/'
METADATA_PATH = '../datasets/Dataset_UHCSDB/UHCSDB_Metadata_Segmentierung.xlsx'

def load_and_augment_data(image_path, mask_path, augment_data=True):
    images = []
    masks = []
    angles = [90, 180, 270]
    
    # Load metadata from the Excel file
    metadata = pd.read_excel(METADATA_PATH)

    for file in os.listdir(image_path):
        if file.endswith('.png'):
            img = cv2.imread(os.path.join(image_path, file), cv2.IMREAD_GRAYSCALE)
            
            # Extract the corresponding metadata row for the current image
            meta_row = metadata[metadata['patch_id'] == file]
            
            # Ensure metadata exists for the current image
            if not meta_row.empty:
                scale = meta_row['scale_um_per_px'].values[0] # float between 0.0 and 1.0
                microconstituent = meta_row['primary_microconstituent'].values[0] # 'spheroidite' or 'spheroidite+widmanstatten'
                # convert microconstituent to a numerical label 0 or 1
                if microconstituent == 'spheroidite':
                    microconstituent = 0
                else:
                    microconstituent = 1
                anneal_time_min = meta_row['anneal_time_min'].values[0] # int, could be '-' though
                if anneal_time_min == '-':
                    continue
                    anneal_time_min = 0
                    anneal_temp_c = 0
                    cooling_type = 0
                else:
                    anneal_temp_c = meta_row['anneal_temperature_C'].values[0] # int
                    cooling_type = meta_row['cooling'].values[0] # 'AR' or 'Q'
                    # convert cooling type to a numerical label 0 or 1
                    if cooling_type == 'AR':
                        cooling_type = 0
                    else:
                        cooling_type = 1
                
                # Add metadata to image in the form of 5 new channels
                meta_channels = np.zeros((img.shape[0], img.shape[1], 5), dtype=np.float32)
                meta_channels[..., 0] = scale
                meta_channels[..., 1] = microconstituent
                meta_channels[..., 2] = anneal_time_min / 1440.0
                meta_channels[..., 3] = anneal_temp_c / 1000.0
                meta_channels[..., 4] = cooling_type

                img = np.concatenate([img[..., np.newaxis], meta_channels.astype(np.float32)], axis=-1)

            mask = cv2.imread(os.path.join(mask_path, file), cv2.IMREAD_GRAYSCALE)

            if file == 'uhcs0312_patch2.png':
                global visualization_image
                visualization_image = img
                global visualization_mask
                visualization_mask = mask
            else:
                images.append(img)
                masks.append(mask)

    augmented_images = []
    augmented_masks = []

    for img, mask in tqdm(zip(images, masks), total=len(images), desc="Augmenting data"):
        augmented_images.append(img)
        augmented_masks.append(mask)
        if augment_data:
            for angle in angles:
                rotated_img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE if angle == 90 else cv2.ROTATE_180 if angle == 180 else cv2.ROTATE_90_COUNTERCLOCKWISE)
                rotated_mask = cv2.rotate(mask, cv2.ROTATE_90_CLOCKWISE if angle == 90 else cv2.ROTATE_180 if angle == 180 else cv2.ROTATE_90_COUNTERCLOCKWISE)
                augmented_images.append(rotated_img)
                augmented_masks.append(rotated_mask)

    return augmented_images, augmented_masks

n_classes = 4
train_images, train_masks = load_and_augment_data(X_TRAIN_PATH, Y_TRAIN_PATH, True)

print("Number of training images:", len(train_images))
print("Shape of a single training image:", train_images[0].shape)
print("Number of training masks:", len(train_masks))
print("Shape of a single training mask:", train_masks[0].shape)

In [None]:
# Split data into training and testing sets
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(train_images, train_masks, test_size=0.2, random_state=42)

# Turn the lists into numpy arrays
X_train = np.array(X_train)
X_test = np.array(X_test)
Y_train = np.array(Y_train)
Y_test = np.array(Y_test)

print("Class values in Y_train:")
unique_classes = np.unique(Y_train)
print(unique_classes)   # [0 1 2 3]

# print shape of X_train and Y_train
print("Shape of X_train:", X_train.shape)  # (number_of_images, height, width, channels)
print("Shape of Y_train:", Y_train.shape)  # (number_of_images, height, width, channels)
# both are (76, 322, 322)

# Convert Y_train to categorical
from keras.utils import to_categorical
train_masks_cat = to_categorical(Y_train, num_classes=n_classes)
Y_train_cat = train_masks_cat.reshape((Y_train.shape[0], Y_train.shape[1], Y_train.shape[2], n_classes))

test_masks_cat = to_categorical(Y_test, num_classes=n_classes)
Y_test_cat = test_masks_cat.reshape((Y_test.shape[0], Y_test.shape[1], Y_test.shape[2], n_classes))

from sklearn.utils import class_weight
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(Y_train),
    y=Y_train.flatten()
)
# Normalize class weights
class_weights = class_weights / class_weights.sum()
print("Class weights:", class_weights)  # [0.338 1.862 2.514 9.083]

In [None]:
os.environ['SM_FRAMEWORK'] = 'tf.keras'
import segmentation_models as sm

def selfmade_model():
    import u_net
    model = u_net.unet_model_same_padding(in_channels=6)  # 5 metadata channels + 1 grayscale channel
    return model

def pretrained_model(X_train, X_test, n_classes):
    BACKBONE = 'resnet34'
    preprocess_input = sm.get_preprocessing(BACKBONE)
    X_train = preprocess_input(X_train)
    X_test = preprocess_input(X_test)
    model = sm.Unet(backbone_name=BACKBONE, encoder_weights='imagenet', activation='softmax', classes=n_classes)
    return model

def compile_with_hybrid_loss(model):
    # Custom loss function combining Dice and Focal Loss and 2 different metrics
    LR = 0.0001
    optimizer = tf.keras.optimizers.Adam(LR)
    dice_loss = sm.losses.DiceLoss(class_weights=class_weights)
    focal_loss = sm.losses.CategoricalFocalLoss()
    total_loss = dice_loss + focal_loss
    metrics = [
        sm.metrics.IOUScore(threshold=0.5, class_weights=class_weights),
        sm.metrics.FScore(threshold=0.5, class_weights=class_weights)
    ]
    model.compile(optimizer=optimizer, loss=total_loss, metrics=metrics)

def compile_with_jaccard_loss(model):
    # This is what they used in the segmentation_models library tutorial
    # (https://github.com/qubvel/segmentation_models/blob/master/docs/tutorial.rst)
    model.compile('Adam', loss=sm.losses.bce_jaccard_loss, metrics=[sm.metrics.iou_score])


#model = pretrained_model(X_train, X_test, n_classes)
model = selfmade_model()
#compile_with_jaccard_loss(model)
compile_with_hybrid_loss(model)



In [None]:
# Callbacks
from keras.callbacks import EarlyStopping, ModelCheckpoint
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True)

history = model.fit(
    X_train,
    Y_train_cat,
    batch_size=4,
    verbose=1,
    epochs=50,
    validation_data=(X_test, Y_test_cat),
    shuffle=True,
    callbacks=[
        early_stopping,
        model_checkpoint
        ],
)

In [None]:
model = tf.keras.models.load_model('best models/best_small_cross_entropy.keras')

In [None]:
model = tf.keras.models.load_model('best models/best_large_focal_loss.keras', custom_objects={
    'jaccard_coeff_multiclass': u_net.jaccard_coeff_multiclass
})

In [None]:
os.environ['SM_FRAMEWORK'] = 'tf.keras'
import segmentation_models as sm

#model = tf.keras.models.load_model('best models/best_large_mixed_loss.keras', compile=False)
model = tf.keras.models.load_model('best_model.keras', compile=False)

LR = 0.0001
optimizer = tf.keras.optimizers.Adam(LR)
dice_loss = sm.losses.DiceLoss(class_weights=class_weights)
focal_loss = sm.losses.CategoricalFocalLoss()
total_loss = dice_loss + focal_loss
metrics = [
    sm.metrics.IOUScore(threshold=0.5, class_weights=class_weights),
    sm.metrics.FScore(threshold=0.5, class_weights=class_weights)
]

model.compile(optimizer=optimizer, loss=total_loss, metrics=metrics)

In [None]:
# Evaluate the model
results = model.evaluate(X_test, Y_test_cat, batch_size=4, verbose=1)

# Print the results
print(f"Test Loss: {results[0]}")  # Loss is the first value
for i, metric in enumerate(metrics, start=1):  # Metrics start from the second value
	print(f"Test {metric.name}: {results[i]}")

In [None]:
#IOU
y_pred = model.predict(X_test, batch_size=4)
y_pred_argmax=np.argmax(y_pred, axis=3)

#Using built in keras function
from keras.metrics import MeanIoU
n_classes = 4
IOU_keras = MeanIoU(num_classes=n_classes)  
IOU_keras.update_state(Y_test, y_pred_argmax)
print("Mean IoU =", IOU_keras.result().numpy())


#To calculate I0U for each class...
values = IOU_keras.total_cm.numpy()
print(values)
class1_IoU = values[0,0]/(values[0,0] + values[0,1] + values[0,2] + values[0,3] + values[1,0]+ values[2,0]+ values[3,0])
class2_IoU = values[1,1]/(values[1,1] + values[1,0] + values[1,2] + values[1,3] + values[0,1]+ values[2,1]+ values[3,1])
class3_IoU = values[2,2]/(values[2,2] + values[2,0] + values[2,1] + values[2,3] + values[0,2]+ values[1,2]+ values[3,2])
class4_IoU = values[3,3]/(values[3,3] + values[3,0] + values[3,1] + values[3,2] + values[0,3]+ values[1,3]+ values[2,3])

print("IoU for class1 is: ", class1_IoU)
print("IoU for class2 is: ", class2_IoU)
print("IoU for class3 is: ", class3_IoU)
print("IoU for class4 is: ", class4_IoU)

In [None]:

# plot the training history
import matplotlib.pyplot as plt
def plot_history(history):
    # Plot training & validation accuracy values
    plt.plot(history.history['iou_score'])
    plt.plot(history.history['val_iou_score'])
    plt.title('Model IoU')
    plt.ylabel('IoU')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

    # Plot training & validation loss values
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

plot_history(history)


In [None]:
#random.seed(43)
# Pick a random test image
import matplotlib.pyplot as plt
idx = random.randint(0, X_test.shape[0] - 1)
test_img = X_test[idx]
true_mask = Y_test[idx]

test_img = visualization_image
true_mask = visualization_mask
# Rotate the test image and true mask twice
test_img = cv2.rotate(test_img, cv2.ROTATE_90_CLOCKWISE)
test_img = cv2.rotate(test_img, cv2.ROTATE_90_CLOCKWISE)

true_mask = cv2.rotate(true_mask, cv2.ROTATE_90_CLOCKWISE)
true_mask = cv2.rotate(true_mask, cv2.ROTATE_90_CLOCKWISE)

# Predict the mask
pred_mask = model.predict(np.expand_dims(test_img, axis=0))
pred_mask = np.argmax(pred_mask, axis=-1)[0]

# Plot the images
plt.figure(figsize=(15, 5))

plt.subplot(1, 3, 1)
plt.title("Test Image")
plt.imshow(test_img[..., 0], cmap='gray')  # Display the first channel
plt.axis('off')

plt.subplot(1, 3, 2)
plt.title("True Mask")
plt.imshow(true_mask, cmap='nipy_spectral', vmin=0, vmax=n_classes-1)
plt.axis('off')

plt.subplot(1, 3, 3)
plt.title("Predicted Mask")
plt.imshow(pred_mask, cmap='nipy_spectral', vmin=0, vmax=n_classes-1)
plt.axis('off')

plt.show()
