[MED-22][external] import nifti to raster layer mask annotations (#725)
Introduced the ability to import NIfTI labels into the raster/mask layer
tomvars committed Dec 19, 2023
1 parent 7cee58c commit e3165e4
Showing 5 changed files with 197 additions and 14 deletions.
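The new "mask" mode is driven by the same JSON import spec already used by the "video" and "instances" modes, as exercised by the test added below. A minimal sketch of building such a spec and parsing it (file names and paths here are illustrative, not taken from this commit):

import json
from pathlib import Path

from darwin.importer.formats.nifti import parse_path

# Illustrative spec: one NIfTI label file imported as a raster/mask layer.
input_dict = {
    "data": [
        {
            "image": "vol0 (1).nii",
            "label": "vol0_brain.nii.gz",
            "class_map": {"1": "brain"},
            "mode": "mask",  # new mode added by this commit
            "is_mpr": False,
            "slot_names": ["0.1"],
        }
    ]
}

spec_path = Path("annotations.json")
spec_path.write_text(json.dumps(input_dict, indent=4))

# parse_path returns a list of dt.AnnotationFile objects; in "mask" mode the
# annotations comprise a single raster layer plus one mask annotation per class.
annotation_files = parse_path(path=spec_path)
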
148 changes: 135 additions & 13 deletions darwin/importer/formats/nifti.py
@@ -1,4 +1,5 @@
import sys
import uuid
import warnings
from collections import OrderedDict, defaultdict
from pathlib import Path
@@ -102,7 +103,7 @@ def _parse_nifti(
class_img = np.isin(img, class_idxs).astype(np.uint8)
cc_img, num_labels = cc3d.connected_components(class_img, return_N=True)
for instance_id in range(1, num_labels):
_video_annotations = get_video_annotation(
_video_annotations = get_polygon_video_annotations(
cc_img,
class_idxs=[instance_id],
class_name=class_name,
@@ -116,7 +117,7 @@ def _parse_nifti(
for class_name, class_idxs in processed_class_map.items():
if class_name == "background":
continue
_video_annotations = get_video_annotation(
_video_annotations = get_polygon_video_annotations(
img,
class_idxs=class_idxs,
class_name=class_name,
@@ -127,10 +128,23 @@ def _parse_nifti(
if _video_annotations is None:
continue
video_annotations += _video_annotations
annotation_classes = {
dt.AnnotationClass(class_name, "polygon", "polygon")
for class_name in class_map.values()
}
elif mode == "mask":
video_annotations = get_mask_video_annotations(
img,
processed_class_map,
slot_names,
)
if mode in ["video", "instances"]:
annotation_classes = {
dt.AnnotationClass(class_name, "polygon", "polygon")
for class_name in class_map.values()
}
elif mode == "mask":
annotation_classes = {
dt.AnnotationClass(class_name, "mask", "mask")
for class_name in class_map.values()
}

return dt.AnnotationFile(
path=json_path,
filename=str(filename),
@@ -148,7 +162,7 @@ def _parse_nifti(
)


def get_video_annotation(
def get_polygon_video_annotations(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
@@ -157,13 +171,18 @@ def get_video_annotation(
pixdims: Tuple[float],
) -> Optional[List[dt.VideoAnnotation]]:
if not is_mpr:
return nifti_to_video_annotation(
volume, class_name, class_idxs, slot_names, view_idx=2, pixdims=pixdims
return nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
slot_names,
view_idx=2,
pixdims=pixdims,
)
elif is_mpr and len(slot_names) == 3:
video_annotations = []
for view_idx, slot_name in enumerate(slot_names):
_video_annotations = nifti_to_video_annotation(
_video_annotations = nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
@@ -177,9 +196,99 @@ def get_video_annotation(
raise Exception("If is_mpr is True, slot_names must be of length 3")


def nifti_to_video_annotation(
volume, class_name, class_idxs, slot_names, view_idx=2, pixdims=(1, 1, 1)
):
def get_mask_video_annotations(
volume: np.ndarray, processed_class_map: Dict, slot_names: List[str]
) -> Optional[List[dt.VideoAnnotation]]:
"""
The function takes a volume and a processed class map and returns a list of video annotations.
We write a single raster layer for the volume plus K mask annotations, where K is the number of classes.
"""
frame_annotations = OrderedDict()
all_mask_annotations = defaultdict(lambda: OrderedDict())
# This is a dictionary of class_names to generated mask_annotation_ids
mask_annotation_ids = {
class_name: str(uuid.uuid4()) for class_name in processed_class_map.keys()
}
# We need to create a new mapping dictionary where the keys are the mask_annotation_ids
# and the values are the new integers which we use in the raster layer
mask_annotation_ids_mapping = {}
map_from_nifti_idx_to_raster_idx = {0: 0} # 0 is the background class
for i in range(volume.shape[2]):
slice_mask = volume[:, :, i].astype(np.uint8)
for raster_idx, (class_name, class_idxs) in enumerate(
processed_class_map.items()
):
if class_name == "background":
continue
class_mask = np.isin(slice_mask, class_idxs).astype(np.uint8).copy()
if class_mask.sum() == 0:
continue
all_mask_annotations[class_name][i] = dt.make_mask(
class_name, subs=None, slot_names=slot_names
)
all_mask_annotations[class_name][i].id = mask_annotation_ids[class_name]
mask_annotation_ids_mapping[mask_annotation_ids[class_name]] = (
raster_idx + 1
)
for class_idx in class_idxs:
map_from_nifti_idx_to_raster_idx[class_idx] = raster_idx + 1
# Now that we've created all the mask annotations, we need to create the raster layer
# We only map the mask_annotation_ids which appear in any given frame.
for i in range(volume.shape[2]):
slice_mask = volume[:, :, i].astype(
np.uint8
) # Product requirement: We only support 255 classes!
# We need to convert from nifti_idx to raster_idx
slice_mask = np.vectorize(
lambda key: map_from_nifti_idx_to_raster_idx.get(key, 0)
)(slice_mask)
dense_rle = convert_to_dense_rle(slice_mask)
raster_annotation = dt.make_raster_layer(
class_name="__raster_layer__",
mask_annotation_ids_mapping=mask_annotation_ids_mapping,
total_pixels=class_mask.size,
dense_rle=dense_rle,
slot_names=slot_names,
)
frame_annotations[i] = raster_annotation
all_frame_ids = list(frame_annotations.keys())
if not all_frame_ids:
return None
if len(all_frame_ids) == 1:
segments = [[all_frame_ids[0], all_frame_ids[0] + 1]]
elif len(all_frame_ids) > 1:
segments = [[min(all_frame_ids), max(all_frame_ids)]]
raster_video_annotation = dt.make_video_annotation(
frame_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotations = []
for class_name, mask_annotations in all_mask_annotations.items():
mask_video_annotation = dt.make_video_annotation(
mask_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotation.id = mask_annotation_ids[class_name]
mask_video_annotations.append(mask_video_annotation)

return [raster_video_annotation] + mask_video_annotations


def nifti_to_video_polygon_annotation(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
slot_names: List[str],
view_idx: int = 2,
pixdims: Tuple[int, int, int] = (1, 1, 1),
) -> Optional[List[dt.VideoAnnotation]]:
frame_annotations = OrderedDict()
for i in range(volume.shape[view_idx]):
if view_idx == 2:
@@ -376,3 +485,16 @@ def process_nifti(
data_array = nib.orientations.apply_orientation(img.get_fdata(), ornt)
pixdims = img.header.get_zooms()
return data_array, pixdims


def convert_to_dense_rle(raster: np.ndarray) -> List[int]:
dense_rle, prev_val, cnt = [], None, 0
for val in raster.T.flat:
if val == prev_val:
cnt += 1
else:
if prev_val is not None:
dense_rle.extend([int(prev_val), int(cnt)])
prev_val, cnt = val, 1
dense_rle.extend([int(prev_val), int(cnt)])
return dense_rle
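
To make the raster-layer encoding concrete, here is a small worked example of the index remapping and run-length encoding performed above; the class names, indices, and array values are invented purely for illustration:

import numpy as np

from darwin.importer.formats.nifti import convert_to_dense_rle

# Suppose processed_class_map maps "liver" -> [2, 3] and "kidney" -> [5]
# (illustrative values). get_mask_video_annotations would then build
# map_from_nifti_idx_to_raster_idx = {0: 0, 2: 1, 3: 1, 5: 2}.
map_from_nifti_idx_to_raster_idx = {0: 0, 2: 1, 3: 1, 5: 2}

nifti_slice = np.array([[0, 0, 5],
                        [0, 2, 3]], dtype=np.uint8)

# Remap NIfTI class indices to raster indices, exactly as in the loop above.
raster_slice = np.vectorize(
    lambda key: map_from_nifti_idx_to_raster_idx.get(key, 0)
)(nifti_slice)
# raster_slice is now [[0, 0, 2],
#                      [0, 1, 1]]

# convert_to_dense_rle flattens the transposed slice (column-major order) and
# emits alternating [value, run_length, ...] pairs. Column-major traversal of
# raster_slice is 0, 0, 0, 1, 2, 1, so:
dense_rle = convert_to_dense_rle(raster_slice)
assert dense_rle == [0, 3, 1, 1, 2, 1, 1, 1]
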
2 changes: 1 addition & 1 deletion darwin/importer/formats/nifti_schemas.py
@@ -10,7 +10,7 @@
"image": {"type": "string"},
"label": {"type": "string"},
"class_map": class_map,
"mode": {"type": "string", "enum": ["video", "instances"]},
"mode": {"type": "string", "enum": ["video", "instances", "mask"]},
"is_mpr": {"type": "boolean"},
"slot_names": {"type": "array", "items": {"type": "string"}},
},
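
With "mask" added to the enum, an import spec that sets "mode": "mask" now passes schema validation. A minimal sketch, assuming the module exposes the top-level schema as nifti_import_schema (the name and the wrapping structure are assumptions, not shown in this hunk):

from jsonschema import validate

from darwin.importer.formats.nifti_schemas import nifti_import_schema  # name assumed

spec = {
    "data": [
        {
            "image": "vol0 (1).nii",
            "label": "vol0_brain.nii.gz",
            "class_map": {"1": "brain"},
            "mode": "mask",
            "is_mpr": False,
            "slot_names": ["0.1"],
        }
    ]
}

# Raises jsonschema.exceptions.ValidationError if "mode" is not one of
# "video", "instances" or "mask".
validate(instance=spec, schema=nifti_import_schema)
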
2 changes: 2 additions & 0 deletions darwin/importer/importer.py
@@ -756,6 +756,8 @@ def _import_annotations(
if (
annotation_type not in remote_classes
or annotation_class.name not in remote_classes[annotation_type]
and annotation_type
!= "raster_layer" # We do not skip raster layers as they are always available.
):
if annotation_type not in remote_classes:
logger.warning(
59 changes: 59 additions & 0 deletions tests/darwin/importer/formats/import_nifti_test.py
@@ -142,6 +142,65 @@ def test_image_annotation_nifti_import_incorrect_number_slot(team_slug: str):
parse_path(path=upload_json)


def test_image_annotation_nifti_import_single_slot_to_mask(team_slug: str):
with tempfile.TemporaryDirectory() as tmpdir:
with ZipFile("tests/data.zip") as zfile:
zfile.extractall(tmpdir)
label_path = (
Path(tmpdir)
/ team_slug
/ "nifti"
/ "releases"
/ "latest"
/ "annotations"
/ "vol0_brain.nii.gz"
)
input_dict = {
"data": [
{
"image": "vol0 (1).nii",
"label": str(label_path),
"class_map": {"1": "brain"},
"mode": "mask",
"is_mpr": False,
"slot_names": ["0.1"],
}
]
}
upload_json = Path(tmpdir) / "annotations.json"
upload_json.write_text(
json.dumps(input_dict, indent=4, sort_keys=True, default=str)
)
annotation_files = parse_path(path=upload_json)
annotation_file = annotation_files[0]
output_json_string = json.loads(
serialise_annotation_file(annotation_file, as_dict=False)
)
expected_json_string = json.load(
open(
Path(tmpdir)
/ team_slug
/ "nifti"
/ "vol0_annotation_file_to_mask.json",
"r",
)
)
# The comparison must not include mask_annotation_ids_mapping, as these
# ids are randomly generated
[
frame.get("raster_layer", {}).pop("mask_annotation_ids_mapping")
for frame in output_json_string["annotations"][0]["frames"].values()
]
[
frame.get("raster_layer", {}).pop("mask_annotation_ids_mapping")
for frame in expected_json_string["annotations"][0]["frames"].values()
]
assert (
output_json_string["annotations"][0]["frames"]
== expected_json_string["annotations"][0]["frames"]
)


def serialise_annotation_file(
annotation_file: AnnotationFile, as_dict
) -> Union[str, dict]:
Binary file modified tests/data.zip
