Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 135 additions & 114 deletions darwin/exporter/formats/nifti.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from rich.console import Console
from rich.theme import Theme

from darwin.utils.utils import get_annotations_in_slot


def _console_theme() -> Theme:
return Theme(
Expand Down Expand Up @@ -76,97 +78,108 @@ def export(
sends output volumes, image_id and output_dir to the write_output_volume_to_disk function

"""
video_annotations = list(annotation_files)
for video_annotation in video_annotations:
slot_name = video_annotation.slots[0].name
try:
medical_metadata = video_annotation.slots[0].metadata
legacy = not medical_metadata.get("handler") == "MONAI" # type: ignore
plane_map = medical_metadata.get("plane_map", {slot_name: "AXIAL"})
primary_plane = medical_metadata.get(
"primary_plane", plane_map.get(slot_name, "AXIAL")
)
except (KeyError, AttributeError):
legacy = True
primary_plane = "AXIAL"

image_id = check_for_error_and_return_imageid(video_annotation, output_dir)
for annotation_file in annotation_files:
slot_map = {slot.name: slot for slot in annotation_file.slots}
image_id = check_for_error_and_return_imageid(annotation_file, output_dir)
if not isinstance(image_id, str):
continue
polygon_class_names = [
ann.annotation_class.name
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "polygon"
]
# Check if there are any rasters in the annotation, these are created with a _m suffix
# in addition to those created from polygons.
annotation_types = [
a.annotation_class.annotation_type for a in video_annotation.annotations
]
mask_present = "raster_layer" in annotation_types and "mask" in annotation_types
output_volumes = build_output_volumes(
video_annotation,
class_names_to_export=polygon_class_names,
from_raster_layer=False,
mask_present=mask_present,
primary_plane=primary_plane,
)
slot_map = {slot.name: slot for slot in video_annotation.slots}
polygon_annotations = [
ann
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "polygon"
]
if polygon_annotations:
populate_output_volumes_from_polygons(
polygon_annotations, slot_map, output_volumes, legacy=legacy

for slot in annotation_file.slots:
slot_name = slot.name
slot_annotations = get_annotations_in_slot(
slot_name, annotation_file.annotations
)
write_output_volume_to_disk(
output_volumes,
image_id=image_id,
output_dir=output_dir,
legacy=legacy,
filename=video_annotation.filename,
)
# Need to map raster layers to SeriesInstanceUIDs
if mask_present:
mask_id_to_classname = {
ann.id: ann.annotation_class.name
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "mask"
}
raster_output_volumes = build_output_volumes(
video_annotation,
class_names_to_export=list(mask_id_to_classname.values()),
from_raster_layer=True,

try:
medical_metadata = slot.metadata
legacy = not medical_metadata.get("handler") == "MONAI" # type: ignore
plane_map = medical_metadata.get("plane_map", {slot_name: "AXIAL"})
primary_plane = medical_metadata.get(
"primary_plane", plane_map.get(slot_name, "AXIAL")
)
except (KeyError, AttributeError):
legacy = True
primary_plane = "AXIAL"

polygon_class_names = [
ann.annotation_class.name
for ann in slot_annotations
if ann.annotation_class.annotation_type == "polygon"
]
# Check if there are any rasters in the annotation, these are created with a _m suffix
# in addition to those created from polygons.
annotation_types = [
a.annotation_class.annotation_type for a in slot_annotations
]
mask_present = (
"raster_layer" in annotation_types and "mask" in annotation_types
)
output_volumes = build_output_volumes(
slot,
class_names_to_export=polygon_class_names,
from_raster_layer=False,
mask_present=mask_present,
primary_plane=primary_plane,
)

# This assumes only one raster_layer annotation. If we allow multiple raster layers per annotation file we need to change this.
raster_layer_annotation = [
polygon_annotations = [
ann
for ann in video_annotation.annotations
if ann.annotation_class.annotation_type == "raster_layer"
][0]
if raster_layer_annotation:
populate_output_volumes_from_raster_layer(
annotation=raster_layer_annotation,
mask_id_to_classname=mask_id_to_classname,
slot_map=slot_map,
output_volumes=raster_output_volumes,
primary_plane=primary_plane,
for ann in slot_annotations
if ann.annotation_class.annotation_type == "polygon"
]
if polygon_annotations:
populate_output_volumes_from_polygons(
polygon_annotations, slot_map, output_volumes, legacy=legacy
)
write_output_volume_to_disk(
raster_output_volumes,
output_volumes,
image_id=image_id,
output_dir=output_dir,
legacy=legacy,
filename=video_annotation.filename,
item_name=annotation_file.path.stem,
slot_name=slot.name,
filename=slot.source_files[0].file_name,
)
# Need to map raster layers to SeriesInstanceUIDs
if mask_present:
mask_id_to_classname = {
ann.id: ann.annotation_class.name
for ann in slot_annotations
if ann.annotation_class.annotation_type == "mask"
}
raster_output_volumes = build_output_volumes(
slot,
class_names_to_export=list(mask_id_to_classname.values()),
from_raster_layer=True,
primary_plane=primary_plane,
)

# This assumes only one raster_layer annotation. If we allow multiple raster layers per annotation file we need to change this.
raster_layer_annotation = [
ann
for ann in slot_annotations
if ann.annotation_class.annotation_type == "raster_layer"
][0]
if raster_layer_annotation:
populate_output_volumes_from_raster_layer(
annotation=raster_layer_annotation,
mask_id_to_classname=mask_id_to_classname,
slot_map=slot_map,
output_volumes=raster_output_volumes,
primary_plane=primary_plane,
)
write_output_volume_to_disk(
raster_output_volumes,
image_id=image_id,
output_dir=output_dir,
legacy=legacy,
item_name=annotation_file.path.stem,
slot_name=slot.name,
filename=slot.source_files[0].file_name,
)


def build_output_volumes(
video_annotation: dt.AnnotationFile,
slot: dt.Slot,
from_raster_layer: bool = False,
class_names_to_export: List[str] = None,
mask_present: Optional[bool] = False,
Expand All @@ -177,7 +190,7 @@ def build_output_volumes(

Parameters
----------
video_annotation : dt.AnnotationFile
annotation_file : dt.AnnotationFile
The ``AnnotationFile``\\s to be exported.
from_raster_layer : bool
Whether the output volumes are being built from raster layers or not
Expand All @@ -196,21 +209,20 @@ def build_output_volumes(
# Builds a map of class to integer, if its a polygon we use the class name as is
# for the mask annotations we append a suffix _m to ensure backwards compatibility

output_volumes = {}
for slot in video_annotation.slots:
slot_metadata = slot.metadata
assert slot_metadata is not None
series_instance_uid = slot_metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
)
# Builds output volumes per class
volume_dims, pixdims, affine, original_affine = process_metadata(slot.metadata)
if not mask_present and not class_names_to_export:
class_names_to_export = [
""
] # If there are no annotations to export, we still need to create an empty volume

output_volumes[series_instance_uid] = {
slot_metadata = slot.metadata
assert slot_metadata is not None
series_instance_uid = slot_metadata.get(
"SeriesInstanceUID", "SeriesIntanceUIDNotProvided"
)
# Builds output volumes per class
volume_dims, pixdims, affine, original_affine = process_metadata(slot_metadata)
if not mask_present and not class_names_to_export:
class_names_to_export = [
""
] # If there are no annotations to export, we still need to create an empty volume

return {
series_instance_uid: {
class_name: Volume(
pixel_array=np.zeros(volume_dims, dtype=np.uint8),
affine=affine,
Expand All @@ -223,19 +235,19 @@ def build_output_volumes(
primary_plane=primary_plane,
)
for class_name in class_names_to_export
}
return output_volumes
},
}


def check_for_error_and_return_imageid(
video_annotation: dt.AnnotationFile, output_dir: Path
annotation_file: dt.AnnotationFile, output_dir: Path
) -> Union[str, bool]:
"""Given the video_annotation file and the output directory, checks for a range of errors and
"""Given the annotation_file file and the output directory, checks for a range of errors and
returns messages accordingly.

Parameters
----------
video_annotation : dt.AnnotationFile
annotation_file : dt.AnnotationFile
The ``AnnotationFile``\\s to be exported.
output_dir : Path
The folder where the new instance mask files will be.
Expand All @@ -246,7 +258,7 @@ def check_for_error_and_return_imageid(
Returns the image_id if no errors are found, otherwise returns False
"""
# Check if all item slots have the correct file-extension
for slot in video_annotation.slots:
for slot in annotation_file.slots:
for source_file in slot.source_files:
filename = Path(source_file.file_name)
if not (
Expand All @@ -260,7 +272,7 @@ def check_for_error_and_return_imageid(
str(filename),
)

filename = Path(video_annotation.filename)
filename = Path(annotation_file.filename)
if filename.name.lower().endswith(".nii.gz"):
image_id = re.sub(r"(?i)\.nii\.gz$", "", str(filename))
elif filename.name.lower().endswith(".nii"):
Expand All @@ -270,17 +282,7 @@ def check_for_error_and_return_imageid(
else:
image_id = str(filename)

if video_annotation is None:
return create_error_message_json(
"video_annotation not found", output_dir, image_id
)
if video_annotation is None:
return create_error_message_json(
"video_annotation not found", output_dir, image_id
)

for slot in video_annotation.slots:
# Pick the first slot to take the metadata from. We assume that all slots have the same metadata.
for slot in annotation_file.slots:
metadata = slot.metadata
if metadata is None:
return create_error_message_json(
Expand Down Expand Up @@ -503,7 +505,9 @@ def write_output_volume_to_disk(
output_volumes: Dict,
image_id: str,
output_dir: Union[str, Path],
item_name: str,
legacy: bool = False,
slot_name: str = "0", # default slot name is "0"
filename: str = None,
) -> None:
"""Writes the given output volumes to disk.
Expand All @@ -519,8 +523,12 @@ def write_output_volume_to_disk(
legacy : bool, default=False
If ``True``, the exporter will use the legacy calculation.
If ``False``, the exporter will use the new calculation by dividing with pixdims.
filename: str
The filename of the dataset item
item_name : str
Name of the dataset item.
slot_name : str
Name of the dataset item slot the volume belongs to.
filename : str
Name of the file occupying the dataset item slot.

Returns
-------
Expand All @@ -544,10 +552,23 @@ def unnest_dict_to_list(d: Dict) -> List:
affine=volume.affine,
)
img = _get_reoriented_nifti_image(img, volume, legacy, filename)
filename_stem = Path(
Path(filename).stem
).stem # We take stem twice to handle ".nii.gz" suffixes
if volume.from_raster_layer:
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}_m.nii.gz"
output_path = (
Path(output_dir)
/ item_name
/ slot_name
/ f"{filename_stem}_{volume.class_name}_m.nii.gz"
)
else:
output_path = Path(output_dir) / f"{image_id}_{volume.class_name}.nii.gz"
output_path = (
Path(output_dir)
/ item_name
/ slot_name
/ f"{filename_stem}_{volume.class_name}.nii.gz"
)
if not output_path.parent.exists():
output_path.parent.mkdir(parents=True)
nib.save(img=img, filename=output_path)
Expand Down
13 changes: 13 additions & 0 deletions darwin/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
Expand Down Expand Up @@ -627,6 +628,18 @@ def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile:
return annotation_file


def get_annotations_in_slot(
slot_name: str, annotations: Sequence[Union[dt.Annotation, dt.VideoAnnotation]]
) -> List[Union[dt.Annotation, dt.VideoAnnotation]]:
return [
annotation
for annotation in annotations
if hasattr(annotation, "slot_names")
and annotation.slot_names
and annotation.slot_names[0] == slot_name
]


def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
source_files_data = data.get("source_files", [])
source_files = [
Expand Down
5 changes: 5 additions & 0 deletions e2e_tests/cli/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def compare_directories(self, path: Path, expected_path: Path) -> None:
data_path / "nifti-no-legacy-scaling/from",
data_path / "nifti-no-legacy-scaling/to",
),
(
"nifti",
data_path / "nifti-multislot/from",
data_path / "nifti-multislot/to",
),
(
"instance_mask",
data_path / "instance_mask/from",
Expand Down
Loading