# Train your own object detector with Faster-RCNN & PyTorch: Heads detector

### Rename downloaded files

In [None]:
import pathlib
from utils import get_filenames_of_path

root = pathlib.Path('heads')

inputs = get_filenames_of_path(root / 'input')
inputs.sort()

# Rename every input file to its zero-padded position in the sorted listing
# (000.ext, 001.ext, ...), keeping the original extension.
for idx, path in enumerate(inputs):
    new_path = path.with_name(str(idx).zfill(3) + path.suffix)
    if new_path == path:
        # Already carries its final name (e.g. this cell was run before).
        continue
    if new_path.exists():
        # Path.rename silently replaces an existing file on POSIX, which would
        # destroy an image that has not been renamed yet; stop instead.
        raise FileExistsError(f'Cannot rename {path} to {new_path}: target already exists')
    path.rename(new_path)

### Start annotating

In [None]:
import pathlib
from visual import Annotator

from utils import get_filenames_of_path

# Directory that holds the 'input' folder with the images to annotate.
# Named `root` (like the other cells) so the builtin `dir` is not shadowed
# for the rest of the notebook session.
root = pathlib.Path('heads')
image_files = get_filenames_of_path(root / 'input')

# Create the annotator for these images and start its napari interface.
annotator = Annotator(image_ids=image_files)
annotator.napari()

### Add labels

In [None]:
# Register the label 'head' with the annotator, using red as its color.
annotator.add_class(label='head', color='red')

In [None]:
# Register the label 'eye' with the annotator, using blue as its color.
# NOTE(review): the dataset cells only map 'head' to an id — confirm how 'eye' labels are handled there.
annotator.add_class(label='eye', color='blue')

### Save the annotations of the current image

In [None]:
# Export the annotations of the current image.
# The path is a placeholder: replace it with a real output directory before running.
annotator.export(pathlib.Path('.../some_directory'))

### Save all available annotations in one go

In [None]:
# Export all available annotations to heads/target in a single call.
annotator.export_all(pathlib.Path('heads/target'))

### Inspect the annotation

In [None]:
import pathlib
import torch
from utils import get_filenames_of_path

# Annotation files found under heads/target, in sorted order.
root = pathlib.Path('heads')
targets = sorted(get_filenames_of_path(root / 'target'))

In [None]:
# Load the second annotation file (index 1 of the sorted list) with torch.load.
annotation = torch.load(targets[1])

In [None]:
# Show which keys the loaded annotation contains.
annotation.keys()

In [None]:
# Show the entry stored under 'labels'.
annotation['labels']

In [None]:
# Show the entry stored under 'boxes'.
annotation['boxes']

# Dataset

In [None]:
import pathlib

import albumentations as A
import numpy as np

from datasets import ObjectDetectionDataSet
from transformations import ComposeDouble, Clip, AlbumentationWrapper, FunctionWrapperDouble
from transformations import normalize_01
from utils import get_filenames_of_path

In [None]:
# Dataset root directory; its 'input' and 'target' folders are listed in the next cell.
root = pathlib.Path('heads')

In [None]:
# Sorted listings of the image files and of the annotation files.
# Presumably sorting is what pairs each image with its annotation — verify the file naming.
inputs = sorted(get_filenames_of_path(root / 'input'))
targets = sorted(get_filenames_of_path(root / 'target'))

In [None]:
# Label name -> integer class id, passed to the dataset as `mapping`.
mapping = dict(head=1)

### Transformations

In [None]:
# Transformation pipeline handed to the dataset below.
transforms = ComposeDouble([
    Clip(),
    # Augmentations are switched off for inspection; uncomment to enable them:
    # AlbumentationWrapper(albumentation=A.HorizontalFlip(p=0.5)),
    # AlbumentationWrapper(albumentation=A.RandomScale(p=0.5, scale_limit=0.5)),
    # AlbumentationWrapper(albumentation=A.VerticalFlip(p=0.5)),
    # np.moveaxis with source=-1, destination=0 makes the last axis the first one
    # (presumably channels-last -> channels-first — TODO confirm).
    FunctionWrapperDouble(np.moveaxis, source=-1, destination=0),
    FunctionWrapperDouble(normalize_01)
])

In [None]:
# Build the detection dataset from the sorted image/annotation listings,
# with the transformation pipeline and label mapping defined above; caching is off.
dataset = ObjectDetectionDataSet(inputs=inputs,
                                 targets=targets,
                                 transform=transforms,
                                 use_cache=False,
                                 convert_to_format=None,
                                 mapping=mapping)

### Visualize dataset

In [None]:
# Color per class id (1 is the id assigned to 'head' in `mapping`).
color_mapping = {
    1: 'red',
}

from visual import DatasetViewer

# Create a viewer for the dataset, start its napari interface and
# attach the text-properties GUI to the viewer's shape layer.
datasetviewer = DatasetViewer(dataset, color_mapping)
datasetviewer.napari()
datasetviewer.gui_text_properties(datasetviewer.shape_layer)

### Visualize dataset with the Faster-RCNN transform (GeneralizedRCNNTransform)

In [None]:
# Color per class id (1 is the id assigned to 'head' in `mapping`).
color_mapping = {
    1: 'red',
}

from visual import DatasetViewer

from torchvision.models.detection.transform import GeneralizedRCNNTransform

# torchvision's Faster-RCNN input transform with min_size = max_size = 1024
# and the per-channel mean/std listed here.
transform = GeneralizedRCNNTransform(min_size=1024,
                                     max_size=1024,
                                     image_mean=[0.485, 0.456, 0.406],
                                     image_std=[0.229, 0.224, 0.225])

# Same viewer as before, this time with the transform passed in as `rccn_transform`
# (the keyword is spelled that way in this call).
datasetviewer = DatasetViewer(dataset, color_mapping, rccn_transform=transform)
datasetviewer.napari()

In [None]:
# Shape of the data currently held by the viewer's image layer.
datasetviewer.image_layer.data.shape

### Dataset statistics

In [None]:
from utils import stats_dataset

# Statistics of the dataset as it is (no Faster-RCNN transform).
stats = stats_dataset(dataset)

from torchvision.models.detection.transform import GeneralizedRCNNTransform

transform = GeneralizedRCNNTransform(min_size=1024,
                                     max_size=1024,
                                     image_mean=[0.485, 0.456, 0.406],
                                     image_std=[0.229, 0.224, 0.225])

# Statistics of the same dataset with the transform passed along, for comparison.
stats_transform = stats_dataset(dataset, transform)

In [None]:
# Show which statistics were collected.
stats.keys()

In [None]:
# Largest 'image_height' value without the transform.
stats['image_height'].max()

In [None]:
# Largest 'image_height' value with the transform.
stats_transform['image_height'].max()

In [None]:
# Smallest 'image_height' value without the transform.
stats['image_height'].min()

In [None]:
# Smallest 'image_height' value with the transform.
stats_transform['image_height'].min()

## AnchorViewer

In [None]:
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from visual import AnchorViewer

# Faster-RCNN input transform with min_size = max_size = 1024.
transform = GeneralizedRCNNTransform(min_size=1024,
                                     max_size=1024,
                                     image_mean=[0.485, 0.456, 0.406],
                                     image_std=[0.229, 0.224, 0.225])

# First sample of the dataset; its image is stored under the key 'x'.
image = dataset[0]['x']  # ObjectDetectionDataSet
# Presumably (channels, height, width) of the backbone's feature map — TODO confirm against the backbone used.
feature_map_size = (512, 32, 32)
# Viewer for the anchor boxes generated from the sizes and aspect ratios given here.
anchorviewer = AnchorViewer(image=image,
                 rcnn_transform=transform,
                 feature_map_size=feature_map_size,
                 anchor_size=((128, 256, 512),),
                 aspect_ratios=((0.5, 1.0, 2.0),)
                 )
anchorviewer.napari()


# Training

In [None]:
# Imports
import pathlib

import albumentations as A
import numpy as np
from torch.utils.data import DataLoader

from datasets import ObjectDetectionDataSet
from transformations import ComposeDouble, Clip, AlbumentationWrapper, FunctionWrapperDouble
from transformations import normalize_01
from utils import get_filenames_of_path, collate_double

In [None]:
# Hyper-parameters of the training run; the whole dict is also handed to the logger.
params = dict(
    BATCH_SIZE=2,  # batch size of the training dataloader
    LR=0.001,  # learning rate passed to the lightning module
    PRECISION=32,  # passed to the Trainer as `precision`
    CLASSES=2,  # presumably 'head' + background — TODO confirm
    SEED=42,
    PROJECT='Heads',
    EXPERIMENT='heads',
    MAXEPOCHS=500,
    BACKBONE='resnet34',
    FPN=False,
    ANCHOR_SIZE=((32, 64, 128, 256, 512),),
    ASPECT_RATIOS=((0.5, 1.0, 2.0),),
    MIN_SIZE=1024,
    MAX_SIZE=1024,
    IMG_MEAN=[0.485, 0.456, 0.406],
    IMG_STD=[0.229, 0.224, 0.225],
    IOU_THRESHOLD=0.5,
)

In [None]:
# Root directory of the dataset; also reused below as the Trainer's default_root_dir value ('heads').
root = pathlib.Path('heads')

In [None]:
# Sorted listings of the image files and of the annotation files.
# Presumably sorting is what pairs each image with its annotation — verify the file naming.
inputs = sorted(get_filenames_of_path(root / 'input'))
targets = sorted(get_filenames_of_path(root / 'target'))

In [None]:
# Label name -> integer class id, passed to every dataset as `mapping`.
mapping = dict(head=1)

In [None]:
# Training pipeline: clipping, two random augmentations (each applied with p=0.5),
# axis reordering and normalize_01.
transforms_training = ComposeDouble([
    Clip(),
    AlbumentationWrapper(albumentation=A.HorizontalFlip(p=0.5)),
    AlbumentationWrapper(albumentation=A.RandomScale(p=0.5, scale_limit=0.5)),
    # Optional extra augmentation, currently disabled:
    # AlbumentationWrapper(albumentation=A.VerticalFlip(p=0.5)),
    # np.moveaxis with source=-1, destination=0 makes the last axis the first one
    FunctionWrapperDouble(np.moveaxis, source=-1, destination=0),
    FunctionWrapperDouble(normalize_01)
])

# Validation pipeline: same as training but without the random augmentations.
transforms_validation = ComposeDouble([
    Clip(),
    FunctionWrapperDouble(np.moveaxis, source=-1, destination=0),
    FunctionWrapperDouble(normalize_01)
])

# Test pipeline: identical to the validation pipeline.
transforms_test = ComposeDouble([
    Clip(),
    FunctionWrapperDouble(np.moveaxis, source=-1, destination=0),
    FunctionWrapperDouble(normalize_01)
])

In [None]:
# Seed the random number generators with params['SEED'] via pytorch-lightning's helper.
from pytorch_lightning import seed_everything

seed_everything(params['SEED'])

In [None]:
# Fixed split by position in the sorted listings:
# first 12 samples -> training, next 4 -> validation, everything after -> test.
inputs_train, targets_train = inputs[:12], targets[:12]
inputs_valid, targets_valid = inputs[12:16], targets[12:16]
inputs_test, targets_test = inputs[16:], targets[16:]

In [None]:
# One dataset per split. Each uses its own transformation pipeline, the shared
# label mapping, and has caching switched on.

# dataset training
dataset_train = ObjectDetectionDataSet(inputs=inputs_train,
                                       targets=targets_train,
                                       transform=transforms_training,
                                       use_cache=True,
                                       convert_to_format=None,
                                       mapping=mapping)

# dataset validation
dataset_valid = ObjectDetectionDataSet(inputs=inputs_valid,
                                       targets=targets_valid,
                                       transform=transforms_validation,
                                       use_cache=True,
                                       convert_to_format=None,
                                       mapping=mapping)

# dataset test
dataset_test = ObjectDetectionDataSet(inputs=inputs_test,
                                      targets=targets_test,
                                      transform=transforms_test,
                                      use_cache=True,
                                      convert_to_format=None,
                                      mapping=mapping)

# One dataloader per split, all loading in the main process (num_workers=0)
# and batching with collate_double.

# dataloader training: shuffled, batch size taken from params
dataloader_train = DataLoader(dataset=dataset_train,
                              batch_size=params['BATCH_SIZE'],
                              shuffle=True,
                              num_workers=0,
                              collate_fn=collate_double)

# dataloader validation: one sample per batch, fixed order
dataloader_valid = DataLoader(dataset=dataset_valid,
                              batch_size=1,
                              shuffle=False,
                              num_workers=0,
                              collate_fn=collate_double)

# dataloader test: one sample per batch, fixed order
dataloader_test = DataLoader(dataset=dataset_test,
                             batch_size=1,
                             shuffle=False,
                             num_workers=0,
                             collate_fn=collate_double)

In [None]:
# neptune logger
from pytorch_lightning.loggers.neptune import NeptuneLogger
from api_key_neptune import get_api_key

# `api_key_neptune` is a local module you create yourself so the token stays
# out of the notebook. Expected content of api_key_neptune.py:
#
# def get_api_key():
#     return 'your_super_long_API_token'


api_key = get_api_key()

# Logger for the run; replace 'your_neptune_name' with your own Neptune account name.
# The hyper-parameter dict is passed along as `params`.
neptune_logger = NeptuneLogger(
    api_key=api_key,
    project_name=f'your_neptune_name/{params["PROJECT"]}',
    experiment_name=params['EXPERIMENT'],
    params=params
)

In [None]:
# Build the Faster-RCNN model from the hyper-parameters defined above.
from faster_RCNN import get_fasterRCNN_resnet

model = get_fasterRCNN_resnet(num_classes=params['CLASSES'],
                              backbone_name=params['BACKBONE'],
                              anchor_size=params['ANCHOR_SIZE'],
                              aspect_ratios=params['ASPECT_RATIOS'],
                              fpn=params['FPN'],
                              min_size=params['MIN_SIZE'],
                              max_size=params['MAX_SIZE'])

In [None]:
# Wrap the model in the lightning module, with the learning rate and IoU threshold from params.
from faster_RCNN import FasterRCNN_lightning

task = FasterRCNN_lightning(model=model, lr=params['LR'], iou_threshold=params['IOU_THRESHOLD'])

In [None]:
# callbacks
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor, EarlyStopping

# Checkpointing and early stopping both monitor 'Validation_mAP' with mode='max' (higher is better).
checkpoint_callback = ModelCheckpoint(monitor='Validation_mAP', mode='max')
learningrate_callback = LearningRateMonitor(logging_interval='step', log_momentum=False)
early_stopping_callback = EarlyStopping(monitor='Validation_mAP', patience=50, mode='max')

# trainer init
from pytorch_lightning import Trainer

# NOTE(review): `gpus` and `enable_pl_optimizer` look like arguments of older
# pytorch-lightning releases — confirm against the installed version.
trainer = Trainer(gpus=1,
                  precision=params['PRECISION'],  # try 16 with enable_pl_optimizer=False
                  callbacks=[checkpoint_callback, learningrate_callback, early_stopping_callback],
                  default_root_dir='heads',  # where checkpoints are saved to
                  logger=neptune_logger,
                  log_every_n_steps=1,
                  num_sanity_val_steps=0,
                  enable_pl_optimizer=False,  # False seems to be necessary for half precision
                  )

In [None]:
# Start training: set the epoch limit from params on the existing trainer,
# then fit with the training and validation dataloaders.
trainer.max_epochs = params['MAXEPOCHS']
trainer.fit(task,
            train_dataloader=dataloader_train,
            val_dataloaders=dataloader_valid)

In [None]:
# Run the test loop on the test dataloader, requesting the 'best' checkpoint.
trainer.test(ckpt_path='best', test_dataloaders=dataloader_test)

In [None]:
# Log package information to the Neptune experiment via the project helper.
from utils import log_packages_neptune

log_packages_neptune(neptune_logger)

In [None]:
# Log the label mapping to the Neptune experiment (as a table).
from utils import log_mapping_neptune

log_mapping_neptune(mapping, neptune_logger)

In [None]:
# Log the best checkpoint (path reported by the checkpoint callback) to Neptune
# under the name 'best_model.pt'; the user's home directory is passed as save_directory.
from utils import log_model_neptune

checkpoint_path = pathlib.Path(checkpoint_callback.best_model_path)
log_model_neptune(checkpoint_path=checkpoint_path,
                  save_directory=pathlib.Path.home(),
                  name='best_model.pt',
                  neptune_logger=neptune_logger)

# Inference

In [None]:
# imports
import ast
import pathlib

import neptune
import numpy as np
import torch
from torch.utils.data import DataLoader

from api_key_neptune import get_api_key
from datasets import ObjectDetectionDatasetSingle, ObjectDetectionDataSet
from transformations import ComposeSingle, FunctionWrapperSingle, normalize_01, ComposeDouble, FunctionWrapperDouble
from utils import get_filenames_of_path, collate_single

In [None]:
# Settings of the inference run (replaces the training `params` dict).
params = dict(
    EXPERIMENT='experiment_name',  # id of the Neptune experiment to read parameters from
    INPUT_DIR='heads/test',  # directory with the files to predict
    PREDICTIONS_PATH='heads',  # directory the predictions are written to
    MODEL_DIR='heads',  # checkpoint location used when DOWNLOAD is False
    DOWNLOAD=False,  # True: fetch the model from Neptune
    DOWNLOAD_PATH='heads/',  # directory a downloaded model is stored in
    OWNER='your_neptune_name',
    PROJECT='Heads',
)

In [None]:
# Files to run inference on: everything listed under INPUT_DIR, in sorted order.
inputs = sorted(get_filenames_of_path(pathlib.Path(params['INPUT_DIR'])))

In [None]:
# Inference pipeline (image only, no targets): axis reordering followed by normalize_01.
transforms = ComposeSingle([
    # np.moveaxis with source=-1, destination=0 makes the last axis the first one
    FunctionWrapperSingle(np.moveaxis, source=-1, destination=0),
    FunctionWrapperSingle(normalize_01)
])

In [None]:
# Dataset over the input images only (no targets), without caching.
dataset = ObjectDetectionDatasetSingle(inputs=inputs,
                                       transform=transforms,
                                       use_cache=False,
                                       )

# One image per batch, in listing order, loaded in the main process.
dataloader_prediction = DataLoader(dataset=dataset,
                                   batch_size=1,
                                   shuffle=False,
                                   num_workers=0,
                                   collate_fn=collate_single)

In [None]:
# Fetch the training experiment from Neptune to reuse its logged parameters and properties.
api_key = get_api_key()  # get the personal api key
project_name = f'{params["OWNER"]}/{params["PROJECT"]}'
project = neptune.init(project_qualified_name=project_name, api_token=api_key)  # get project
experiment_id = params['EXPERIMENT']  # experiment id
# Take the first experiment returned for that id.
experiment = project.get_experiments(id=experiment_id)[0]
parameters = experiment.get_parameters()
properties = experiment.get_properties()

In [None]:
# view dataset
from visual import DatasetViewerSingle
from torchvision.models.detection.transform import GeneralizedRCNNTransform

# Rebuild the Faster-RCNN transform from the logged experiment parameters.
# The values are converted here: int() for the sizes, ast.literal_eval for the lists.
transform = GeneralizedRCNNTransform(min_size=int(parameters['MIN_SIZE']),
                                     max_size=int(parameters['MAX_SIZE']),
                                     image_mean=ast.literal_eval(parameters['IMG_MEAN']),
                                     image_std=ast.literal_eval(parameters['IMG_STD']))


# NOTE(review): `transform` is built above but not used — the viewer receives
# rccn_transform=None. Confirm whether rccn_transform=transform was intended.
datasetviewer = DatasetViewerSingle(dataset, rccn_transform=None)
datasetviewer.napari()

In [None]:
# Obtain the model weights: download them from Neptune or read a local checkpoint.
# NOTE: torch.load unpickles its input — only load checkpoints from a trusted source.
# map_location='cpu' lets weights that were saved on a GPU load on a CPU-only machine.
if params['DOWNLOAD']:
    download_path = pathlib.Path(params['DOWNLOAD_PATH'])
    model_name = properties['checkpoint_name']  # logged when called log_model_neptune()
    if not (download_path / model_name).is_file():
        experiment.download_artifact(path=model_name, destination_dir=download_path)  # download model

    model_state_dict = torch.load(download_path / model_name, map_location='cpu')
else:
    ckpt_file = pathlib.Path(params['MODEL_DIR'])
    if ckpt_file.is_dir():
        # MODEL_DIR may name a directory, but torch.load needs a file:
        # use the most recently written *.ckpt found anywhere below it.
        ckpt_candidates = sorted(ckpt_file.rglob('*.ckpt'), key=lambda p: p.stat().st_mtime)
        if not ckpt_candidates:
            raise FileNotFoundError(f'No *.ckpt file found below {ckpt_file}')
        ckpt_file = ckpt_candidates[-1]
    checkpoint = torch.load(ckpt_file, map_location='cpu')
    # The checkpoint stores the model object under its hyper-parameters; take its weights.
    model_state_dict = checkpoint['hyper_parameters']['model'].state_dict()

In [None]:
# Rebuild the model with the parameters logged for the training experiment.
# The logged values are converted back here: int() for numbers,
# ast.literal_eval for tuples and booleans.
from faster_RCNN import get_fasterRCNN_resnet
model = get_fasterRCNN_resnet(num_classes=int(parameters['CLASSES']),
                              backbone_name=parameters['BACKBONE'],
                              anchor_size=ast.literal_eval(parameters['ANCHOR_SIZE']),
                              aspect_ratios=ast.literal_eval(parameters['ASPECT_RATIOS']),
                              fpn=ast.literal_eval(parameters['FPN']),
                              min_size=int(parameters['MIN_SIZE']),
                              max_size=int(parameters['MAX_SIZE'])
                              )

In [None]:
# Copy the loaded weights into the freshly built model.
model.load_state_dict(model_state_dict)

In [None]:
# Run the model on every input image and save the predictions as
# <PREDICTIONS_PATH>/<name reported by the dataset>.pt
predictions_dir = pathlib.Path(params['PREDICTIONS_PATH'])
predictions_dir.mkdir(parents=True, exist_ok=True)  # torch.save does not create missing directories

model.eval()
with torch.no_grad():
    for sample in dataloader_prediction:
        x, x_name = sample
        pred = model(x)
        # The dataloader uses batch_size=1, so pred[0] is the only result.
        # .cpu() is a no-op for CPU tensors and keeps .numpy() working otherwise.
        pred = {key: value.cpu().numpy() for key, value in pred[0].items()}
        name = pathlib.Path(x_name[0])
        torch.save(pred, predictions_dir / name.with_suffix('.pt'))

In [None]:
# create prediction dataset
predictions = get_filenames_of_path(pathlib.Path(params['PREDICTIONS_PATH']))

# Keep only the files that are predictions for one of the inputs: '.pt' suffix and
# the same stem as an input image. PREDICTIONS_PATH can contain other files too
# (with the default settings it is also the model download directory), and a stray
# file would shift the pairing between inputs and predictions.
# Assumes predictions were saved under the input file's name with a '.pt' suffix,
# as the inference cell does — confirm if the naming differs.
input_stems = {path.stem for path in inputs}
predictions = [path for path in predictions
               if path.suffix == '.pt' and path.stem in input_stems]
predictions.sort()

# Same preprocessing as for inference, in the image+target ("Double") variant.
transforms_prediction = ComposeDouble([
    FunctionWrapperDouble(np.moveaxis, source=-1, destination=0),
    FunctionWrapperDouble(normalize_01)
])

# Pair every input image with its saved prediction, used here as the target.
dataset_prediction = ObjectDetectionDataSet(inputs=inputs,
                                            targets=predictions,
                                            transform=transforms_prediction,
                                            use_cache=False)

In [None]:
# visualize predictions
from visual import DatasetViewer

# Color per class id.
color_mapping = {
    1: 'red',
}

# Viewer over the prediction dataset; start its napari interface.
datasetviewer_prediction = DatasetViewer(dataset_prediction, color_mapping)
datasetviewer_prediction.napari()

In [None]:
# Attach the text-properties GUI to the viewer's shape layer.
datasetviewer_prediction.gui_text_properties(datasetviewer_prediction.shape_layer)

In [None]:
# Attach the NMS slider to the viewer's shape layer.
datasetviewer_prediction.gui_nms_slider(datasetviewer_prediction.shape_layer)

In [None]:
# Attach the score slider to the viewer's shape layer.
datasetviewer_prediction.gui_score_slider(datasetviewer_prediction.shape_layer)