From 79859ab4efc4003958db16435cbe68a4278bcd89 Mon Sep 17 00:00:00 2001 From: John Wilkie <124276291+JBWilkie@users.noreply.github.com> Date: Tue, 19 Dec 2023 15:35:39 +0000 Subject: [PATCH] [PY-645][externa] Improved tolerance for dots in filenames & test linting (#746) * Improved tolerance for dots in filenames & test linting * Fixed broken tests * Added case-insensitivity * Case insensitivity for dataset item name * Do not set image_id to lowercase --- darwin/dataset/local_dataset.py | 47 +++-- darwin/exporter/formats/darwin_1_0.py | 42 +++-- darwin/exporter/formats/nifti.py | 49 ++--- darwin/importer/formats/csv_tags_video.py | 4 +- darwin/importer/importer.py | 4 +- darwin/torch/dataset.py | 31 +++- darwin/utils/utils.py | 209 ++++++++++++++++------ 7 files changed, 267 insertions(+), 119 deletions(-) diff --git a/darwin/dataset/local_dataset.py b/darwin/dataset/local_dataset.py index b8c48a7ea..b2c4dff89 100644 --- a/darwin/dataset/local_dataset.py +++ b/darwin/dataset/local_dataset.py @@ -80,7 +80,9 @@ def __init__( self.original_annotations_path: Optional[List[Path]] = None self.keep_empty_annotations = keep_empty_annotations - release_path, annotations_dir, images_dir = self._initial_setup(dataset_path, release_name) + release_path, annotations_dir, images_dir = self._initial_setup( + dataset_path, release_name + ) self._validate_inputs(partition, split_type, annotation_type) # Get the list of classes @@ -120,7 +122,9 @@ def _validate_inputs(self, partition, split_type, annotation_type): if split_type not in ["random", "stratified"]: raise ValueError("split_type should be either 'random', 'stratified'") if annotation_type not in ["tag", "polygon", "bounding_box"]: - raise ValueError("annotation_type should be either 'tag', 'bounding_box', or 'polygon'") + raise ValueError( + "annotation_type should be either 'tag', 'bounding_box', or 'polygon'" + ) def _setup_annotations_and_images( self, @@ -148,7 +152,9 @@ def _setup_annotations_and_images( darwin_json, images_dir, with_folders, json_version, annotation_filepath ) if image_path.exists(): - if not keep_empty_annotations and is_stream_list_empty(darwin_json["annotations"]): + if not keep_empty_annotations and is_stream_list_empty( + darwin_json["annotations"] + ): continue self.images_path.append(image_path) self.annotations_path.append(annotation_filepath) @@ -215,7 +221,9 @@ def get_height_and_width(self, index: int) -> Tuple[float, float]: parsed = parse_darwin_json(self.annotations_path[index], index) return parsed.image_height, parsed.image_width - def extend(self, dataset: "LocalDataset", extend_classes: bool = False) -> "LocalDataset": + def extend( + self, dataset: "LocalDataset", extend_classes: bool = False + ) -> "LocalDataset": """ Extends the current dataset with another one. 
@@ -310,7 +318,10 @@ def parse_json(self, index: int) -> Dict[str, Any]: # Filter out unused classes and annotations of a different type if self.classes is not None: annotations = [ - a for a in annotations if a.annotation_class.name in self.classes and self.annotation_type_supported(a) + a + for a in annotations + if a.annotation_class.name in self.classes + and self.annotation_type_supported(a) ] return { "image_id": index, @@ -327,15 +338,20 @@ def annotation_type_supported(self, annotation) -> bool: elif self.annotation_type == "bounding_box": is_bounding_box = annotation_type == "bounding_box" is_supported_polygon = ( - annotation_type in ["polygon", "complex_polygon"] and "bounding_box" in annotation.data + annotation_type in ["polygon", "complex_polygon"] + and "bounding_box" in annotation.data ) return is_bounding_box or is_supported_polygon elif self.annotation_type == "polygon": return annotation_type in ["polygon", "complex_polygon"] else: - raise ValueError("annotation_type should be either 'tag', 'bounding_box', or 'polygon'") + raise ValueError( + "annotation_type should be either 'tag', 'bounding_box', or 'polygon'" + ) - def measure_mean_std(self, multi_threaded: bool = True) -> Tuple[np.ndarray, np.ndarray]: + def measure_mean_std( + self, multi_threaded: bool = True + ) -> Tuple[np.ndarray, np.ndarray]: """ Computes mean and std of trained images, given the train loader. @@ -358,7 +374,9 @@ def measure_mean_std(self, multi_threaded: bool = True) -> Tuple[np.ndarray, np. results = pool.map(self._return_mean, self.images_path) mean = np.sum(np.array(results), axis=0) / len(self.images_path) # Online image_classification deviation - results = pool.starmap(self._return_std, [[item, mean] for item in self.images_path]) + results = pool.starmap( + self._return_std, [[item, mean] for item in self.images_path] + ) std_sum = np.sum(np.array([item[0] for item in results]), axis=0) total_pixel_count = np.sum(np.array([item[1] for item in results])) std = np.sqrt(std_sum / total_pixel_count) @@ -404,14 +422,20 @@ def _compute_weights(labels: List[int]) -> np.ndarray: @staticmethod def _return_mean(image_path: Path) -> np.ndarray: img = np.array(load_pil_image(image_path)) - mean = np.array([np.mean(img[:, :, 0]), np.mean(img[:, :, 1]), np.mean(img[:, :, 2])]) + mean = np.array( + [np.mean(img[:, :, 0]), np.mean(img[:, :, 1]), np.mean(img[:, :, 2])] + ) return mean / 255.0 # Loads an image with OpenCV and returns the channel wise std of the image. 
@staticmethod def _return_std(image_path: Path, mean: np.ndarray) -> Tuple[np.ndarray, float]: img = np.array(load_pil_image(image_path)) / 255.0 - m2 = np.square(np.array([img[:, :, 0] - mean[0], img[:, :, 1] - mean[1], img[:, :, 2] - mean[2]])) + m2 = np.square( + np.array( + [img[:, :, 0] - mean[0], img[:, :, 1] - mean[1], img[:, :, 2] - mean[2]] + ) + ) return np.sum(np.sum(m2, axis=1), 1), m2.size / 3.0 def __getitem__(self, index: int): @@ -482,7 +506,6 @@ def get_annotation_filepaths( if partition is None: return (str(e) for e in sorted(annotations_dir.glob("**/*.json"))) - if split_type == "random": split_filename = f"{split_type}_{partition}.txt" elif split_type == "stratified": diff --git a/darwin/exporter/formats/darwin_1_0.py b/darwin/exporter/formats/darwin_1_0.py index f78af61e8..4adc6b3ad 100644 --- a/darwin/exporter/formats/darwin_1_0.py +++ b/darwin/exporter/formats/darwin_1_0.py @@ -45,17 +45,23 @@ def _export_file(annotation_file: AnnotationFile, _: int, output_dir: Path) -> N try: output: DictFreeForm = _build_json(annotation_file) except Exception as e: - raise ExportException_CouldNotBuildOutput(f"Could not build output for {annotation_file.path}") from e + raise ExportException_CouldNotBuildOutput( + f"Could not build output for {annotation_file.path}" + ) from e try: with open(output_file_path, "w") as f: op = json.dumps( output, - option=json.OPT_INDENT_2 | json.OPT_SERIALIZE_NUMPY | json.OPT_NON_STR_KEYS, + option=json.OPT_INDENT_2 + | json.OPT_SERIALIZE_NUMPY + | json.OPT_NON_STR_KEYS, ).decode("utf-8") f.write(op) except Exception as e: - raise ExportException_CouldNotWriteFile(f"Could not write output for {annotation_file.path}") from e + raise ExportException_CouldNotWriteFile( + f"Could not write output for {annotation_file.path}" + ) from e def _build_json(annotation_file: AnnotationFile) -> DictFreeForm: @@ -130,11 +136,17 @@ def _build_sub_annotation(sub: SubAnnotation) -> DictFreeForm: def _build_authorship(annotation: Union[VideoAnnotation, Annotation]) -> DictFreeForm: annotators = {} if annotation.annotators: - annotators = {"annotators": [_build_author(annotator) for annotator in annotation.annotators]} + annotators = { + "annotators": [ + _build_author(annotator) for annotator in annotation.annotators + ] + } reviewers = {} if annotation.reviewers: - reviewers = {"annotators": [_build_author(reviewer) for reviewer in annotation.reviewers]} + reviewers = { + "annotators": [_build_author(reviewer) for reviewer in annotation.reviewers] + } return {**annotators, **reviewers} @@ -143,7 +155,9 @@ def _build_video_annotation(annotation: VideoAnnotation) -> DictFreeForm: return { **annotation.get_data( only_keyframes=False, - post_processing=lambda annotation, _: _build_image_annotation(annotation, skip_slots=True), + post_processing=lambda annotation, _: _build_image_annotation( + annotation, skip_slots=True + ), ), "name": annotation.annotation_class.name, "slot_names": annotation.slot_names, @@ -151,7 +165,9 @@ def _build_video_annotation(annotation: VideoAnnotation) -> DictFreeForm: } -def _build_image_annotation(annotation: Annotation, skip_slots: bool = False) -> DictFreeForm: +def _build_image_annotation( + annotation: Annotation, skip_slots: bool = False +) -> DictFreeForm: json_subs = {} for sub in annotation.subs: json_subs.update(_build_sub_annotation(sub)) @@ -169,7 +185,9 @@ def _build_image_annotation(annotation: Annotation, skip_slots: bool = False) -> return {**base_json, "slot_names": annotation.slot_names} -def 
_build_legacy_annotation_data(annotation_class: AnnotationClass, data: DictFreeForm) -> DictFreeForm: +def _build_legacy_annotation_data( + annotation_class: AnnotationClass, data: DictFreeForm +) -> DictFreeForm: v1_data = {} polygon_annotation_mappings = {"complex_polygon": "paths", "polygon": "path"} @@ -232,7 +250,9 @@ def build_image_annotation(annotation_file: AnnotationFile) -> Dict[str, Any]: annotations: List[Dict[str, Any]] = [] for annotation in annotation_file.annotations: payload = { - annotation.annotation_class.annotation_type: _build_annotation_data(annotation), + annotation.annotation_class.annotation_type: _build_annotation_data( + annotation + ), "name": annotation.annotation_class.name, } @@ -260,6 +280,8 @@ def _build_annotation_data(annotation: Annotation) -> Dict[str, Any]: return {"path": annotation.data["paths"]} if annotation.annotation_class.annotation_type == "polygon": - return dict(filter(lambda item: item[0] != "bounding_box", annotation.data.items())) + return dict( + filter(lambda item: item[0] != "bounding_box", annotation.data.items()) + ) return dict(annotation.data) diff --git a/darwin/exporter/formats/nifti.py b/darwin/exporter/formats/nifti.py index a4379d918..71e27ab9c 100644 --- a/darwin/exporter/formats/nifti.py +++ b/darwin/exporter/formats/nifti.py @@ -1,5 +1,6 @@ import ast import json as native_json +import re from dataclasses import dataclass from pathlib import Path from typing import Dict, Iterable, List, Optional, Tuple, Union @@ -137,50 +138,28 @@ def check_for_error_and_return_imageid( image_id : str """ - # check if all item slots have the correct file-extension + # Check if all item slots have the correct file-extension for slot in video_annotation.slots: for source_file in slot.source_files: filename = Path(source_file["file_name"]) - - try: - suffixes = filename.suffixes[-2:] - except IndexError: - suffixes = filename.suffixes - if len(suffixes) == 2: - if suffixes[0] == ".nii" and suffixes[1] == ".gz": - image_id = str(filename).rstrip("".join(suffixes)) - else: - return create_error_message_json( - "Two suffixes found but not ending in .nii.gz", - output_dir, - str(filename), - ) - elif len(suffixes) == 1: - if suffixes[0] == ".nii" or suffixes[0] == ".dcm": - image_id = filename.stem - else: - return create_error_message_json( - "Misconfigured filename, not ending in .nii or .dcm. Are you sure this is medical data?", - output_dir, - str(filename), - ) - else: + if not ( + filename.name.lower().endswith(".nii.gz") + or filename.name.lower().endswith(".nii") + or filename.name.lower().endswith(".dcm") + ): return create_error_message_json( - "You are trying to export to nifti. Filename should contain either .nii, .nii.gz or .dcm extension." - "Are you sure this is medical data?", + "Misconfigured filename, not ending in .nii, .nii.gz or .dcm. 
Are you sure this is medical data?", output_dir, str(filename), ) filename = Path(video_annotation.filename) - try: - suffixes = filename.suffixes[-2:] - except IndexError: - suffixes = filename.suffixes - if len(suffixes) == 2: - image_id = str(filename).rstrip("".join(suffixes)) - elif len(suffixes) == 1: - image_id = str(filename.stem) + if filename.name.lower().endswith(".nii.gz"): + image_id = re.sub(r"(?i)\.nii\.gz$", "", str(filename)) + elif filename.name.lower().endswith(".nii"): + image_id = re.sub(r"(?i)\.nii$", "", str(filename)) + elif filename.name.lower().endswith(".dcm"): + image_id = re.sub(r"(?i)\.dcm$", "", str(filename)) else: image_id = str(filename) diff --git a/darwin/importer/formats/csv_tags_video.py b/darwin/importer/formats/csv_tags_video.py index a6885ac0c..fd9cdcb3b 100644 --- a/darwin/importer/formats/csv_tags_video.py +++ b/darwin/importer/formats/csv_tags_video.py @@ -51,9 +51,7 @@ def parse_path(path: Path) -> Optional[List[dt.AnnotationFile]]: file_annotation_map[filename].append(annotation) for filename in file_annotation_map: annotations = file_annotation_map[filename] - annotation_classes = { - annotation.annotation_class for annotation in annotations - } + annotation_classes = {annotation.annotation_class for annotation in annotations} filename_path = Path(filename) remote_path = str(filename_path.parent) if not remote_path.startswith("/"): diff --git a/darwin/importer/importer.py b/darwin/importer/importer.py index ce1c0676a..caf458491 100644 --- a/darwin/importer/importer.py +++ b/darwin/importer/importer.py @@ -603,7 +603,9 @@ def _warn_unsupported_annotations(parsed_files: List[AnnotationFile]) -> None: if annotation.annotation_class.annotation_type in UNSUPPORTED_CLASSES: skipped_annotations.append(annotation) if len(skipped_annotations) > 0: - types = {c.annotation_class.annotation_type for c in skipped_annotations} # noqa: C417 + types = { + c.annotation_class.annotation_type for c in skipped_annotations + } # noqa: C417 console.print( f"Import of annotation class types '{', '.join(types)}' is not yet supported. Skipping {len(skipped_annotations)} " + "annotations from '{parsed_file.full_path}'.\n", diff --git a/darwin/torch/dataset.py b/darwin/torch/dataset.py index b17a21238..84231ab55 100644 --- a/darwin/torch/dataset.py +++ b/darwin/torch/dataset.py @@ -99,7 +99,9 @@ class ClassificationDataset(LocalDataset): be composed via torchvision. 
""" - def __init__(self, transform: Optional[Union[Callable, List]] = None, **kwargs) -> None: + def __init__( + self, transform: Optional[Union[Callable, List]] = None, **kwargs + ) -> None: super().__init__(annotation_type="tag", **kwargs) if transform is not None and isinstance(transform, list): @@ -152,7 +154,11 @@ def get_target(self, index: int) -> Tensor: data = self.parse_json(index) annotations = data.pop("annotations") - tags = [a.annotation_class.name for a in annotations if a.annotation_class.annotation_type == "tag"] + tags = [ + a.annotation_class.name + for a in annotations + if a.annotation_class.annotation_type == "tag" + ] if not self.is_multi_label: # Binary or multiclass must have a label per image @@ -176,7 +182,11 @@ def check_if_multi_label(self) -> None: for idx in range(len(self)): target = self.parse_json(idx) annotations = target.pop("annotations") - tags = [a.annotation_class.name for a in annotations if a.annotation_class.annotation_type == "tag"] + tags = [ + a.annotation_class.name + for a in annotations + if a.annotation_class.annotation_type == "tag" + ] if len(tags) > 1: self.is_multi_label = True @@ -324,7 +334,9 @@ def get_target(self, index: int) -> Dict[str, Any]: path_key = "paths" if path_key not in annotation.data: - print(f"Warning: missing polygon in annotation {self.annotations_path[index]}") + print( + f"Warning: missing polygon in annotation {self.annotations_path[index]}" + ) # Extract the sequences of coordinates from the polygon annotation sequences = convert_polygons_to_sequences( annotation.data[path_key], @@ -353,7 +365,12 @@ def get_target(self, index: int) -> Dict[str, Any]: # Compute the area of the polygon # TODO fix with addictive/subtractive paths in complex polygons - poly_area: float = np.sum([polygon_area(x_coord, y_coord) for x_coord, y_coord in zip(x_coords, y_coords)]) + poly_area: float = np.sum( + [ + polygon_area(x_coord, y_coord) + for x_coord, y_coord in zip(x_coords, y_coords) + ] + ) # Create and append the new entry for this annotation annotations.append( @@ -405,7 +422,9 @@ class SemanticSegmentationDataset(LocalDataset): Object used to convert polygons to semantic masks. """ - def __init__(self, transform: Optional[Union[List[Callable], Callable]] = None, **kwargs): + def __init__( + self, transform: Optional[Union[List[Callable], Callable]] = None, **kwargs + ): super().__init__(annotation_type="polygon", **kwargs) if "__background__" not in self.classes: self.classes.insert(0, "__background__") diff --git a/darwin/utils/utils.py b/darwin/utils/utils.py index 721a0c8f6..951d6fbcd 100644 --- a/darwin/utils/utils.py +++ b/darwin/utils/utils.py @@ -215,7 +215,9 @@ def is_project_dir(project_path: Path) -> bool: return (project_path / "releases").exists() and (project_path / "images").exists() -def get_progress_bar(array: List[dt.AnnotationFile], description: Optional[str] = None) -> Iterable[ProgressType]: +def get_progress_bar( + array: List[dt.AnnotationFile], description: Optional[str] = None +) -> Iterable[ProgressType]: """ Get a rich a progress bar for the given list of annotation files. 
@@ -359,7 +361,9 @@ def persist_client_configuration( api_key=team_config.api_key, datasets_dir=team_config.datasets_dir, ) - config.set_global(api_endpoint=client.url, base_url=client.base_url, default_team=default_team) + config.set_global( + api_endpoint=client.url, base_url=client.base_url, default_team=default_team + ) return config @@ -416,7 +420,9 @@ def attempt_decode(path: Path) -> dict: return data except Exception: continue - raise UnrecognizableFileEncoding(f"Unable to load file {path} with any encodings: {encodings}") + raise UnrecognizableFileEncoding( + f"Unable to load file {path} with any encodings: {encodings}" + ) def load_data_from_file(path: Path) -> Tuple[dict, dt.AnnotationFileVersion]: @@ -425,7 +431,9 @@ def load_data_from_file(path: Path) -> Tuple[dict, dt.AnnotationFileVersion]: return data, version -def parse_darwin_json(path: Path, count: Optional[int] = None) -> Optional[dt.AnnotationFile]: +def parse_darwin_json( + path: Path, count: Optional[int] = None +) -> Optional[dt.AnnotationFile]: """ Parses the given JSON file in v7's darwin proprietary format. Works for images, split frame videos (treated as images) and playback videos. @@ -483,7 +491,7 @@ def stream_darwin_json(path: Path) -> PersistentStreamingJSONObject: with path.open() as infile: return json_stream.load(infile, persistent=True) - + def get_image_path_from_stream( darwin_json: PersistentStreamingJSONObject, @@ -558,7 +566,7 @@ def get_darwin_json_version(annotations_dir: Path) -> str: data_str = file.read() data = json.loads(data_str) return "2.0" if "version" in data and data["version"] == "2.0" else "1.0" - + def is_stream_list_empty(json_list: PersistentStreamingJSONList) -> bool: try: @@ -572,9 +580,15 @@ def is_stream_list_empty(json_list: PersistentStreamingJSONList) -> bool: def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile: item = data["item"] item_source = item.get("source_info", {}) - slots: List[dt.Slot] = list(filter(None, map(_parse_darwin_slot, item.get("slots", [])))) - annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data) - annotation_classes: Set[dt.AnnotationClass] = {annotation.annotation_class for annotation in annotations} + slots: List[dt.Slot] = list( + filter(None, map(_parse_darwin_slot, item.get("slots", []))) + ) + annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations( + data + ) + annotation_classes: Set[dt.AnnotationClass] = { + annotation.annotation_class for annotation in annotations + } if len(slots) == 0: annotation_file = dt.AnnotationFile( @@ -582,7 +596,9 @@ def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile: path=path, filename=item["name"], item_id=item.get("source_info", {}).get("item_id", None), - dataset_name=item.get("source_info", {}).get("dataset", {}).get("name", None), + dataset_name=item.get("source_info", {}) + .get("dataset", {}) + .get("name", None), annotation_classes=annotation_classes, annotations=annotations, is_video=False, @@ -603,13 +619,17 @@ def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile: path=path, filename=item["name"], item_id=item.get("source_info", {}).get("item_id", None), - dataset_name=item.get("source_info", {}).get("dataset", {}).get("name", None), + dataset_name=item.get("source_info", {}) + .get("dataset", {}) + .get("name", None), annotation_classes=annotation_classes, annotations=annotations, is_video=slot.frame_urls is not None or slot.frame_manifest is not None, 
image_width=slot.width, image_height=slot.height, - image_url=None if len(slot.source_files or []) == 0 else slot.source_files[0]["url"], + image_url=None + if len(slot.source_files or []) == 0 + else slot.source_files[0]["url"], image_thumbnail_url=slot.thumbnail_url, workview_url=item_source.get("workview_url", None), seq=0, @@ -639,9 +659,15 @@ def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot: ) -def _parse_darwin_image(path: Path, data: Dict[str, Any], count: Optional[int]) -> dt.AnnotationFile: - annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data) - annotation_classes: Set[dt.AnnotationClass] = {annotation.annotation_class for annotation in annotations} +def _parse_darwin_image( + path: Path, data: Dict[str, Any], count: Optional[int] +) -> dt.AnnotationFile: + annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations( + data + ) + annotation_classes: Set[dt.AnnotationClass] = { + annotation.annotation_class for annotation in annotations + } slot = dt.Slot( name=None, @@ -678,12 +704,20 @@ def _parse_darwin_image(path: Path, data: Dict[str, Any], count: Optional[int]) return annotation_file -def _parse_darwin_video(path: Path, data: Dict[str, Any], count: Optional[int]) -> dt.AnnotationFile: - annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data) - annotation_classes: Set[dt.AnnotationClass] = {annotation.annotation_class for annotation in annotations} +def _parse_darwin_video( + path: Path, data: Dict[str, Any], count: Optional[int] +) -> dt.AnnotationFile: + annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations( + data + ) + annotation_classes: Set[dt.AnnotationClass] = { + annotation.annotation_class for annotation in annotations + } if "width" not in data["image"] or "height" not in data["image"]: - raise OutdatedDarwinJSONFormat("Missing width/height in video, please re-export") + raise OutdatedDarwinJSONFormat( + "Missing width/height in video, please re-export" + ) slot = dt.Slot( name=None, @@ -729,23 +763,41 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati main_annotation: Optional[dt.Annotation] = None # Darwin JSON 2.0 representation of complex polygons - if "polygon" in annotation and "paths" in annotation["polygon"] and len(annotation["polygon"]["paths"]) > 1: + if ( + "polygon" in annotation + and "paths" in annotation["polygon"] + and len(annotation["polygon"]["paths"]) > 1 + ): bounding_box = annotation.get("bounding_box") paths = annotation["polygon"]["paths"] - main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names) + main_annotation = dt.make_complex_polygon( + name, paths, bounding_box, slot_names=slot_names + ) # Darwin JSON 2.0 representation of simple polygons - elif "polygon" in annotation and "paths" in annotation["polygon"] and len(annotation["polygon"]["paths"]) == 1: + elif ( + "polygon" in annotation + and "paths" in annotation["polygon"] + and len(annotation["polygon"]["paths"]) == 1 + ): bounding_box = annotation.get("bounding_box") paths = annotation["polygon"]["paths"] - main_annotation = dt.make_polygon(name, paths[0], bounding_box, slot_names=slot_names) + main_annotation = dt.make_polygon( + name, paths[0], bounding_box, slot_names=slot_names + ) # Darwin JSON 1.0 representation of complex and simple polygons elif "polygon" in annotation: bounding_box = annotation.get("bounding_box") if "additional_paths" in annotation["polygon"]: - paths = 
[annotation["polygon"]["path"]] + annotation["polygon"]["additional_paths"] - main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names) + paths = [annotation["polygon"]["path"]] + annotation["polygon"][ + "additional_paths" + ] + main_annotation = dt.make_complex_polygon( + name, paths, bounding_box, slot_names=slot_names + ) else: - main_annotation = dt.make_polygon(name, annotation["polygon"]["path"], bounding_box, slot_names=slot_names) + main_annotation = dt.make_polygon( + name, annotation["polygon"]["path"], bounding_box, slot_names=slot_names + ) # Darwin JSON 1.0 representation of complex polygons elif "complex_polygon" in annotation: bounding_box = annotation.get("bounding_box") @@ -757,7 +809,9 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati if "additional_paths" in annotation["complex_polygon"]: paths.extend(annotation["complex_polygon"]["additional_paths"]) - main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names) + main_annotation = dt.make_complex_polygon( + name, paths, bounding_box, slot_names=slot_names + ) elif "bounding_box" in annotation: bounding_box = annotation["bounding_box"] main_annotation = dt.make_bounding_box( @@ -771,7 +825,9 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati elif "tag" in annotation: main_annotation = dt.make_tag(name, slot_names=slot_names) elif "line" in annotation: - main_annotation = dt.make_line(name, annotation["line"]["path"], slot_names=slot_names) + main_annotation = dt.make_line( + name, annotation["line"]["path"], slot_names=slot_names + ) elif "keypoint" in annotation: main_annotation = dt.make_keypoint( name, @@ -780,11 +836,17 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati slot_names=slot_names, ) elif "ellipse" in annotation: - main_annotation = dt.make_ellipse(name, annotation["ellipse"], slot_names=slot_names) + main_annotation = dt.make_ellipse( + name, annotation["ellipse"], slot_names=slot_names + ) elif "cuboid" in annotation: - main_annotation = dt.make_cuboid(name, annotation["cuboid"], slot_names=slot_names) + main_annotation = dt.make_cuboid( + name, annotation["cuboid"], slot_names=slot_names + ) elif "skeleton" in annotation: - main_annotation = dt.make_skeleton(name, annotation["skeleton"]["nodes"], slot_names=slot_names) + main_annotation = dt.make_skeleton( + name, annotation["skeleton"]["nodes"], slot_names=slot_names + ) elif "table" in annotation: main_annotation = dt.make_table( name, @@ -793,7 +855,9 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati slot_names=slot_names, ) elif "string" in annotation: - main_annotation = dt.make_string(name, annotation["string"]["sources"], slot_names=slot_names) + main_annotation = dt.make_string( + name, annotation["string"]["sources"], slot_names=slot_names + ) elif "graph" in annotation: main_annotation = dt.make_graph( name, @@ -820,19 +884,29 @@ def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotati if "id" in annotation: main_annotation.id = annotation["id"] if "instance_id" in annotation: - main_annotation.subs.append(dt.make_instance_id(annotation["instance_id"]["value"])) + main_annotation.subs.append( + dt.make_instance_id(annotation["instance_id"]["value"]) + ) if "attributes" in annotation: main_annotation.subs.append(dt.make_attributes(annotation["attributes"])) if "text" in annotation: 
main_annotation.subs.append(dt.make_text(annotation["text"]["text"])) if "inference" in annotation: - main_annotation.subs.append(dt.make_opaque_sub("inference", annotation["inference"])) + main_annotation.subs.append( + dt.make_opaque_sub("inference", annotation["inference"]) + ) if "directional_vector" in annotation: - main_annotation.subs.append(dt.make_opaque_sub("directional_vector", annotation["directional_vector"])) + main_annotation.subs.append( + dt.make_opaque_sub("directional_vector", annotation["directional_vector"]) + ) if "measures" in annotation: - main_annotation.subs.append(dt.make_opaque_sub("measures", annotation["measures"])) + main_annotation.subs.append( + dt.make_opaque_sub("measures", annotation["measures"]) + ) if "auto_annotate" in annotation: - main_annotation.subs.append(dt.make_opaque_sub("auto_annotate", annotation["auto_annotate"])) + main_annotation.subs.append( + dt.make_opaque_sub("auto_annotate", annotation["auto_annotate"]) + ) if annotation.get("annotators") is not None: main_annotation.annotators = _parse_annotators(annotation["annotators"]) @@ -890,7 +964,9 @@ def _parse_darwin_raster_annotation(annotation: dict) -> Optional[dt.Annotation] slot_names: Optional[List[str]] = parse_slot_names(annotation) if not id or not name or not raster_layer: - raise ValueError("Raster annotation must have an 'id', 'name' and 'raster_layer' field") + raise ValueError( + "Raster annotation must have an 'id', 'name' and 'raster_layer' field" + ) dense_rle, mask_annotation_ids_mapping, total_pixels = ( raster_layer.get("dense_rle", None), @@ -941,9 +1017,14 @@ def _parse_darwin_mask_annotation(annotation: dict) -> Optional[dt.Annotation]: def _parse_annotators(annotators: List[Dict[str, Any]]) -> List[dt.AnnotationAuthor]: if not (hasattr(annotators, "full_name") or not hasattr(annotators, "email")): - raise AttributeError("JSON file must contain annotators with 'full_name' and 'email' fields") + raise AttributeError( + "JSON file must contain annotators with 'full_name' and 'email' fields" + ) - return [dt.AnnotationAuthor(annotator["full_name"], annotator["email"]) for annotator in annotators] + return [ + dt.AnnotationAuthor(annotator["full_name"], annotator["email"]) + for annotator in annotators + ] def _parse_properties(properties: List[Dict[str, Any]]) -> Optional[List[SelectedProperty]]: @@ -994,9 +1075,13 @@ def split_video_annotation(annotation: dt.AnnotationFile) -> List[dt.AnnotationF frame_annotations = [] for i, frame_url in enumerate(urls): annotations = [ - a.frames[i] for a in annotation.annotations if isinstance(a, dt.VideoAnnotation) and i in a.frames + a.frames[i] + for a in annotation.annotations + if isinstance(a, dt.VideoAnnotation) and i in a.frames ] - annotation_classes: Set[dt.AnnotationClass] = {annotation.annotation_class for annotation in annotations} + annotation_classes: Set[dt.AnnotationClass] = { + annotation.annotation_class for annotation in annotations + } filename: str = f"{Path(annotation.filename).stem}/{i:07d}.png" frame_annotations.append( dt.AnnotationFile( @@ -1082,7 +1167,9 @@ def convert_polygons_to_sequences( else: list_polygons = cast(List[dt.Polygon], [polygons]) - if not isinstance(list_polygons[0], list) or not isinstance(list_polygons[0][0], dict): + if not isinstance(list_polygons[0], list) or not isinstance( + list_polygons[0][0], dict + ): raise ValueError("Unknown input format") sequences: List[List[Union[int, float]]] = [] @@ -1223,7 +1310,9 @@ def convert_bounding_box_to_xyxy(box: dt.BoundingBox) -> List[float]: 
return [box["x"], box["y"], x2, y2] -def convert_polygons_to_mask(polygons: List, height: int, width: int, value: Optional[int] = 1) -> np.ndarray: +def convert_polygons_to_mask( + polygons: List, height: int, width: int, value: Optional[int] = 1 +) -> np.ndarray: """ Converts a list of polygons, encoded as a list of dictionaries into an ``nd.array`` mask. @@ -1317,24 +1406,38 @@ def _parse_version(data: dict) -> dt.AnnotationFileVersion: return dt.AnnotationFileVersion(int(major), int(minor), suffix) -def _data_to_annotations(data: Dict[str, Any]) -> List[Union[dt.Annotation, dt.VideoAnnotation]]: +def _data_to_annotations( + data: Dict[str, Any] +) -> List[Union[dt.Annotation, dt.VideoAnnotation]]: raw_image_annotations = filter( lambda annotation: ( - ("frames" not in annotation) and ("raster_layer" not in annotation) and ("mask" not in annotation) + ("frames" not in annotation) + and ("raster_layer" not in annotation) + and ("mask" not in annotation) ), data["annotations"], ) - raw_video_annotations = filter(lambda annotation: "frames" in annotation, data["annotations"]) - raw_raster_annotations = filter(lambda annotation: "raster_layer" in annotation, data["annotations"]) - raw_mask_annotations = filter(lambda annotation: "mask" in annotation, data["annotations"]) - image_annotations: List[dt.Annotation] = list(filter(None, map(_parse_darwin_annotation, raw_image_annotations))) + raw_video_annotations = filter( + lambda annotation: "frames" in annotation, data["annotations"] + ) + raw_raster_annotations = filter( + lambda annotation: "raster_layer" in annotation, data["annotations"] + ) + raw_mask_annotations = filter( + lambda annotation: "mask" in annotation, data["annotations"] + ) + image_annotations: List[dt.Annotation] = list( + filter(None, map(_parse_darwin_annotation, raw_image_annotations)) + ) video_annotations: List[dt.VideoAnnotation] = list( filter(None, map(_parse_darwin_video_annotation, raw_video_annotations)) ) raster_annotations: List[dt.Annotation] = list( filter(None, map(_parse_darwin_raster_annotation, raw_raster_annotations)) ) - mask_annotations: List[dt.Annotation] = list(filter(None, map(_parse_darwin_mask_annotation, raw_mask_annotations))) + mask_annotations: List[dt.Annotation] = list( + filter(None, map(_parse_darwin_mask_annotation, raw_mask_annotations)) + ) return [ *image_annotations, @@ -1355,4 +1458,6 @@ def _supported_schema_versions() -> Dict[Tuple[int, int, str], str]: def _default_schema(version: dt.AnnotationFileVersion) -> Optional[str]: - return _supported_schema_versions().get((version.major, version.minor, version.suffix)) + return _supported_schema_versions().get( + (version.major, version.minor, version.suffix) + )
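Note (illustrative, not part of the patch): the nifti.py hunk above replaces the old Path.suffixes / rstrip-based extension handling, which misbehaved on filenames containing extra dots, with case-insensitive endswith() checks plus a re.sub that strips only a trailing .nii.gz, .nii or .dcm extension. Below is a minimal standalone sketch of that behaviour; the helper name strip_medical_suffix is hypothetical and only mirrors the logic added to check_for_error_and_return_imageid, it is not a darwin-py API.

    import re
    from pathlib import Path
    from typing import Optional

    def strip_medical_suffix(filename: Path) -> Optional[str]:
        """Return the filename without its medical-imaging extension
        (matched case-insensitively), or None if it does not end in
        .nii, .nii.gz or .dcm."""
        lowered = filename.name.lower()
        # Longest extension first; each pattern is anchored to the end of the name,
        # so dots elsewhere in the stem are left untouched.
        for pattern in (r"(?i)\.nii\.gz$", r"(?i)\.nii$", r"(?i)\.dcm$"):
            if re.search(pattern, lowered):
                return re.sub(pattern, "", str(filename))
        return None

    # Extra dots and mixed case in the stem are preserved, which is the point of the change:
    assert strip_medical_suffix(Path("patient.1.2.NII.GZ")) == "patient.1.2"
    assert strip_medical_suffix(Path("scan.v2.dcm")) == "scan.v2"
    assert strip_medical_suffix(Path("photo.png")) is None

The removed two-suffix branch stripped characters rather than the extension as a unit: str("brain_region.nii.gz").rstrip(".nii.gz") yields "brain_regio" because rstrip treats its argument as a character set, whereas the anchored re.sub in the patch yields "brain_region".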