import logging
import os
import shutil
import sys
import scipy.sparse as sparse
import numpy as np
import torch
def save_checkpoint(state, is_best, checkpoint_dir, logger=None):
    """Saves model and training parameters at '{checkpoint_dir}/last_checkpoint.pytorch'.
    If is_best==True saves '{checkpoint_dir}/best_checkpoint.pytorch' as well.

    Args:
        state (dict): contains model's state_dict, optimizer's state_dict, epoch
            and best evaluation metric value so far
        is_best (bool): if True state contains the best model seen so far
        checkpoint_dir (string): directory where the checkpoint are to be saved
        logger (logging.Logger) optional: when given, progress messages are
            reported through ``logger.info``; when None nothing is logged
    """

    def log_info(message):
        # Logging is optional: stay silent when no logger was supplied.
        if logger is not None:
            logger.info(message)

    if not os.path.exists(checkpoint_dir):
        log_info(
            f"Checkpoint directory does not exists. Creating {checkpoint_dir}")
        # makedirs creates missing parent directories too, and exist_ok keeps
        # this safe if the directory appears between the check and the call.
        os.makedirs(checkpoint_dir, exist_ok=True)

    last_file_path = os.path.join(checkpoint_dir, 'last_checkpoint.pytorch')
    log_info(f"Saving last checkpoint to '{last_file_path}'")
    torch.save(state, last_file_path)

    if is_best:
        # The best checkpoint is a plain file copy of the one just written.
        best_file_path = os.path.join(checkpoint_dir, 'best_checkpoint.pytorch')
        log_info(f"Saving best checkpoint to '{best_file_path}'")
        shutil.copyfile(last_file_path, best_file_path)
def load_checkpoint(checkpoint_path, model, optimizer=None):
    """Loads model and training parameters from a given checkpoint_path
    If optimizer is provided, loads optimizer's state_dict of as well.

    Args:
        checkpoint_path (string): path to the checkpoint to be loaded
        model (torch.nn.Module): model into which the parameters are to be copied
        optimizer (torch.optim.Optimizer) optional: optimizer instance into
            which the parameters are to be copied

    Returns:
        the object returned by ``torch.load`` for the checkpoint file

    Raises:
        IOError: if ``checkpoint_path`` does not exist
    """
    if not os.path.exists(checkpoint_path):
        raise IOError(f"Checkpoint '{checkpoint_path}' does not exist")

    # SECURITY: torch.load unpickles the file; only load checkpoints that come
    # from a trusted source.
    state = torch.load(checkpoint_path)

    # NOTE(review): the key names 'model_state_dict' / 'optimizer_state_dict'
    # are assumed - confirm they match what the training code stores in the
    # state dict handed to save_checkpoint.
    model.load_state_dict(state['model_state_dict'])

    if optimizer is not None:
        optimizer.load_state_dict(state['optimizer_state_dict'])

    return state
def get_logger(name, level=logging.INFO):
    """Return the logger called ``name``, set to ``level`` and writing to stdout.

    Args:
        name (string): name passed to ``logging.getLogger``
        level (int): logging level applied to the logger

    Returns:
        logging.Logger: the configured logger
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # logging.getLogger hands back the same object for the same name, so a
    # console handler is attached only once; otherwise every repeated call
    # would make each message appear one more time.
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and getattr(handler, 'stream', None) is sys.stdout
        for handler in logger.handlers
    )
    if not has_console_handler:
        # Logging to console
        stream_handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter(
            '%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s')
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)

    return logger
def get_number_of_learnable_parameters(model):
    """Count the scalar entries of all parameters that require gradients.

    Args:
        model: object whose ``parameters()`` yields tensors carrying a
            ``requires_grad`` flag (e.g. a ``torch.nn.Module``)

    Returns:
        int: total number of elements across the trainable parameters
    """
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
class RunningAverage:
    """Computes and stores the average

    ``count`` is the accumulated weight, ``sum`` the weighted total of the
    values seen so far, and ``avg`` the current ``sum / count``.
    """

    def __init__(self):
        self.count = 0
        self.sum = 0
        self.avg = 0

    def update(self, value, n=1):
        """Fold ``value`` into the average, counted ``n`` times."""
        self.count += n
        self.sum += value * n
        self.avg = self.sum / self.count
def find_maximum_patch_size(model, device):
    """Tries to find the biggest patch size that can be send to GPU for inference
    without throwing CUDA out of memory

    Candidate shapes are tried in the order listed below and each one is
    logged before it is used, so the last "Current patch size" message emitted
    before a failure names the shape that could not be processed.

    Args:
        model: network exposing an ``in_channels`` attribute.
            NOTE(review): assumed to be callable on a float32 tensor of shape
            (1, in_channels, D, H, W) - confirm.
        device: target handed to ``Tensor.to``
    """
    logger = get_logger('PatchFinder')
    in_channels = model.in_channels

    patch_shapes = [(64, 128, 128), (96, 128, 128),
                    (64, 160, 160), (96, 160, 160),
                    (64, 192, 192), (96, 192, 192)]

    for shape in patch_shapes:
        # generate random patch of a given size
        # All channels are allocated up front: viewing a single (D, H, W)
        # volume as (1, in_channels, D, H, W) only succeeds for in_channels == 1.
        patch = np.random.randn(1, in_channels, *shape).astype('float32')
        patch = torch.from_numpy(patch).to(device)

        logger.info(f"Current patch size: {shape}")
        # NOTE(review): running the model on the patch is assumed to be the
        # intended memory probe - confirm. Any error raised here (including
        # out-of-memory) propagates to the caller.
        model(patch)
def unpad(probs, index, shape, pad_width=8):
    """Trim ``pad_width`` entries from the interior-facing sides of a patch.

    Along each of the three spatial axes the patch loses ``pad_width`` entries
    at its start unless its slice starts at 0, and at its end unless its slice
    stops at the volume size. The channel axis is left untouched.

    Args:
        probs: array-like indexed as (channel, z, y, x)
        index (tuple): four slices (channel, z, y, x) locating ``probs``
            within the full volume
        shape (tuple): spatial size (D, H, W) of the full volume
        pad_width (int): number of entries to strip per trimmed side

    Returns:
        tuple: the trimmed ``probs`` and the matching 4-tuple of slices
    """

    def _new_slices(slicing, max_size):
        # Start of the axis: keep everything when the patch touches the border.
        if slicing.start == 0:
            p_start = 0
            i_start = 0
        else:
            p_start = pad_width
            i_start = slicing.start + pad_width

        # End of the axis: same rule, mirrored.
        if slicing.stop == max_size:
            p_stop = None
            i_stop = max_size
        else:
            # -0 == 0 would produce an empty slice, so use an open end when
            # there is nothing to trim.
            p_stop = -pad_width if pad_width > 0 else None
            i_stop = slicing.stop - pad_width

        return slice(p_start, p_stop), slice(i_start, i_stop)

    D, H, W = shape

    i_c, i_z, i_y, i_x = index
    p_c = slice(0, probs.shape[0])

    p_z, i_z = _new_slices(i_z, D)
    p_y, i_y = _new_slices(i_y, H)
    p_x, i_x = _new_slices(i_x, W)

    probs_index = (p_c, p_z, p_y, p_x)
    index = (i_c, i_z, i_y, i_x)
    return probs[probs_index], index
def create_feature_maps(init_channel_number, number_of_fmaps):
    """Return a list of ``number_of_fmaps`` channel counts that double each step.

    The k-th entry is ``init_channel_number * 2 ** k``, so the list starts at
    ``init_channel_number``; it is empty when ``number_of_fmaps`` is 0.
    """
    return [init_channel_number * 2 ** k for k in range(number_of_fmaps)]
# Code taken from https://github.com/cremi/cremi_python
def adapted_rand(seg, gt, all_stats=False):
    r"""Compute Adapted Rand error as defined by the SNEMI3D contest [1]

    Formula is given as 1 - the maximal F-score of the Rand index
    (excluding the zero component of the original labels). Adapted
    from the SNEMI3D MATLAB script, hence the strange style.

    Parameters
    ----------
    seg : np.ndarray
        the segmentation to score, where each value is the label at that point
    gt : np.ndarray, same shape as seg
        the groundtruth to score against, where each value is a label
    all_stats : boolean, optional
        whether to also return precision and recall as a 3-tuple with rand_error

    Returns
    -------
    are : float
        The adapted Rand error; equal to $1 - \frac{2pr}{p + r}$,
        where $p$ and $r$ are the precision and recall described below.
    prec : float, optional
        The adapted Rand precision. (Only returned when `all_stats` is ``True``.)
    rec : float, optional
        The adapted Rand recall. (Only returned when `all_stats` is ``True``.)

    Notes
    -----
    Label values are used directly as row/column indices of a sparse
    contingency table, so both arrays must hold non-negative integers.

    References
    ----------
    [1]: http://brainiac2.mit.edu/SNEMI3D/evaluation
    """
    # just to prevent division by 0
    epsilon = 1e-6

    # segA is truth, segB is query
    segA = np.ravel(gt)
    segB = np.ravel(seg)
    n = segA.size

    n_labels_A = np.amax(segA) + 1
    n_labels_B = np.amax(segB) + 1

    ones_data = np.ones(n)

    # Contingency table: entry (i, j) counts the points labelled i in the
    # ground truth and j in the segmentation (duplicate coordinates are summed).
    p_ij = sparse.csr_matrix((ones_data, (segA[:], segB[:])), shape=(n_labels_A, n_labels_B))

    # Row 0 (ground-truth label 0) is dropped from every term below.
    a = p_ij[1:n_labels_A, :]
    b = p_ij[1:n_labels_A, 1:n_labels_B]
    c = p_ij[1:n_labels_A, 0].todense()
    d = b.multiply(b)

    a_i = np.array(a.sum(1))
    b_i = np.array(b.sum(0))

    sumA = np.sum(a_i * a_i)
    sumB = np.sum(b_i * b_i) + (np.sum(c) / n)
    sumAB = np.sum(d) + (np.sum(c) / n)

    precision = sumAB / max(sumB, epsilon)
    recall = sumAB / max(sumA, epsilon)

    fScore = 2.0 * precision * recall / max(precision + recall, epsilon)
    are = 1.0 - fScore

    if all_stats:
        return are, precision, recall
    return are
