In [None]:
# imports
import pathlib
import os

import albumentations
import numpy as np
import pandas as pd
import torch
from neptunecontrib.api import log_table
from skimage.transform import resize
from torch.utils.data import DataLoader

from customdatasets import SegmentationDataSet4
from transformations import (
    ComposeDouble,
    AlbuSeg2d,
    FunctionWrapperDouble,
    create_dense_target,
    normalize_01,
)


In [None]:
# parameters
# Placeholder values: replace both before running the notebook.
# Directory handed to the Trainer as ``default_root_dir`` (checkpoints end up there).
checkpoint_location = r"<your/location/here>"
# Neptune project in "<username>/<project>" form; it has to be created in Neptune beforehand.
project_name = "<username>/<project>"

In [None]:
# hyper-parameters
# Single source of truth for the run; the whole dict is also handed to the Neptune logger.
params = dict(
    # batch size of the training and validation dataloaders
    BATCH_SIZE=8,
    # U-Net architecture options (read when the UNet is constructed)
    DEPTH=4,
    ACTIVATION="relu",
    NORMALIZATION="group8",
    UPSAMPLING="transposed",
    # optimisation / loss settings (read when the Lightning task and Trainer are built)
    LR=0.0001,
    # NOTE: "WEIGTH" is misspelled, but other cells look these exact keys up,
    # so the spelling must stay as-is unless every reader is renamed too.
    WEIGTH_CE=torch.tensor((0.2, 0.8)),
    WEIGTH_DICE=torch.tensor((0.0, 1.0)),
    PRECISION=32,
    # set to True to run the learning-rate range test before training
    LR_FINDER=False,
    # data / task description
    INPUT_SIZE=(128, 128),
    CLASSES=2,
    # run bookkeeping
    SEED=42,
    EXPERIMENT="carvana",
    MAXEPOCHS=10,
)


In [None]:
# api key
# The Neptune API token is read from the environment so it never lands in the notebook.
try:
    api_key = os.environ["NEPTUNE"]
except KeyError as missing:
    # Same exception type as before, but with a message that says what to do about it.
    raise KeyError(
        "Environment variable 'NEPTUNE' is not set; export your Neptune API token "
        "under that name before starting the notebook kernel."
    ) from missing

In [None]:
# root directory
# The dataset is expected in a "Carvana" folder inside the current working directory.
root = pathlib.Path.cwd().joinpath("Carvana")


In [None]:
# function to get file paths
def get_filenames_of_path(path: pathlib.Path, ext: str = "*"):
    """Collect the files matched by a glob pattern inside ``path``.

    Args:
        path: directory to search.
        ext: glob pattern passed to ``path.glob``; the default ``"*"`` matches every entry.

    Returns:
        A list of ``pathlib.Path`` objects for the matches that are files
        (directories are skipped). The list is in glob order, i.e. NOT sorted.
    """
    matches = []
    for candidate in path.glob(ext):
        if candidate.is_file():
            matches.append(candidate)
    return matches


In [None]:
# input and target files
# Sort both listings so that the order is deterministic (glob order is not).
# Presumably image i and mask i are paired by position downstream, which only
# works if both folders sort the same way -- verify against your file names.
inputs = sorted(get_filenames_of_path(root / "Input"))
targets = sorted(get_filenames_of_path(root / "Target"))

# Fail here, with a readable message, instead of deep inside the dataset/dataloader.
if not inputs:
    raise FileNotFoundError(f"No input files found in {root / 'Input'}")
if len(inputs) != len(targets):
    raise ValueError(
        f"Number of inputs ({len(inputs)}) and targets ({len(targets)}) differ; "
        "every input image needs exactly one target mask."
    )


In [None]:
# pre-transformations
# Both resize steps take their spatial size from params["INPUT_SIZE"], so the value
# that is logged with the experiment is the value that is actually used.
pre_transforms = ComposeDouble(
    [
        # input image only: resize to (height, width, 3 colour channels)
        FunctionWrapperDouble(
            resize,
            input=True,
            target=False,
            output_shape=(*params["INPUT_SIZE"], 3),
        ),
        # target mask only: order=0 with anti-aliasing off and preserve_range=True,
        # so resizing does not blend or rescale the label values
        FunctionWrapperDouble(
            resize,
            input=False,
            target=True,
            output_shape=tuple(params["INPUT_SIZE"]),
            order=0,
            anti_aliasing=False,
            preserve_range=True,
        ),
    ]
)

def _shared_transform_steps():
    """Return fresh instances of the three steps every split ends with.

    1. ``create_dense_target`` on the target only,
    2. ``np.moveaxis`` on the input only, moving the last axis to the front,
    3. ``normalize_01`` (wrapped with the wrapper's default input/target flags).
    """
    return [
        FunctionWrapperDouble(create_dense_target, input=False, target=True),
        FunctionWrapperDouble(
            np.moveaxis, input=True, target=False, source=-1, destination=0
        ),
        FunctionWrapperDouble(normalize_01),
    ]


# training: a random horizontal flip (p=0.5) followed by the shared steps
transforms_training = ComposeDouble(
    [
        AlbuSeg2d(albumentations.HorizontalFlip(p=0.5)),
        *_shared_transform_steps(),
    ]
)

# validation: shared steps only, no augmentation
transforms_validation = ComposeDouble(_shared_transform_steps())

# test: same pipeline as validation
transforms_test = ComposeDouble(_shared_transform_steps())


In [None]:
# Manual split into training, validation and test files.
# The first 80 files are used for training, everything after that for evaluation.
# NOTE(review): validation and test are the SAME slice ([80:]), so test scores are
# not measured on held-out data -- use disjoint slices if you need an unbiased estimate.
inputs_train, targets_train = inputs[:80], targets[:80]
inputs_valid, targets_valid = inputs[80:], targets[80:]
inputs_test, targets_test = inputs[80:], targets[80:]


In [None]:
# random seed
from pytorch_lightning import seed_everything

# Seed the run with params["SEED"] before any dataset, model or trainer is created.
# (Per the Lightning docs this helper seeds Python, NumPy and PyTorch -- confirm for your version.)
seed_everything(params["SEED"])


In [None]:
# dataset training
def _build_dataset(split_inputs, split_targets, split_transform):
    """Create the ``SegmentationDataSet4`` for one split.

    Every split uses ``use_cache=True`` and the notebook-level ``pre_transforms``;
    only the file lists and the per-sample transform differ.
    """
    return SegmentationDataSet4(
        inputs=split_inputs,
        targets=split_targets,
        transform=split_transform,
        use_cache=True,
        pre_transform=pre_transforms,
    )


# dataset training
dataset_train = _build_dataset(inputs_train, targets_train, transforms_training)

# dataset validation
dataset_valid = _build_dataset(inputs_valid, targets_valid, transforms_validation)

# dataset test
dataset_test = _build_dataset(inputs_test, targets_test, transforms_test)


In [None]:
# dataloader training
# Training batches are reshuffled every epoch; data is loaded in the main process.
dataloader_training = DataLoader(
    dataset=dataset_train,
    batch_size=params["BATCH_SIZE"],
    shuffle=True,
    num_workers=0,
)

# dataloader validation: same batch size, fixed order
dataloader_validation = DataLoader(
    dataset=dataset_valid,
    batch_size=params["BATCH_SIZE"],
    shuffle=False,
    num_workers=0,
)

# dataloader test
# batch_size has to be 1 for the analysis function at the bottom (k-highest, k-lowest)
dataloader_test = DataLoader(
    dataset=dataset_test,
    batch_size=1,
    shuffle=False,
    num_workers=0,
)


In [None]:
# SegmentationDataSet4 returns each sample as a dict instead of a tuple,
# so the image, the target and their names are looked up by key.
batch = dataset_train[0]
x, y, x_name, y_name = (batch[key] for key in ("x", "y", "x_name", "y_name"))

# Sanity check of one training sample: input shape and value range,
# then target shape and the distinct label values it contains.
print(x.shape)
print(x.min(), x.max())
print(y.shape)
print(torch.unique(y))


In [None]:
# Little workaround to make the DatasetViewer work with the SegmentationDataSet4
from visual import DatasetViewer


class DatasetViewerExtra(DatasetViewer):
    """``DatasetViewer`` variant for datasets whose samples are dicts.

    Only ``show_sample`` is overridden: it reads the sample through the keys
    ``"x"``, ``"y"``, ``"x_name"`` and ``"y_name"`` instead of unpacking a tuple.
    """

    def show_sample(self):
        """Display the sample at ``self.index`` as an image layer plus a label layer."""
        # Fetch the current sample and pull its four fields out by key.
        sample = self.get_sample_dataset(self.index)
        image, labels, image_name, labels_name = (
            sample[key] for key in ("x", "y", "x_name", "y_name")
        )

        # Convert both arrays with the viewer's own transform hooks.
        image = self.transform_x(image)
        labels = self.transform_y(labels)

        # Image layer: update it when it is already in the viewer, otherwise create it.
        if self.image_layer in self.viewer.layers:
            self.update_image_layer(self.image_layer, image, image_name)
        else:
            self.image_layer = self.create_image_layer(image, image_name)

        # Label layer: same update-or-create logic.
        if self.label_layer in self.viewer.layers:
            self.update_label_layer(self.label_layer, labels, labels_name)
        else:
            self.label_layer = self.create_label_layer(labels, labels_name)

        # Reset the view after the layers changed.
        self.viewer.reset_view()


In [None]:
# create DatasetViewerExtra instances
# One viewer per split, created in the order training -> validation -> test.
dataset_viewer_training, dataset_viewer_validation, dataset_viewer_test = (
    DatasetViewerExtra(split_dataset)
    for split_dataset in (dataset_train, dataset_valid, dataset_test)
)


In [None]:
# open napari instance for training dataset
# navigate with 'n' for next and 'b' for back on the keyboard
# Optional visual check of the training samples (requires a desktop session for the napari window).
dataset_viewer_training.napari()


In [None]:
# open napari instance for validation dataset
# navigate with 'n' for next and 'b' for back on the keyboard
# Optional visual check of the validation samples (requires a desktop session for the napari window).
dataset_viewer_validation.napari()


In [None]:
# open napari instance for test dataset
# navigate with 'n' for next and 'b' for back on the keyboard
# Optional visual check of the test samples (requires a desktop session for the napari window).
dataset_viewer_test.napari()


In [None]:
# neptune logger
from pytorch_lightning.loggers.neptune import NeptuneLogger

neptune_logger = NeptuneLogger(
    api_key=api_key,
    # "<username>/<project>"; make sure this project exists in your Neptune account
    project_name=project_name,
    # name under which this run is logged
    experiment_name=params["EXPERIMENT"],
    # the complete hyper-parameter dict is attached to the experiment
    params=params,
    offline_mode=False,
)
# Reading ``name`` triggers an HTTP GET request, so a missing project fails here
# instead of in the middle of training.
assert neptune_logger.name

# this can be a simple csv logger, or a custom logger
# you can also ignore this, see the trainer class for more information


In [None]:
# lightning module
from unet_lightning import Segmentation_UNET


In [None]:
# model init
from unet import UNet

# U-Net for RGB input. The number of output channels follows params["CLASSES"],
# the same value the Lightning task receives as ``num_classes``, so the two cannot drift apart.
model = UNet(
    in_channels=3,
    out_channels=params["CLASSES"],
    n_blocks=params["DEPTH"],
    start_filters=32,
    activation=params["ACTIVATION"],
    normalization=params["NORMALIZATION"],
    conv_mode="same",
    dim=2,
    up_mode=params["UPSAMPLING"],
)

# you can replace this model with any other segmentation model here!


In [None]:
# task init
# Wrap the network in the Lightning module, which takes the learning rate,
# the two loss weight tensors and the class count from ``params``.
task = Segmentation_UNET(
    model,
    num_classes=params["CLASSES"],
    lr=params["LR"],
    # the "WEIGTH" spelling matches the keys defined in ``params``
    weight_ce=params["WEIGTH_CE"],
    weight_dice=params["WEIGTH_DICE"],
    metrics=True,
)


In [None]:
# callbacks
from pytorch_lightning.callbacks import (
    ModelCheckpoint,
    LearningRateMonitor,
    EarlyStopping,
)

# Checkpointing monitors the logged metric "checkpoint_valid_f1_epoch"; mode="max" means higher is better.
checkpoint_callback = ModelCheckpoint(monitor="checkpoint_valid_f1_epoch", mode="max")
# Learning-rate monitoring at every step, without momentum.
learningrate_callback = LearningRateMonitor(logging_interval="step", log_momentum=False)
# Early stopping is disabled: it currently throws an error because of the custom
# metric computation used here -> it's better to use Lightning's own approach.
# early_stopping_callback = EarlyStopping(monitor="checkpoint_valid_f1_epoch", patience=10, mode="max")

# Two active callbacks (checkpointing, LR monitoring); early stopping stays commented out above.


In [None]:
# trainer init
from pytorch_lightning import Trainer

trainer = Trainer(
    # use one GPU when CUDA is available, otherwise fall back to CPU instead of failing
    gpus=1 if torch.cuda.is_available() else 0,
    precision=params["PRECISION"],  # try 16 with enable_pl_optimizer=False
    benchmark=True,  # good if the input sizes do not change, will increase speed
    callbacks=[checkpoint_callback, learningrate_callback],
    default_root_dir=checkpoint_location,  # where checkpoints are saved to
    logger=neptune_logger,  # you can also set it to False without breaking the code!
    log_every_n_steps=1,  # log at every training step
    num_sanity_val_steps=0,  # skip the validation sanity check before training
)

# the trainer class has many parameters!
# you can also set the logger arg to False without breaking the code!


In [None]:
# learning rate finder
# Optional learning-rate range test; only runs when params["LR_FINDER"] is True.
if params["LR_FINDER"]:
    # Sweep the learning rate exponentially from min_lr to max_lr.
    lr_finder = trainer.tuner.lr_find(
        model=task,
        train_dataloader=dataloader_training,
        val_dataloaders=dataloader_validation,
        min_lr=1e-8,
        max_lr=1.0,
        num_training=100,  # number of learning rates to test
        mode="exponential",
        early_stop_threshold=None,
    )

    # Kept for interactive inspection; not used again in this cell.
    lr_finder_results = lr_finder.results  # results: lr vs loss in dict
    fig = lr_finder.plot(suggest=True, show=True)  # show fig of suggested lr
    neptune_logger.experiment.log_image(
        "Learning Rate Range Test", fig
    )  # log to neptune
    new_lr = lr_finder.suggestion()  # new lr

    # Overwrite the task's learning rate and record the new value on the experiment.
    task.lr = new_lr  # update with suggested lr
    neptune_logger.experiment.set_property("LR", new_lr)

# PL has a learning rate finder, very convenient!


In [None]:
# start training
# The epoch limit is set on the existing trainer here rather than in its constructor.
trainer.max_epochs = params["MAXEPOCHS"]
# Train the task on the training loader, validating on the validation loader.
trainer.fit(
    task, train_dataloader=dataloader_training, val_dataloaders=dataloader_validation
)


In [None]:
# start testing
# ckpt_path="best" asks Lightning to evaluate the best checkpoint rather than the last weights
# (per the Lightning docs -- confirm for your version).
# The per-sample analysis at the bottom reads metric objects from ``task`` after this call.
trainer.test(ckpt_path="best", test_dataloaders=dataloader_test)
# this is how you would run your test dataset


In [None]:
# log packages
# Prefer the third-party backport when it is installed (original behaviour), but fall
# back to the standard-library module so this cell does not need an extra dependency.
try:
    import importlib_metadata
except ImportError:
    from importlib import metadata as importlib_metadata

# One (name, version) pair per distribution installed in the current environment.
dists = importlib_metadata.distributions()
packages = {
    idx: (dist.metadata["Name"], dist.version) for idx, dist in enumerate(dists)
}

packages_df = pd.DataFrame.from_dict(
    packages, orient="index", columns=["package", "version"]
)

# Attach the table to the Neptune experiment under the name "packages".
log_table(name="packages", table=packages_df, experiment=neptune_logger.experiment)

# Display the table as the cell output.
packages_df
# I like to log the packages of the env that I used for the training run


In [None]:
# # log checkpoint including the model weights
# checkpoint_path = pathlib.Path(checkpoint_callback.best_model_path)
# neptune_logger.experiment.set_property('checkpoint_name', checkpoint_path.name)
# neptune_logger.experiment.log_artifact(str(checkpoint_path))

# you can either upload the complete checkpoint here or extract the model weights and upload them to your neptune experiment. Let me know if you're interested in knowing how this can be done.


In [None]:
# get k highest and lowest scores for analysis purposes, only works if batch size of the test dataset is set to 1
def get_k_highest_values(scores, k):
    """Return the indices of the ``k`` largest entries of ``scores``.

    Args:
        scores: 1-D sequence or array of numeric scores.
        k: number of indices wanted. Values larger than ``len(scores)`` are
            clamped to ``len(scores)``; values <= 0 give an empty result.

    Returns:
        ``np.ndarray`` of integer indices into ``scores``, ordered from the
        highest score to the lowest.
    """
    values = np.asarray(scores)
    total = values.size
    k = max(0, min(k, total))
    if k == 0:
        # ``[-0:]`` would select every index, so the empty request is handled explicitly.
        return np.empty(0, dtype=np.intp)
    # Partition so that the k largest values occupy the last k positions.
    top = np.argpartition(values, total - k)[total - k:]
    # argpartition leaves the selection unordered; sort it best-first.
    return top[np.argsort(values[top], kind="stable")[::-1]]


def get_k_lowest_values(scores, k):
    """Return the indices of the ``k`` smallest entries of ``scores``.

    Args:
        scores: 1-D sequence or array of numeric scores.
        k: number of indices wanted. Values larger than ``len(scores)`` are
            clamped to ``len(scores)``; values <= 0 give an empty result.

    Returns:
        ``np.ndarray`` of integer indices into ``scores``, ordered from the
        lowest score to the highest.
    """
    values = np.asarray(scores)
    k = max(0, min(k, values.size))
    if k == 0:
        return np.empty(0, dtype=np.intp)
    # kth must be a valid index, so partition around k - 1 (not k): this also
    # works when k == len(scores), which previously raised a ValueError.
    bottom = np.argpartition(values, k - 1)[:k]
    # argpartition leaves the selection unordered; sort it worst-first.
    return bottom[np.argsort(values[bottom], kind="stable")]


def log_k_worst_best_scores(metric_obj, k):
    """Log the ``k`` lowest- and ``k`` highest-scoring samples of a metric to Neptune.

    Args:
        metric_obj: object providing ``get_metrics_epoch(last=True, transpose=False)``
            (whose result has a ``.numpy()`` method) and ``last_names`` (an iterable
            of name iterables). ``str(metric_obj)`` is used as the score column name
            and as the table-name prefix.
        k: number of samples to report on each side.

    Returns:
        Tuple ``(df_lowest, df_highest)`` of DataFrames with the score column and a
        ``"name"`` column. Both tables are also sent to the notebook-level
        ``neptune_logger`` experiment.
    """
    from itertools import chain

    per_sample_scores = metric_obj.get_metrics_epoch(last=True, transpose=False).numpy()
    # ``last_names`` is nested, so flatten it into one array that lines up with the scores.
    sample_names = np.array(list(chain.from_iterable(metric_obj.last_names)))

    # Both helpers return positions, which index scores and names alike.
    worst_idx = get_k_lowest_values(per_sample_scores, k=k)
    best_idx = get_k_highest_values(per_sample_scores, k=k)

    def _as_table(positions):
        # Score and file name of the selected samples, side by side.
        return pd.DataFrame(
            {f"{metric_obj}": per_sample_scores[positions], "name": sample_names[positions]}
        )

    df_lowest = _as_table(worst_idx)
    df_highest = _as_table(best_idx)

    # Both tables are built before anything is sent to Neptune.
    for table_name, table in (
        (f"{metric_obj}-lowest", df_lowest),
        (f"{metric_obj}-highest", df_highest),
    ):
        log_table(name=table_name, table=table, experiment=neptune_logger.experiment)

    return df_lowest, df_highest


In [None]:
# Log the 5 lowest- and 5 highest-scoring samples for two metric objects held by the task
# (judging by the attribute names: test-set F1 and IoU). Needs the test dataloader's batch size of 1.
log_k_worst_best_scores(task.f1_test, k=5)
log_k_worst_best_scores(task.iou_test, k=5)


In [None]:
# stop experiment
# Stop the Neptune experiment first, then close the logger; run this only after all logging is done.
neptune_logger.experiment.stop()
neptune_logger.close()


In [None]:
# Analyse your results in neptune.ai!
