4 changes: 3 additions & 1 deletion darwin/dataset/remote_dataset_v2.py
@@ -889,7 +889,9 @@ def _get_remote_files_that_require_legacy_scaling(self) -> List[Path]:
A list of full remote paths of dataset items that require NifTI annotations to be scaled
"""
remote_files_that_require_legacy_scaling = []
remote_files = self.fetch_remote_files()
remote_files = self.fetch_remote_files(
filters={"statuses": ["new", "annotate", "review", "complete", "archived"]}
)
@dorfmanrobert (Contributor) commented on Dec 2, 2024:
These are workflow statuses, not processing statuses, right? The PR description mentions uploading, processing, error, and archived; is that what should be used here?

@JBWilkie (Contributor, Author) commented on Dec 2, 2024:

This is the status that list_items returns. In the backend this is GeneralStatus, so it accounts for processing status.

We need to use this status type because list_items doesn't allow filtering by processing status.
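For illustration, a rough sketch of how the two halves of this PR fit together (mirroring the diffs here rather than introducing any new API):

# remote_dataset_v2.py: filter server-side on the general statuses that
# list_items accepts.
remote_files = dataset.fetch_remote_files(
    filters={"statuses": ["new", "annotate", "review", "complete", "archived"]}
)

# importer.py: processing status cannot be filtered server-side, so the importer
# fetches by item name and checks each item's status client-side instead.
for remote_file in dataset.fetch_remote_files({"item_names": filenames}):
    if remote_file.status not in ("new", "annotate", "review", "complete", "archived"):
        ...  # still uploading/processing, or failed to process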

Contributor commented:

Gotcha, thanks. I guess it would be nice to have types/objects that could be used for these, but maybe that's a separate piece of work.
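For instance, a minimal sketch of what such a typed status set could look like (the enum and constant names below are purely illustrative, not taken from darwin-py):

from enum import Enum


class ItemStatus(str, Enum):
    # Hypothetical enum covering the general statuses discussed above.
    NEW = "new"
    ANNOTATE = "annotate"
    REVIEW = "review"
    COMPLETE = "complete"
    ARCHIVED = "archived"
    UPLOADING = "uploading"
    PROCESSING = "processing"
    ERROR = "error"


# Statuses whose items can safely receive imported annotations.
STATUSES_READY_FOR_IMPORT = {
    ItemStatus.NEW,
    ItemStatus.ANNOTATE,
    ItemStatus.REVIEW,
    ItemStatus.COMPLETE,
    ItemStatus.ARCHIVED,
}

# Usage sketch: build the filter from the enum rather than repeating string literals.
filters = {"statuses": [status.value for status in STATUSES_READY_FOR_IMPORT]}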

for remote_file in remote_files:
if not (
remote_file.slots[0]
62 changes: 52 additions & 10 deletions darwin/importer/importer.py
@@ -215,31 +215,69 @@ def _build_attribute_lookup(dataset: "RemoteDataset") -> Dict[str, Unknown]:
return lookup


def _get_remote_files(
dataset: "RemoteDataset", filenames: List[str], chunk_size: int = 100
def _get_remote_files_ready_for_import(
dataset: "RemoteDataset",
filenames: List[str],
chunk_size: int = 100,
) -> Dict[str, Dict[str, Any]]:
"""
Fetches remote files from the datasets in chunks; by default 100 filenames at a time.
Fetches remote files that are ready for import from the datasets in chunks; by
default 100 filenames at a time.

The output is a dictionary for each remote file with the following keys:
- "item_id": Item ID
- "slot_names": A list of each slot name for the item
- "layout": The layout of the item

Fetching slot names & layout is necessary here to avoid double-trip to API downstream for remote files.

Raises a ValueError if any of the remote files are not in the `new`, `annotate`,
`review`, `complete`, or `archived` statuses.

Parameters
----------
dataset : RemoteDataset
The remote dataset to fetch the files from.
filenames : List[str]
A list of filenames to fetch.
chunk_size : int
The number of filenames to fetch at a time.
"""
remote_files = {}
remote_files_not_ready_for_import = {}
for i in range(0, len(filenames), chunk_size):
chunk = filenames[i : i + chunk_size]
for remote_file in dataset.fetch_remote_files(
{"types": "image,playback_video,video_frame", "item_names": chunk}
):
slot_names = _get_slot_names(remote_file)
remote_files[remote_file.full_path] = {
"item_id": remote_file.id,
"slot_names": slot_names,
"layout": remote_file.layout,
}
if remote_file.status not in [
"new",
"annotate",
"review",
"complete",
"archived",
]:
remote_files_not_ready_for_import[remote_file.full_path] = (
remote_file.status
)
else:
slot_names = _get_slot_names(remote_file)
remote_files[remote_file.full_path] = {
"item_id": remote_file.id,
"slot_names": slot_names,
"layout": remote_file.layout,
}
if remote_files_not_ready_for_import:
console = Console(theme=_console_theme())
console.print(
"The following files are either still processing, or failed to process, so annotations cannot be imported:",
style="warning",
)
for file, status in remote_files_not_ready_for_import.items():
console.print(f" - {file}, status: {status}")
raise ValueError(
"Some files targeted for annotation import are either still processing, or failed to process, so annotations cannot be imported."
)
return remote_files


@@ -1261,7 +1299,11 @@ def import_annotations( # noqa: C901
chunk_size = 100
while chunk_size > 0:
try:
remote_files = _get_remote_files(dataset, filenames, chunk_size)
remote_files = _get_remote_files_ready_for_import(
dataset,
filenames,
chunk_size,
)
break
except RequestEntitySizeExceeded:
chunk_size -= 8
99 changes: 91 additions & 8 deletions tests/darwin/importer/importer_test.py
@@ -23,7 +23,7 @@
_display_slot_warnings_and_errors,
_find_and_parse,
_get_annotation_format,
_get_remote_files,
_get_remote_files_ready_for_import,
_get_slot_names,
_import_annotations,
_is_skeleton_class,
@@ -213,14 +213,42 @@ def test__build_attribute_lookup() -> None:
assert result == expected_lookup


def test__get_remote_files() -> None:
def test__get_remote_files_ready_for_import_succeeds() -> None:
mock_dataset = Mock()
mock_dataset.fetch_remote_files.return_value = [
Mock(full_path="path/to/file1", id="file1_id", layout="layout1"),
Mock(full_path="path/to/file2", id="file2_id", layout="layout2"),
Mock(
full_path="path/to/file1",
id="file1_id",
layout="layout1",
status="new",
),
Mock(
full_path="path/to/file2",
id="file2_id",
layout="layout2",
status="annotate",
),
Mock(
full_path="path/to/file3",
id="file3_id",
layout="layout3",
status="review",
),
Mock(
full_path="path/to/file4",
id="file4_id",
layout="layout4",
status="complete",
),
Mock(
full_path="path/to/file5",
id="file5_id",
layout="layout5",
status="archived",
),
]

filenames = ["file1", "file2"]
filenames = ["file1", "file2", "file3", "file4"]
expected_result = {
"path/to/file1": {
"item_id": "file1_id",
@@ -232,13 +260,68 @@ def test__get_remote_files() -> None:
"slot_names": ["slot_name2"],
"layout": "layout2",
},
"path/to/file3": {
"item_id": "file3_id",
"slot_names": ["slot_name3"],
"layout": "layout3",
},
"path/to/file4": {
"item_id": "file4_id",
"slot_names": ["slot_name4"],
"layout": "layout4",
},
"path/to/file5": {
"item_id": "file5_id",
"slot_names": ["slot_name5"],
"layout": "layout5",
},
}

with patch("darwin.importer.importer._get_slot_names") as mock_get_slot_names:
mock_get_slot_names.side_effect = [["slot_name1"], ["slot_name2"]]
result = _get_remote_files(mock_dataset, filenames)
mock_get_slot_names.side_effect = [
["slot_name1"],
["slot_name2"],
["slot_name3"],
["slot_name4"],
["slot_name5"],
]
result = _get_remote_files_ready_for_import(mock_dataset, filenames)
assert result == expected_result
assert mock_get_slot_names.call_count == 2
assert mock_get_slot_names.call_count == 5


def test__get_remote_files_ready_for_import_raises_with_statuses_not_ready_for_import() -> (
None
):
mock_dataset = Mock()

mock_dataset.fetch_remote_files.return_value = [
Mock(full_path="path/to/file2", id="file2_id", layout="layout2", status="error")
]
with pytest.raises(ValueError):
_get_remote_files_ready_for_import(mock_dataset, ["file2"])

mock_dataset.fetch_remote_files.return_value = [
Mock(
full_path="path/to/file3",
id="file3_id",
layout="layout3",
status="uploading",
)
]
with pytest.raises(ValueError):
_get_remote_files_ready_for_import(mock_dataset, ["file3"])

mock_dataset.fetch_remote_files.return_value = [
Mock(
full_path="path/to/file4",
id="file4_id",
layout="layout4",
status="processing",
)
]
with pytest.raises(ValueError):
_get_remote_files_ready_for_import(mock_dataset, ["file4"])


def test__get_slot_names() -> None: