![unet-2.png](attachment:unet-2.png)

In [None]:
# Relative dataset locations, one directory per split.
train_path = "data/data_science_bowl/stage1_train"
validation_path = "data/data_science_bowl/stage1_validation"
# NOTE(review): test_path is not referenced by any of the cells visible here.
test_path = "data/data_science_bowl/stage1_test"

In [None]:
import tensorflow as tf
from typing import Tuple, List, Optional, Union, Any
import numpy as np
from numpy import ndarray
import os
import matplotlib.pyplot as plt

In [None]:
def split_masks_into_binary(
        mask: np.ndarray,
        colormap: List[Tuple[int, int, int]]
        ) -> np.ndarray:
    """
    Converts a color-coded mask into a stack of per-class binary masks.

    Parameters
    ----------
    mask: np.ndarray
        Segmentation mask whose last axis holds the color components.
    colormap: List[Tuple[int, int, int]]
        Class color map, one color per class.

    Returns
    -------
    binary_masks: np.ndarray
        Integer array holding one 0/1 plane per colormap entry, stacked along a new last axis.
    """
    class_planes = []
    for color in colormap:
        # A pixel belongs to the class only when every color component matches.
        pixel_matches = np.all(mask == color, axis=-1)
        # Multiplying by 1 turns the boolean plane into an integer one.
        class_planes.append(pixel_matches * 1)
    return np.stack(class_planes, axis=-1)

def read_and_sum_masks(
        paths: List[List[str]],
        target_size: Tuple[int, int]
) -> list:
    """
    Loads every group of mask files and adds each group up into a single mask.

    Parameters
    ----------
    paths: List[List[str]]
        One list of mask file paths per sample; the files of a group are summed.
    target_size: Tuple[int, int]
        Target size for the image to be loaded.

    Returns
    -------
    masks: list
        One summed mask per group (the plain integer 0 for an empty group).
    """
    masks = []
    for mask_group in paths:
        # Every partial mask is read as a 3-channel image before summation.
        partial_masks = (
            read_image(mask_path, channels=3, target_size=target_size)
            for mask_path in mask_group
        )
        masks.append(sum(partial_masks))
    return masks

In [None]:
def create_images_masks_paths(
        path: str, only_images: bool, subdirs: Tuple[str, str]
) -> dict:
    """
    Generates the dictionary storing the paths to the images and optionally corresponding masks.
    It is the foundation upon which the batches are later generated.

    Every entry of `path` is expected to be a sample directory holding the `subdirs[0]`
    (images) and, unless `only_images` is set, the `subdirs[1]` (masks) subdirectory.
    Entries lacking a required subdirectory are skipped with a warning, so the images
    and masks lists always stay aligned index by index.

    Parameters
    ----------
    path: str
        Images and masks directory.
    only_images: bool
        Should generator read only images (e.g. on train set for predictions).
    subdirs: Tuple[str, str]
        Vector of two characters containing names of subdirectories with images and masks.

    Returns
    -------
    path_dict: dict
        Dictionary with "images_paths" and, unless `only_images` is set, "masks_paths".
        Each value is a list holding one sorted list of file paths per sample.
    """
    # Local import: logging is not imported at file level.
    import logging

    logger = logging.getLogger(__name__)

    def list_sorted_files(directory: str) -> List[str]:
        """Returns the sorted full paths of all entries found in `directory`."""
        return [os.path.join(directory, s) for s in sorted(os.listdir(directory))]

    images_paths = []
    masks_paths = []
    for nd in sorted(os.listdir(path)):
        try:
            # Resolve everything the sample needs BEFORE appending anything, so that
            # a sample with missing masks cannot leave an orphaned images entry.
            images_paths_batch = list_sorted_files(os.path.join(path, nd, subdirs[0]))
            masks_paths_batch = (
                None if only_images
                else list_sorted_files(os.path.join(path, nd, subdirs[1]))
            )
        except (FileNotFoundError, NotADirectoryError):
            # NotADirectoryError covers stray plain files lying next to the sample directories.
            logger.warning(
                "The current image %s is incomplete for it contains only masks or images!", nd
            )
            continue
        images_paths.append(images_paths_batch)
        if masks_paths_batch is not None:
            masks_paths.append(masks_paths_batch)

    if only_images:
        return {"images_paths": images_paths}
    return {"images_paths": images_paths, "masks_paths": masks_paths}


def filter_paths_by_indices(paths: List[List[str]], indices: Optional[Tuple[int]]) -> List[List[str]]:
    """
    Picks the entries of `paths` found at the given positions.

    Parameters
    ----------
    paths: List[List[str]]
        Images paths.
    indices: Optional[Tuple[int]]
        Positions to pick, in the requested order. `None` selects everything.

    Returns
    -------
    filtered_paths: List[List[str]]
        Selected entries, or `paths` itself (not a copy) when `indices` is `None`.
    """
    if indices is None:
        return paths
    return [paths[position] for position in indices]

In [None]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from skimage.transform import resize
from skimage.color import rgb2gray, gray2rgb
from typing import Union, Tuple, List

def read_image(path: str, channels: int, target_size: Union[int, Tuple[int, int]]) -> ndarray:
    """
    Reads an image file and converts it to a numpy array.

    Parameters
    ----------
    path: str
        Image path.
    channels: int
        Number of color channels: 1 (grayscale), 3 (rgb) or 4 (rgba).
    target_size: Union[int, Tuple[int, int]]
        Target size for the image to be loaded; forwarded unchanged to `load_img`.

    Returns
    -------
    pixel_array: ndarray
        Image as numpy array.

    Raises
    ------
    ValueError
        If `channels` is not 1, 3 or 4.
    """
    # Compared with == (not hashed), so any unsupported value ends up in the ValueError below.
    supported_modes = ((1, "grayscale"), (3, "rgb"), (4, "rgba"))
    color_mode = next((mode for count, mode in supported_modes if channels == count), None)
    if color_mode is None:
        raise ValueError('For classical (PNG, JPG, ...) images number of channels can be set to 1, 3 or 4!')
    image = load_img(path, color_mode=color_mode, target_size=target_size)
    return img_to_array(image)

def read_and_concatenate_images(
        paths: List[List[str]],
        channels: List[int],
        target_size: Union[int, Tuple[int, int]]
) -> list:
    """
    Loads every group of image files and joins each group along the channel axis.

    Parameters
    ----------
    paths: List[List[str]]
        One list of image file paths per sample; the files of a group are concatenated.
    channels: List[int]
        Number of color channels for each file of a group. A single value is treated
        as a one-element list.
    target_size: Union[int, Tuple[int, int]]
        Target size for the image to be loaded.

    Returns
    -------
    images: list
        One concatenated array per group.
    """
    channel_counts = channels if isinstance(channels, list) else [channels]
    images = []
    for image_group in paths:
        # zip pairs files with channel counts and stops at the shorter of the two.
        loaded = [
            read_image(image_path, channels=count, target_size=target_size)
            for image_path, count in zip(image_group, channel_counts)
        ]
        images.append(np.concatenate(loaded, axis=-1))
    return images

In [None]:
class SegmentationGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding batches of images and, optionally, per-class binary masks.

    The class can be used as train, validation or test generator. It is also utilized by the engine
    while producing the output plots, based on the trained models.

    Methods
    -------
    calculate_steps_per_epoch()
        Calculates the number of batches needed to go through all the images.

    read_images_and_masks_from_directory(indices: Optional[List])
        Composes the batch of data out of the loaded images and optionally masks.

    on_epoch_end()
        Rebuilds (and optionally shuffles) the sample order.
    """

    def __init__(
            self,
            path: str,
            colormap: Optional[List[Tuple[int, int, int]]],
            only_images: bool = False,
            net_h: Union[int, List[int]] = 256,
            net_w: Union[int, List[int]] = 256,
            channels: Union[int, List[int], List[Union[int, List[int]]]] = 3,
            batch_size: int = 32,
            shuffle: bool = True,
            subdirs: Tuple[str, str] = ("images", "masks"),
            return_paths: bool = False
    ) -> None:
        """
        Generates batches of data (images and masks). The data will be looped over (in batches).

        Parameters
        ----------
        path: str
            Images and masks directory.
        colormap: Optional[List[Tuple[int, int, int]]]
            Class color map. May be `None` only when `only_images` is set.
        only_images: bool
            Should generator read only images (e.g. on train set for predictions).
        net_h: Union[int, List[int]]
            Input layer height or list of heights for multiple inputs.
        net_w: Union[int, List[int]]
            Input layer width or list of widths for multiple inputs.
        channels: Union[int, List[int]]
            Defines inputs layer color channels.
        batch_size: int
            Batch size.
        shuffle: bool
            Should data be shuffled.
        subdirs: Tuple[str, str]
            Vector of two characters containing names of subdirectories with images and masks.
        return_paths: bool
            Indicates whether the generator is supposed return images paths.

        Raises
        ------
        ValueError
            If masks are requested (`only_images` is False) but no colormap is given.
        """
        # Let the Keras base class initialise its own state.
        # NOTE(review): recent Keras releases are understood to warn when this call is skipped - confirm.
        super().__init__()
        if colormap is None and not only_images:
            raise ValueError("A colormap is required unless only_images is True!")
        self.path = path
        self.colormap = colormap
        self.only_images = only_images
        self.net_h = net_h
        self.net_w = net_w
        self.channels = channels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.subdirs = subdirs
        # Without a colormap (image-only mode) there are no classes to report.
        self.classes = len(colormap) if colormap is not None else 0
        self.return_paths = return_paths
        self.config = create_images_masks_paths(self.path, self.only_images, self.subdirs)
        self.indexes = None
        self.steps_per_epoch = self.calculate_steps_per_epoch()
        print(len(self.config["images_paths"]), "images detected!")
        print("Set 'steps_per_epoch' to:", self.steps_per_epoch)
        # Builds the initial (optionally shuffled) sample order.
        self.on_epoch_end()

    def calculate_steps_per_epoch(self) -> int:
        """Calculates the number of steps needed to go through all the images given the batch size.

        Returns
        -------
        steps_per_epoch: int
            Steps that the generator is to take in order to complete an epoch.
        """
        # Ceiling division: the last batch may be smaller than `batch_size`.
        return int(np.ceil(len(self.config["images_paths"]) / self.batch_size))

    def calculate_masks_target_size(self) -> Tuple[Any, Any]:
        """
        Calculates masks target size.

        Returns
        -------
        target_size: tuple
            The `(net_h, net_w)` pair.
        """
        return (self.net_h, self.net_w)

    def calculate_images_target_sizes(self) -> Tuple[Any, Any]:
        """
        Calculates images target sizes.

        Returns
        -------
        target_size: tuple
            The `(net_h, net_w)` pair.
        """
        return (self.net_h, self.net_w)

    def read_images_and_masks_from_directory(
            self, indices: Optional[List]
    ) -> Union[
        Tuple[List[ndarray], List[ndarray], List[List[str]]],
        Tuple[List[ndarray], List[List[str]]]
    ]:
        """
        Composes the batch of data out of the loaded images and optionally masks.

        Parameters
        ----------
        indices: List
            Indices of selected images. If `None` all images in `paths` will be selected.

        Returns
        -------
        loaded_data: tuple
            `(images, masks, images_paths)`, or `(images, images_paths)` when
            `only_images` is set.
        """
        selected_images_paths = filter_paths_by_indices(self.config["images_paths"], indices)
        selected_images = read_and_concatenate_images(
            selected_images_paths, self.channels, self.calculate_images_target_sizes()
        )
        if self.only_images:
            return selected_images, selected_images_paths

        selected_masks_paths = filter_paths_by_indices(self.config["masks_paths"], indices)
        summed_masks = read_and_sum_masks(selected_masks_paths, self.calculate_masks_target_size())
        # One binary plane per colormap entry.
        selected_masks = [split_masks_into_binary(mask, self.colormap) for mask in summed_masks]
        return selected_images, selected_masks, selected_images_paths

    def on_epoch_end(self) -> None:
        """Updates indexes on epoch end, optionally shuffles them for the sake of randomization."""
        self.indexes = list(range(len(self.config["images_paths"])))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __getitem__(self, index: int) -> Union[ndarray, Tuple[Any, ...]]:
        """
        Returns one batch of data.

        Parameters
        ----------
        index: int
            Batch index.

        Returns
        -------
        batch: Union[ndarray, tuple]
            `(images, masks)`, or the bare `images` array when `only_images` is set.
            With `return_paths` the images paths are appended, which gives
            `(images, masks, paths)` or `(images, paths)` respectively.
        """
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        if self.only_images:
            images, paths = self.read_images_and_masks_from_directory(indexes)
            # Pixel values are divided by 255 before being handed to the network.
            outputs = (np.stack(images, axis=0) / 255,)
        else:
            images, masks, paths = self.read_images_and_masks_from_directory(indexes)
            outputs = (np.stack(images, axis=0) / 255, np.stack(masks, axis=0))

        if self.return_paths:
            return outputs + (paths,)
        # Image-only batches without paths are returned as a bare array, not a 1-tuple.
        return outputs[0] if self.only_images else outputs

    def __len__(self) -> int:
        """Returns the number of batches that one epoch is comprised of."""
        return self.calculate_steps_per_epoch()

In [None]:
# Training generator: single-channel 256x256 inputs, two-entry colormap
# (index 0 = (0, 0, 0), index 1 = (255, 255, 255)), batches of 8.
train_gen = SegmentationGenerator(
            path = train_path,
            colormap = [(0, 0, 0), (255, 255, 255)],
            only_images = False,
            net_h = 256,
            net_w = 256,
            channels = 1,
            batch_size = 8,
            shuffle = True,
            subdirs = ("images", "masks"),
            return_paths = False
)

# Validation generator: same settings as above, reading from the validation directory.
# NOTE(review): shuffle=True also randomizes the order of validation samples, so results
# of model.predict(validation_gen) presumably do not follow the sorted directory order
# - confirm whether shuffle=False is intended here.
validation_gen = SegmentationGenerator(
            path = validation_path,
            colormap = [(0, 0, 0), (255, 255, 255)],
            only_images = False,
            net_h = 256,
            net_w = 256,
            channels = 1,
            batch_size = 8,
            shuffle = True,
            subdirs = ("images", "masks"),
            return_paths = False
)

In [None]:
# Fetch the first training batch and print the shapes of its images and masks arrays.
sample_barch = train_gen[0]
for batch_part in sample_barch[:2]:
    print(batch_part.shape)

In [None]:
# Display the first image of the sample batch (all of its channels).
plt.matshow(sample_barch[0][0,:,:,:])

In [None]:
# Display mask channel 1 of the first sample, i.e. the plane matching the second
# colormap entry, (255, 255, 255), given to train_gen.
plt.matshow(sample_barch[1][0,:,:,1])

In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Dropout, Conv2DTranspose

def _double_conv(tensor, filters, dropout_rate):
    """Applies the Conv2D -> Dropout -> Conv2D stage shared by every U-Net level."""
    conv_settings = dict(activation='elu', kernel_initializer='he_normal', padding='same')
    tensor = Conv2D(filters, (3, 3), **conv_settings)(tensor)
    tensor = Dropout(dropout_rate)(tensor)
    return Conv2D(filters, (3, 3), **conv_settings)(tensor)


def create_unet(input_shape=(256, 256, 1), num_classes=2):
    """
    Builds a U-Net segmentation model.

    The network has four down-sampling levels (16, 32, 64 and 128 filters), a
    256-filter bottleneck and four mirrored up-sampling levels with skip connections.
    Layers are created in the same order as in a fully unrolled definition.

    Parameters
    ----------
    input_shape: tuple
        Shape of a single input image as (height, width, channels).
        NOTE(review): height and width presumably need to be divisible by 16 so the four
        2x2 poolings and the four stride-2 transposed convolutions line up - confirm.
    num_classes: int
        Number of output channels of the final 1x1 softmax convolution.

    Returns
    -------
    model: Model
        Uncompiled Keras model mapping an image to per-pixel class probabilities.
    """
    # (filters, dropout rate) for each level, from the shallowest to the deepest.
    levels = ((16, 0.1), (32, 0.1), (64, 0.2), (128, 0.2))

    inputs = Input(input_shape)

    # Contracting path: double convolution, keep the result for the skip connection,
    # then halve the spatial resolution with 2x2 max pooling.
    skip_connections = []
    tensor = inputs
    for filters, dropout_rate in levels:
        tensor = _double_conv(tensor, filters, dropout_rate)
        skip_connections.append(tensor)
        tensor = MaxPooling2D((2, 2))(tensor)

    # Bottleneck: 256 filters, 30% dropout.
    tensor = _double_conv(tensor, 256, 0.3)

    # Expanding path: double the resolution with a 2x2 stride-2 transposed convolution,
    # concatenate the matching encoder output, then apply a double convolution.
    for (filters, dropout_rate), skip in zip(reversed(levels), reversed(skip_connections)):
        tensor = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(tensor)
        tensor = concatenate([tensor, skip])
        tensor = _double_conv(tensor, filters, dropout_rate)

    # Per-pixel class probabilities: 1x1 convolution with softmax activation.
    outputs = Conv2D(num_classes, (1, 1), activation='softmax')(tensor)

    return Model(inputs=[inputs], outputs=[outputs])

In [None]:
# Build the U-Net with its default arguments and print the layer-by-layer summary.
model = create_unet()
model.summary()

![dice.png](attachment:dice.png)

In [None]:
import tensorflow.keras.backend as kb

def dice_coefficient(y_actual: tf.Tensor, y_pred: tf.Tensor) -> tf.Tensor:
    """
    Computes the dice coefficient over all elements of both tensors.

    The value is (2 * sum(y_actual * y_pred) + eps) / (sum(y_actual) + sum(y_pred) + eps),
    where eps is the Keras backend epsilon; it keeps the ratio defined when both sums are zero.

    Parameters
    ----------
    y_actual: tf.Tensor
        True segmentation mask; cast to float32 before use.
    y_pred: tf.Tensor
        Predicted segmentation mask.

    Returns
    -------
    dice_coefficient: tf.Tensor
        Dice coefficient.
    """
    truth = tf.cast(y_actual, 'float32')
    overlap = kb.sum(truth * y_pred)
    combined_total = kb.sum(truth) + kb.sum(y_pred)
    smoothing = kb.epsilon()
    return (2 * overlap + smoothing) / (combined_total + smoothing)

In [None]:
# Adam optimizer, categorical cross-entropy loss, dice coefficient reported as a metric.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=[dice_coefficient])

In [None]:
# Train for 30 epochs. The step counts come from the generators themselves (their
# __len__ is the number of batches per epoch) instead of being hard-coded, so they
# stay correct when the dataset size or the batch size changes.
history = model.fit(
    train_gen, epochs=30, steps_per_epoch=len(train_gen),
    validation_data=validation_gen, validation_steps=len(validation_gen)
)

In [None]:
# Run the model over the validation generator; the trailing expression shows the
# shape of the returned predictions when evaluated as a notebook cell.
preds = model.predict(validation_gen)
preds.shape

In [None]:
# Round channel 1 of the prediction at index 2 to the nearest integer and display it.
# NOTE(review): validation_gen is created with shuffle=True, so index 2 presumably does
# not map to the third sample in sorted directory order - confirm.
plt.matshow(np.round(preds[2,:,:,1]))