In [2]:
# %% [markdown]
# ### Requirements
# - keras >= 2.2.0 or tensorflow >= 1.13
# - segmentation-models==1.0.*
# - albumentations==0.3.0
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ["SM_FRAMEWORK"] = "tf.keras"


# %%
import os
import cv2
import keras
import numpy as np
import matplotlib.pyplot as plt
import segmentation_models as sm
import albumentations as A

# %%
DATA_DIR = './drive_png/'

x_train_dir = os.path.join(DATA_DIR, 'training', 'input')
y_train_dir = os.path.join(DATA_DIR, 'training', 'target')

x_valid_dir = os.path.join(DATA_DIR, 'test', 'input')
y_valid_dir = os.path.join(DATA_DIR, 'test', 'target')

# %% [markdown]
# # Dataloader and utility functions 

def visualize(**images):
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image, cmap='gray')
    plt.show()

def denormalize(x):
    x_max = np.percentile(x, 98)
    x_min = np.percentile(x, 2)    
    x = (x - x_min) / (x_max - x_min)
    x = x.clip(0, 1)
    return x

class Dataset:
    """DRIVE Dataset. Read images, apply augmentation and preprocessing transformations."""
    
    def __init__(self, images_dir, masks_dir, augmentation=None, preprocessing=None):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id.replace('_training', '_manual1')) for image_id in self.ids]
        
        self.augmentation = augmentation
        self.preprocessing = preprocessing
    
    def __getitem__(self, i):
        # Read data
        image = cv2.imread(self.images_fps[i], cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], cv2.IMREAD_GRAYSCALE)
        
        mask = (mask > 0).astype(np.float32)  # Convert to binary mask
        
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
        
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
            
        return image, mask
    
    def __len__(self):
        return len(self.ids)
    
class Dataloader(keras.utils.Sequence):
    def __init__(self, dataset, batch_size=1, shuffle=False):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(dataset))
        self.on_epoch_end()

    def __getitem__(self, i):
        start = i * self.batch_size
        stop = (i + 1) * self.batch_size
        data = [self.dataset[j] for j in range(start, stop)]
        batch = [np.stack(samples, axis=0) for samples in zip(*data)]
        return batch

    def __len__(self):
        return len(self.indexes) // self.batch_size

    def on_epoch_end(self):
        if self.shuffle:
            self.indexes = np.random.permutation(self.indexes)

# %%
# Data augmentation
def get_training_augmentation():
    train_transform = [
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),
        A.PadIfNeeded(min_height=512, min_width=512, always_apply=True, border_mode=0),
        A.RandomCrop(height=512, width=512, always_apply=True),
        A.IAAAdditiveGaussianNoise(p=0.2),
        A.OneOf([A.CLAHE(p=1), A.RandomBrightness(p=1), A.RandomGamma(p=1)], p=0.9),
        A.OneOf([A.IAASharpen(p=1), A.Blur(blur_limit=3, p=1)], p=0.9),
        A.OneOf([A.RandomContrast(p=1), A.HueSaturationValue(p=1)], p=0.9),
        A.Lambda(mask=round_clip_0_1)
    ]
    return A.Compose(train_transform)

def get_validation_augmentation():
    test_transform = [A.PadIfNeeded(512, 512)]
    return A.Compose(test_transform)

def get_preprocessing(preprocessing_fn):
    return A.Compose([A.Lambda(image=preprocessing_fn)])

# %%
BACKBONE = 'efficientnetb3'
BATCH_SIZE = 4
CLASSES = ['vessel']
LR = 0.0001
EPOCHS = 50

preprocess_input = sm.get_preprocessing(BACKBONE)
n_classes = 1  # Binary segmentation
activation = 'sigmoid'

# Create model
model = sm.Unet(BACKBONE, classes=n_classes, activation=activation)

# Define optimizer
optim = keras.optimizers.Adam(LR)

# Define losses and metrics
dice_loss = sm.losses.DiceLoss()
focal_loss = sm.losses.BinaryFocalLoss()
total_loss = dice_loss + (1 * focal_loss)

metrics = [sm.metrics.IOUScore(threshold=0.5), sm.metrics.FScore(threshold=0.5), 'accuracy', 'precision', 'recall']

# Compile the model
model.compile(optim, total_loss, metrics)

# %%
# Datasets
train_dataset = Dataset(
    x_train_dir, y_train_dir, 
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocess_input)
)

valid_dataset = Dataset(
    x_valid_dir, y_valid_dir, 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocess_input)
)

train_dataloader = Dataloader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_dataloader = Dataloader(valid_dataset, batch_size=1, shuffle=False)

# %%
# Callbacks and training
callbacks = [
    keras.callbacks.ModelCheckpoint('./best_model.h5', save_weights_only=True, save_best_only=True, mode='min'),
    keras.callbacks.ReduceLROnPlateau(),
]

history = model.fit_generator(
    train_dataloader, 
    steps_per_epoch=len(train_dataloader), 
    epochs=EPOCHS, 
    callbacks=callbacks, 
    validation_data=valid_dataloader, 
    validation_steps=len(valid_dataloader),
)

# %%
# Model evaluation
test_dataset = Dataset(
    x_valid_dir, y_valid_dir, 
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocess_input)
)

test_dataloader = Dataloader(test_dataset, batch_size=1, shuffle=False)

model.load_weights('best_model.h5')
scores = model.evaluate_generator(test_dataloader)

print("Loss: {:.5}".format(scores[0]))
for metric, value in zip(metrics, scores[1:]):
    print(f"{metric}: {value:.5}")

# %%
# Visualize results
n = 5
ids = np.random.choice(np.arange(len(test_dataset)), size=n)

for i in ids:
    image, gt_mask = test_dataset[i]
    image = np.expand_dims(image, axis=0)
    pr_mask = model.predict(image).round()
    
    visualize(
        image=denormalize(image.squeeze()),
        gt_mask=gt_mask.squeeze(),
        pr_mask=pr_mask.squeeze(),
    )


Segmentation Models: using `tf.keras` framework.


I0000 00:00:1726457390.126903 1598788 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1726457390.168692 1598788 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1726457390.176298 1598788 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1726457390.183801 1598788 cuda_executor.cc:1015] successful NUMA node read from SysFS ha

ValidationError: 1 validation error for InitSchema
  Value error, If 'border_mode' is set to 'BORDER_CONSTANT', 'value' must be provided. [type=value_error, input_value={'min_height': 512, 'min_..._value': None, 'p': 1.0}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.9/v/value_error