Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable stream exporters: VOC, YOLO, Datumaro, and COCO data format #1102

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Migrate DVC v3.0.0
(<https://github.com/openvinotoolkit/datumaro/pull/1072>)
- Stream dataset import/export
(<https://github.com/openvinotoolkit/datumaro/pull/1077>, <https://github.com/openvinotoolkit/datumaro/pull/1081>, <https://github.com/openvinotoolkit/datumaro/pull/1082>, <https://github.com/openvinotoolkit/datumaro/pull/1091>, <https://github.com/openvinotoolkit/datumaro/pull/1093>, <https://github.com/openvinotoolkit/datumaro/pull/1098>)
(<https://github.com/openvinotoolkit/datumaro/pull/1077>, <https://github.com/openvinotoolkit/datumaro/pull/1081>, <https://github.com/openvinotoolkit/datumaro/pull/1082>, <https://github.com/openvinotoolkit/datumaro/pull/1091>, <https://github.com/openvinotoolkit/datumaro/pull/1093>, <https://github.com/openvinotoolkit/datumaro/pull/1098>, <https://github.com/openvinotoolkit/datumaro/pull/1102>)
- Support mask annotations for CVAT data format
(<https://github.com/openvinotoolkit/datumaro/pull/1078>)

Expand Down
4 changes: 3 additions & 1 deletion src/datumaro/components/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ def export(

assert "ctx" not in kwargs
exporter_kwargs = copy(kwargs)
exporter_kwargs["stream"] = self._stream
exporter_kwargs["ctx"] = ExportContext(
progress_reporter=progress_reporter, error_policy=error_policy
)
Expand Down Expand Up @@ -632,7 +633,8 @@ def export(
raise e.__cause__

self.bind(save_dir, format, options=copy(kwargs))
self.flush_changes()
if not self._stream:
self.flush_changes()

def save(self, save_dir: Optional[str] = None, **kwargs) -> None:
options = dict(self._options)
Expand Down
11 changes: 4 additions & 7 deletions src/datumaro/components/dataset_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,9 @@ def __init__(
):
if not source.is_stream:
raise ValueError("source should be a stream.")
super().__init__(source, infos, categories, media_type)
self._subset_names = None
self._subset_names = list(source.subsets().keys())
self._transform_ids_for_latest_subset_names = []
super().__init__(source, infos, categories, media_type)

def is_cache_initialized(self) -> bool:
log.debug("This function has no effect on streaming.")
Expand Down Expand Up @@ -660,12 +660,9 @@ def get_subset(self, name: str) -> IDataset:

@property
def subset_names(self):
if self._subset_names is None:
self._subset_names = {item.subset for item in self}
self._transforms_for_latest_subset_names = [id(t) for t in self._transforms]
elif self._transforms_for_latest_subset_names != [id(t) for t in self._transforms]:
if self._transform_ids_for_latest_subset_names != [id(t) for t in self._transforms]:
self._subset_names = {item.subset for item in self}
self._transforms_for_latest_subset_names = [id(t) for t in self._transforms]
self._transform_ids_for_latest_subset_names = [id(t) for t in self._transforms]

return self._subset_names

Expand Down
12 changes: 12 additions & 0 deletions src/datumaro/components/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def __init__(
default_image_ext: Optional[str] = None,
save_dataset_meta: bool = False,
save_hashkey_meta: bool = False,
stream: bool = False,
ctx: Optional[ExportContext] = None,
):
default_image_ext = default_image_ext or self.DEFAULT_IMAGE_EXT
Expand Down Expand Up @@ -222,6 +223,12 @@ def __init__(
else:
self._patch = None

if stream and not self.can_stream:
raise DatasetExportError(
f"{self.__class__.__name__} cannot export a dataset in a stream manner"
)
self._stream = stream

self._ctx: ExportContext = ctx or NullExportContext()

def _find_image_ext(self, item: Union[DatasetItem, Image]):
Expand Down Expand Up @@ -299,6 +306,11 @@ def _check_hash_key_existence(self, item):
self._save_hashkey_meta = True
return

@property
def can_stream(self) -> bool:
"""Flag to indicate whether the exporter can export the dataset in a stream manner or not."""
return False


# TODO: Currently, ExportContextComponent is introduced only for Datumaro and DatumaroBinary format
# for multi-processing. We need to propagate this to everywhere in Datumaro 1.2.0
Expand Down
9 changes: 7 additions & 2 deletions src/datumaro/plugins/data_formats/arrow/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ def __init__(
num_shards: int = 1,
max_shard_size: Optional[int] = None,
):
super().__init__(context, "", export_context)
super().__init__(
context=context,
subset=subset,
ann_file="",
export_context=export_context,
)
self._schema = deepcopy(DatumaroArrow.SCHEMA)
self._subset = subset
self._writers = []
self._fnames = []
self._max_chunk_size = max_chunk_size
Expand Down Expand Up @@ -370,6 +374,7 @@ def __init__(
num_shards: int = 1,
max_shard_size: Optional[int] = None,
max_chunk_size: int = 1000,
**kwargs,
):
super().__init__(
extractor=extractor,
Expand Down
Loading