diff --git a/CHANGELOG.md b/CHANGELOG.md index 92f8ecbdc..b4b51a987 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Fixes - Fix handling connection errors when refreshing oauth token ([#1204](https://github.com/neptune-ai/neptune-client/pull/1204)) +- Fix syncing offline runs with file upload ([#1211](https://github.com/neptune-ai/neptune-client/pull/1211)) ## neptune-client 0.16.17 diff --git a/src/neptune/new/attributes/atoms/file.py b/src/neptune/new/attributes/atoms/file.py index c6b34ceb7..7ce3b2fc8 100644 --- a/src/neptune/new/attributes/atoms/file.py +++ b/src/neptune/new/attributes/atoms/file.py @@ -30,7 +30,7 @@ def assign(self, value: FileVal, wait: bool = False) -> None: operation = UploadFile.of_file( value=value, attribute_path=self._path, - upload_path=self._container._op_processor._operation_storage.upload_path, + operation_storage=self._container._op_processor._operation_storage, ) with self._container.lock(): diff --git a/src/neptune/new/cli/sync.py b/src/neptune/new/cli/sync.py index 000dd591c..1f4acf66f 100644 --- a/src/neptune/new/cli/sync.py +++ b/src/neptune/new/cli/sync.py @@ -57,6 +57,7 @@ UniqueId, ) from neptune.new.internal.operation import Operation +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.utils.logger import logger retries_timeout = int(os.getenv(NEPTUNE_SYNC_BATCH_TIMEOUT_ENV, "3600")) @@ -80,8 +81,9 @@ def sync_execution( container_id: UniqueId, container_type: ContainerType, ) -> None: + operation_storage = OperationStorage(execution_path) with DiskQueue( - dir_path=execution_path, + dir_path=operation_storage.data_path, to_dict=lambda x: x.to_dict(), from_dict=Operation.from_dict, lock=threading.RLock(), @@ -102,6 +104,7 @@ def sync_execution( container_id=container_id, container_type=container_type, operations=batch, + operation_storage=operation_storage, ) version_to_ack += processed_count batch = batch[processed_count:] diff --git a/src/neptune/new/internal/backends/hosted_neptune_backend.py b/src/neptune/new/internal/backends/hosted_neptune_backend.py index 4b72602d7..70d88862b 100644 --- a/src/neptune/new/internal/backends/hosted_neptune_backend.py +++ b/src/neptune/new/internal/backends/hosted_neptune_backend.py @@ -130,6 +130,7 @@ UploadFileContent, UploadFileSet, ) +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.utils import base64_decode from neptune.new.internal.utils.generic_attribute_mapper import map_attribute_result_to_value from neptune.new.internal.utils.paths import path_to_str @@ -443,6 +444,7 @@ def execute_operations( container_id: UniqueId, container_type: ContainerType, operations: List[Operation], + operation_storage: OperationStorage, ) -> Tuple[int, List[NeptuneException]]: errors = [] @@ -466,6 +468,7 @@ def execute_operations( container_id=container_id, container_type=container_type, upload_operations=preprocessed_operations.upload_operations, + operation_storage=operation_storage, ) ) @@ -490,7 +493,7 @@ def execute_operations( assign_artifact_operations, preprocessed_operations.other_operations, ): - op.clean() + op.clean(operation_storage=operation_storage) return ( operations_preprocessor.processed_ops_count + dropped_count, @@ -502,6 +505,7 @@ def _execute_upload_operations( container_id: str, container_type: ContainerType, upload_operations: List[Operation], + operation_storage: OperationStorage, ) -> List[NeptuneException]: errors = list() @@ -522,7 +526,7 @@ def _execute_upload_operations( swagger_client=self.leaderboard_client, container_id=container_id, attribute=path_to_str(op.path), - source=op.file_path, + source=op.get_absolute_path(operation_storage), ext=op.ext, multipart_config=multipart_config, ) @@ -560,10 +564,13 @@ def _execute_upload_operations_with_400_retry( container_id: str, container_type: ContainerType, upload_operations: List[Operation], + operation_storage: OperationStorage, ) -> List[NeptuneException]: while True: try: - return self._execute_upload_operations(container_id, container_type, upload_operations) + return self._execute_upload_operations( + container_id, container_type, upload_operations, operation_storage + ) except ClientHttpError as ex: if "Length of stream does not match given range" not in ex.response: raise ex diff --git a/src/neptune/new/internal/backends/neptune_backend.py b/src/neptune/new/internal/backends/neptune_backend.py index 05f605cdc..9d0de6b15 100644 --- a/src/neptune/new/internal/backends/neptune_backend.py +++ b/src/neptune/new/internal/backends/neptune_backend.py @@ -55,6 +55,7 @@ UniqueId, ) from neptune.new.internal.operation import Operation +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.websockets.websockets_factory import WebsocketsFactory from neptune.new.types.atoms import GitRef @@ -138,6 +139,7 @@ def execute_operations( container_id: UniqueId, container_type: ContainerType, operations: List[Operation], + operation_storage: OperationStorage, ) -> Tuple[int, List[NeptuneException]]: pass diff --git a/src/neptune/new/internal/backends/neptune_backend_mock.py b/src/neptune/new/internal/backends/neptune_backend_mock.py index 5e63d6879..0ccd96fb1 100644 --- a/src/neptune/new/internal/backends/neptune_backend_mock.py +++ b/src/neptune/new/internal/backends/neptune_backend_mock.py @@ -105,6 +105,7 @@ UploadFileContent, UploadFileSet, ) +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.operation_visitor import OperationVisitor from neptune.new.internal.types.file_types import FileType from neptune.new.internal.utils import base64_decode @@ -282,16 +283,19 @@ def execute_operations( container_id: UniqueId, container_type: ContainerType, operations: List[Operation], + operation_storage: OperationStorage, ) -> Tuple[int, List[NeptuneException]]: result = [] for op in operations: try: - self._execute_operation(container_id, container_type, op) + self._execute_operation(container_id, container_type, op, operation_storage) except NeptuneException as e: result.append(e) return len(operations), result - def _execute_operation(self, container_id: UniqueId, container_type: ContainerType, op: Operation) -> None: + def _execute_operation( + self, container_id: UniqueId, container_type: ContainerType, op: Operation, operation_storage: OperationStorage + ) -> None: run = self._get_container(container_id, container_type) val = run.get(op.path) if val is not None and not isinstance(val, Value): @@ -299,7 +303,7 @@ def _execute_operation(self, container_id: UniqueId, container_type: ContainerTy raise MetadataInconsistency("{} is a namespace, not an attribute".format(op.path)) else: raise InternalClientError("{} is a {}".format(op.path, type(val))) - visitor = NeptuneBackendMock.NewValueOpVisitor(self, op.path, val) + visitor = NeptuneBackendMock.NewValueOpVisitor(self, op.path, val, operation_storage) new_val = visitor.visit(op) if new_val is not None: run.set(op.path, new_val) @@ -589,11 +593,14 @@ def copy_value(self, source_type: Type[Attribute], source_path: List[str]) -> At raise NotImplementedError class NewValueOpVisitor(OperationVisitor[Optional[Value]]): - def __init__(self, backend, path: List[str], current_value: Optional[Value]): + def __init__( + self, backend, path: List[str], current_value: Optional[Value], operation_storage: OperationStorage + ): self._backend = backend self._path = path self._current_value = current_value self._artifact_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + self._operation_storage = operation_storage def visit_assign_float(self, op: AssignFloat) -> Optional[Value]: if self._current_value is not None and not isinstance(self._current_value, Float): @@ -640,7 +647,7 @@ def visit_clear_artifact(self, _: ClearArtifact) -> Optional[Value]: def visit_upload_file(self, op: UploadFile) -> Optional[Value]: if self._current_value is not None and not isinstance(self._current_value, File): raise self._create_type_error("save", File.__name__) - return File.from_path(path=op.file_path, extension=op.ext) + return File.from_path(path=op.get_absolute_path(self._operation_storage), extension=op.ext) def visit_upload_file_content(self, op: UploadFileContent) -> Optional[Value]: if self._current_value is not None and not isinstance(self._current_value, File): diff --git a/src/neptune/new/internal/operation.py b/src/neptune/new/internal/operation.py index 8a507a4db..e236e4dd2 100644 --- a/src/neptune/new/internal/operation.py +++ b/src/neptune/new/internal/operation.py @@ -17,7 +17,6 @@ import os from dataclasses import dataclass from datetime import datetime -from pathlib import Path from typing import ( TYPE_CHECKING, Generic, @@ -29,9 +28,13 @@ TypeVar, ) -from neptune.common.exceptions import InternalClientError +from neptune.common.exceptions import ( + InternalClientError, + NeptuneException, +) from neptune.new.exceptions import MalformedOperation from neptune.new.internal.container_type import ContainerType +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.types.file_types import FileType from neptune.new.types.atoms.file import File @@ -57,7 +60,7 @@ class Operation(abc.ABC): def accept(self, visitor: "OperationVisitor[Ret]") -> Ret: pass - def clean(self): + def clean(self, operation_storage: OperationStorage): pass def to_dict(self) -> dict: @@ -185,11 +188,13 @@ def from_dict(data: dict) -> "AssignArtifact": class UploadFile(Operation): ext: str - file_path: str + file_path: str = None + tmp_file_name: str = None + # `clean_after_upload` is for backward compatibility and should be removed in the future clean_after_upload: bool = False @classmethod - def of_file(cls, value: File, attribute_path: List[str], upload_path: Path): + def of_file(cls, value: File, attribute_path: List[str], operation_storage: OperationStorage): if value.file_type is FileType.LOCAL_FILE: operation = UploadFile( path=attribute_path, @@ -197,21 +202,16 @@ def of_file(cls, value: File, attribute_path: List[str], upload_path: Path): file_path=os.path.abspath(value.path), ) elif value.file_type in (FileType.IN_MEMORY, FileType.STREAM): - tmp_file_path = cls.get_upload_path(attribute_path, value.extension, upload_path) - value._save(tmp_file_path) - operation = UploadFile( - path=attribute_path, - ext=value.extension, - file_path=os.path.abspath(tmp_file_path), - clean_after_upload=True, - ) + tmp_file_name = cls.get_tmp_file_name(attribute_path, value.extension) + value._save(operation_storage.upload_path / tmp_file_name) + operation = UploadFile(path=attribute_path, ext=value.extension, tmp_file_name=tmp_file_name) else: raise ValueError(f"Unexpected FileType: {value.file_type}") return operation - def clean(self): - if self.clean_after_upload: - os.remove(self.file_path) + def clean(self, operation_storage: OperationStorage): + if self.clean_after_upload or self.tmp_file_name: + os.remove(self.get_absolute_path(operation_storage)) def accept(self, visitor: "OperationVisitor[Ret]") -> Ret: return visitor.visit_upload_file(self) @@ -220,20 +220,35 @@ def to_dict(self) -> dict: ret = super().to_dict() ret["ext"] = self.ext ret["file_path"] = self.file_path + ret["tmp_file_name"] = self.tmp_file_name ret["clean_after_upload"] = self.clean_after_upload return ret @staticmethod def from_dict(data: dict) -> "UploadFile": - return UploadFile(data["path"], data["ext"], data["file_path"], data.get("clean_after_upload", False)) + return UploadFile( + data["path"], + data["ext"], + data.get("file_path"), + data.get("tmp_file_name"), + data.get("clean_after_upload", False), + ) @staticmethod - def get_upload_path(attribute_path: List[str], extension: str, upload_path: Path): + def get_tmp_file_name(attribute_path: List[str], extension: str): now = datetime.now() tmp_file_name = ( f"{'_'.join(attribute_path)}-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}.{extension}" ) - return upload_path / tmp_file_name + return tmp_file_name + + def get_absolute_path(self, operation_storage: OperationStorage) -> str: + if self.file_path: + return self.file_path + elif self.tmp_file_name: + return str(operation_storage.upload_path / self.tmp_file_name) + + raise NeptuneException("Expected 'file_path' or 'tmp_file_name' to be filled.") @dataclass diff --git a/src/neptune/new/internal/operation_processors/async_operation_processor.py b/src/neptune/new/internal/operation_processors/async_operation_processor.py index 800aaa42e..249371175 100644 --- a/src/neptune/new/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/new/internal/operation_processors/async_operation_processor.py @@ -256,6 +256,7 @@ def process_batch(self, batch: List[Operation], version: int) -> None: container_id=self._processor._container_id, container_type=self._processor._container_type, operations=batch, + operation_storage=self._processor._operation_storage, ) version_to_ack += processed_count batch = batch[processed_count:] diff --git a/src/neptune/new/internal/operation_processors/operation_storage.py b/src/neptune/new/internal/operation_processors/operation_storage.py index 39eb0293d..609707f82 100644 --- a/src/neptune/new/internal/operation_processors/operation_storage.py +++ b/src/neptune/new/internal/operation_processors/operation_storage.py @@ -20,6 +20,7 @@ import os import shutil from pathlib import Path +from typing import Union from neptune.new.constants import NEPTUNE_DATA_DIRECTORY from neptune.new.internal.container_type import ContainerType @@ -28,8 +29,11 @@ class OperationStorage: - def __init__(self, data_path: str): - self._data_path = Path(data_path) + def __init__(self, data_path: Union[str, Path]): + if isinstance(data_path, Path): + self._data_path = data_path + else: + self._data_path = Path(data_path) # initialize directories os.makedirs(self.data_path, exist_ok=True) diff --git a/src/neptune/new/internal/operation_processors/sync_operation_processor.py b/src/neptune/new/internal/operation_processors/sync_operation_processor.py index 68536f0fb..8b5bd846e 100644 --- a/src/neptune/new/internal/operation_processors/sync_operation_processor.py +++ b/src/neptune/new/internal/operation_processors/sync_operation_processor.py @@ -45,7 +45,12 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType): return data_path def enqueue_operation(self, op: Operation, wait: bool) -> None: - _, errors = self._backend.execute_operations(self._container_id, self._container_type, [op]) + _, errors = self._backend.execute_operations( + container_id=self._container_id, + container_type=self._container_type, + operations=[op], + operation_storage=self._operation_storage, + ) if errors: raise errors[0] diff --git a/tests/e2e/standard/test_cli.py b/tests/e2e/standard/test_cli.py index c1aeac6de..6ecbe9910 100644 --- a/tests/e2e/standard/test_cli.py +++ b/tests/e2e/standard/test_cli.py @@ -24,6 +24,7 @@ import neptune.new as neptune from neptune.common.exceptions import NeptuneException from neptune.new.cli import sync +from neptune.new.types import File from src.neptune.new.cli.commands import clear from tests.e2e.base import ( AVAILABLE_CONTAINERS, @@ -128,6 +129,11 @@ def test_offline_sync(self, environment): val = fake.word() run[key] = val + # and some file + key2 = self.gen_key() + val2 = File.from_content(b"dummybytes") + run[key2].upload(val2) + # and stop it run.stop() @@ -142,6 +148,9 @@ def test_offline_sync(self, environment): run2 = neptune.init_run(with_id=sys_id, project=environment.project) assert run2[key].fetch() == val + run2[key2].download() + with open(f"{tmp}/{key2.split('/')[-1]}.bin", "rb") as file: + assert file.read() == b"dummybytes" run2.stop() @pytest.mark.parametrize("container_type", ["run"]) diff --git a/tests/unit/neptune/new/attributes/atoms/test_file.py b/tests/unit/neptune/new/attributes/atoms/test_file.py index 6a284b525..51bd4e32b 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_file.py +++ b/tests/unit/neptune/new/attributes/atoms/test_file.py @@ -45,12 +45,12 @@ class TestFile(TestAttributeBase): @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") def test_assign(self): - def get_tmp_uploaded_file(tmp_upload_dir): + def get_tmp_uploaded_file_name(tmp_upload_dir): """Get tmp file to uploaded from `upload_path` - here's assumption that we upload only one file per one path in test""" uploaded_files = os.listdir(tmp_upload_dir) assert len(uploaded_files) == 1 - return f"{tmp_upload_dir}/{uploaded_files[0]}" + return uploaded_files[0] a_text = "Some text stream" a_binary = b"Some binary stream" @@ -64,13 +64,13 @@ def get_tmp_uploaded_file(tmp_upload_dir): ( FileVal.from_stream(StringIO(a_text)), lambda attribute_path, tmp_uploaded_file: UploadFile( - attribute_path, ext="txt", file_path=tmp_uploaded_file, clean_after_upload=True + attribute_path, ext="txt", tmp_file_name=tmp_uploaded_file ), ), ( FileVal.from_stream(BytesIO(a_binary)), lambda attribute_path, tmp_uploaded_file: UploadFile( - attribute_path, ext="bin", file_path=tmp_uploaded_file, clean_after_upload=True + attribute_path, ext="bin", tmp_file_name=tmp_uploaded_file ), ), ] @@ -88,7 +88,7 @@ def get_tmp_uploaded_file(tmp_upload_dir): var.assign(value, wait=wait) if value.file_type is not FileType.LOCAL_FILE: - tmp_uploaded_file = get_tmp_uploaded_file(tmp_upload_dir) + tmp_uploaded_file = get_tmp_uploaded_file_name(tmp_upload_dir) self.assertTrue(os.path.exists(tmp_uploaded_file)) else: tmp_uploaded_file = None diff --git a/tests/unit/neptune/new/cli/test_sync.py b/tests/unit/neptune/new/cli/test_sync.py index 4ed37ea51..9d810b4b4 100644 --- a/tests/unit/neptune/new/cli/test_sync.py +++ b/tests/unit/neptune/new/cli/test_sync.py @@ -16,6 +16,7 @@ from unittest.mock import MagicMock +import mock import pytest from neptune.new.cli.sync import SyncRunner @@ -70,9 +71,15 @@ def test_sync_all_runs(tmp_path, mocker, capsys, backend, sync_runner, container assert f"Synchronising {get_qualified_name(synced_container)}" not in captured.out # and - backend.execute_operations.has_calls( - [ - mocker.call(unsynced_container.id, ContainerType.RUN, ["op-1", "op-2"]), + assert backend.execute_operations.called_once() + backend.execute_operations.assert_has_calls( + calls=[ + mocker.call( + container_id=unsynced_container.id, + container_type=container_type, + operations=["op-1", "op-2"], + operation_storage=mock.ANY, + ), ], any_order=True, ) @@ -103,9 +110,14 @@ def test_sync_all_offline_runs(tmp_path, mocker, capsys, backend, sync_runner): ) in captured.out # and - backend.execute_operations.has_calls( + backend.execute_operations.assert_has_calls( [ - mocker.call(offline_run.id, ContainerType.RUN, ["op-1", "op-2"]), + mocker.call( + container_id=offline_run.id, + container_type=ContainerType.RUN, + operations=["op-0", "op-1", "op-2"], + operation_storage=mock.ANY, + ), ], any_order=True, ) @@ -169,17 +181,13 @@ def test_sync_selected_runs(tmp_path, mocker, capsys, backend, sync_runner): assert "Synchronising {}".format(get_qualified_name(unsync_exp)) not in captured.out # and - backend.execute_operations.has_calls( + backend.execute_operations.assert_has_calls( [ mocker.call( - sync_exp.id, - ContainerType.RUN, - operations=["op-1", "op-2"], - ), - mocker.call( - offline_run.id, - ContainerType.RUN, + container_id=offline_run.id, + container_type=ContainerType.RUN, operations=["op-0", "op-1", "op-2"], + operation_storage=mock.ANY, ), ], any_order=True, @@ -220,17 +228,19 @@ def test_sync_deprecated_runs(tmp_path, mocker, capsys, backend, sync_runner): assert "Synchronization of run {} completed.".format(get_qualified_name(offline_old_run)) in captured.out # and - backend.execute_operations.has_calls( + backend.execute_operations.assert_has_calls( [ mocker.call( - deprecated_unsynced_run.id, - ContainerType.RUN, + container_id=deprecated_unsynced_run.id, + container_type=ContainerType.RUN, operations=["op-1", "op-2"], + operation_storage=mock.ANY, ), mocker.call( - offline_old_run.id, - ContainerType.RUN, + container_id=offline_old_run.id, + container_type=ContainerType.RUN, operations=["op-0", "op-1", "op-2"], + operation_storage=mock.ANY, ), ], any_order=True, diff --git a/tests/unit/neptune/new/cli/utils.py b/tests/unit/neptune/new/cli/utils.py index 8874acb94..fdafb9fb6 100644 --- a/tests/unit/neptune/new/cli/utils.py +++ b/tests/unit/neptune/new/cli/utils.py @@ -48,7 +48,7 @@ def get_metadata_container(container_id, expected_container_type: ContainerType) return get_metadata_container -def execute_operations(container_id, container_type, operations): +def execute_operations(container_id, container_type, operations, operation_storage): return len(operations), [] diff --git a/tests/unit/neptune/new/internal/backends/test_hosted_neptune_backend.py b/tests/unit/neptune/new/internal/backends/test_hosted_neptune_backend.py index 1529a650a..0dd1e828d 100644 --- a/tests/unit/neptune/new/internal/backends/test_hosted_neptune_backend.py +++ b/tests/unit/neptune/new/internal/backends/test_hosted_neptune_backend.py @@ -57,6 +57,7 @@ UploadFile, UploadFileContent, ) +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from neptune.new.internal.utils import base64_encode from tests.unit.neptune.new.backend_test_mixin import BackendTestMixin from tests.unit.neptune.new.utils import response_mock @@ -86,6 +87,7 @@ def setUp(self) -> None: create_artifacts_client.cache_clear() self.container_types = [ContainerType.RUN, ContainerType.PROJECT] + self.dummy_operation_storage = OperationStorage("./dummy_storage") @patch("neptune.new.internal.backends.hosted_neptune_backend.upload_file_attribute") @patch("socket.gethostbyname", MagicMock(return_value="1.1.1.1")) @@ -136,6 +138,7 @@ def test_execute_operations(self, upload_mock, swagger_client_factory): file_path="other/file/path.txt", ), ], + operation_storage=self.dummy_operation_storage, ) # then @@ -250,6 +253,7 @@ def test_upload_files_destination_path(self, upload_mock, swagger_client_factory file_path="/path/to/some_image.jpeg", ), ], + operation_storage=self.dummy_operation_storage, ) # then @@ -332,6 +336,7 @@ def test_track_to_new_artifact(self, track_to_new_artifact_mock, swagger_client_ entries=[("/path/to/file2", None)], ), ], + operation_storage=self.dummy_operation_storage, ) # then @@ -423,6 +428,7 @@ def test_track_to_existing_artifact(self, track_to_existing_artifact_mock, swagg entries=[("/path/to/file2", None)], ), ], + operation_storage=self.dummy_operation_storage, ) # then @@ -550,6 +556,7 @@ def test_limit_exceed(self, swagger_client_factory): operations=[ LogFloats(["float1"], [LogFloats.ValueType(1, 2, 3)]), ], + operation_storage=self.dummy_operation_storage, ) @patch("socket.gethostbyname", MagicMock(return_value="1.1.1.1")) @@ -574,4 +581,5 @@ def test_limit_exceed_legacy(self, swagger_client_factory): operations=[ LogFloats(["float1"], [LogFloats.ValueType(1, 2, 3)]), ], + operation_storage=self.dummy_operation_storage, ) diff --git a/tests/unit/neptune/new/internal/backends/test_neptune_backend_mock.py b/tests/unit/neptune/new/internal/backends/test_neptune_backend_mock.py index 022e4832d..2f2c9b2e2 100644 --- a/tests/unit/neptune/new/internal/backends/test_neptune_backend_mock.py +++ b/tests/unit/neptune/new/internal/backends/test_neptune_backend_mock.py @@ -45,6 +45,7 @@ LogFloats, LogStrings, ) +from neptune.new.internal.operation_processors.operation_storage import OperationStorage from tests.unit.neptune.legacy.random_utils import a_string @@ -64,13 +65,19 @@ def setUp(self) -> None: (model.id, ContainerType.MODEL), (model_version.id, ContainerType.MODEL_VERSION), ] + self.dummy_operation_storage = OperationStorage("./dummy_storage") def test_get_float_attribute(self): for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): # given digit = randint(1, 10**4) - self.backend.execute_operations(container_id, container_type, operations=[AssignFloat(["x"], digit)]) + self.backend.execute_operations( + container_id, + container_type, + operations=[AssignFloat(["x"], digit)], + operation_storage=self.dummy_operation_storage, + ) # when ret = self.backend.get_float_attribute(container_id, container_type, path=["x"]) @@ -83,7 +90,12 @@ def test_get_string_attribute(self): with self.subTest(f"For containerType: {container_type}"): # given text = a_string() - self.backend.execute_operations(container_id, container_type, operations=[AssignString(["x"], text)]) + self.backend.execute_operations( + container_id, + container_type, + operations=[AssignString(["x"], text)], + operation_storage=self.dummy_operation_storage, + ) # when ret = self.backend.get_string_attribute(container_id, container_type, path=["x"]) @@ -97,7 +109,12 @@ def test_get_datetime_attribute(self): # given now = datetime.datetime.now() now = now.replace(microsecond=1000 * int(now.microsecond / 1000)) - self.backend.execute_operations(container_id, container_type, [AssignDatetime(["x"], now)]) + self.backend.execute_operations( + container_id, + container_type, + [AssignDatetime(["x"], now)], + operation_storage=self.dummy_operation_storage, + ) # when ret = self.backend.get_datetime_attribute(container_id, container_type, ["x"]) @@ -121,6 +138,7 @@ def test_get_float_series_attribute(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) self.backend.execute_operations( container_id, @@ -134,6 +152,7 @@ def test_get_float_series_attribute(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) # when @@ -158,6 +177,7 @@ def test_get_string_series_attribute(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) self.backend.execute_operations( container_id, @@ -171,6 +191,7 @@ def test_get_string_series_attribute(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) # when @@ -183,7 +204,12 @@ def test_get_string_set_attribute(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AddStrings(["x"], {"abcx", "qwe"})]) + self.backend.execute_operations( + container_id, + container_type, + [AddStrings(["x"], {"abcx", "qwe"})], + operation_storage=self.dummy_operation_storage, + ) # when ret = self.backend.get_string_set_attribute(container_id, container_type, ["x"]) @@ -207,6 +233,7 @@ def test_get_string_series_values(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) self.backend.execute_operations( container_id, @@ -220,6 +247,7 @@ def test_get_string_series_values(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) # when @@ -257,6 +285,7 @@ def test_get_float_series_values(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) self.backend.execute_operations( container_id, @@ -270,6 +299,7 @@ def test_get_float_series_values(self): ], ) ], + operation_storage=self.dummy_operation_storage, ) # when @@ -295,7 +325,12 @@ def test_get_float_attribute_wrong_type(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignString(["x"], "abc")]) + self.backend.execute_operations( + container_id, + container_type, + [AssignString(["x"], "abc")], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(MetadataInconsistency): @@ -305,7 +340,12 @@ def test_get_string_attribute_wrong_type(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignFloat(["x"], 5)]) + self.backend.execute_operations( + container_id, + container_type, + [AssignFloat(["x"], 5)], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(MetadataInconsistency): @@ -315,7 +355,12 @@ def test_get_datetime_attribute_wrong_type(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignString(["x"], "abc")]) + self.backend.execute_operations( + container_id, + container_type, + [AssignString(["x"], "abc")], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(MetadataInconsistency): @@ -325,7 +370,12 @@ def test_get_string_series_attribute_wrong_type(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignString(["x"], "abc")]) + self.backend.execute_operations( + container_id, + container_type, + [AssignString(["x"], "abc")], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(MetadataInconsistency): @@ -335,7 +385,12 @@ def test_get_string_set_attribute_wrong_type(self): # given for container_id, container_type in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignString(["x"], "abc")]) + self.backend.execute_operations( + container_id, + container_type, + [AssignString(["x"], "abc")], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(MetadataInconsistency): @@ -345,7 +400,12 @@ def test_container_not_found(self): # given for (container_id, container_type) in self.ids_with_types: with self.subTest(f"For containerType: {container_type}"): - self.backend.execute_operations(container_id, container_type, [AssignString(["x"], "abc")]) + self.backend.execute_operations( + container_id, + container_type, + [AssignString(["x"], "abc")], + operation_storage=self.dummy_operation_storage, + ) # then with self.assertRaises(ContainerUUIDNotFound): diff --git a/tests/unit/neptune/new/internal/test_operations.py b/tests/unit/neptune/new/internal/test_operations.py index b59bd2e5d..ca355564e 100644 --- a/tests/unit/neptune/new/internal/test_operations.py +++ b/tests/unit/neptune/new/internal/test_operations.py @@ -87,7 +87,9 @@ def _list_objects(): TestOperations._random_path(), "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", ), - UploadFile(TestOperations._random_path(), "f.txt", "file/path/f.txt"), + UploadFile(TestOperations._random_path(), "txt", file_path="file/path/f.txt"), + UploadFile(TestOperations._random_path(), "txt", file_path="file/path/f.txt", clean_after_upload=True), + UploadFile(TestOperations._random_path(), "txt", tmp_file_name="f.txt"), UploadFileContent(TestOperations._random_path(), "stream.txt", "some base64"), UploadFileSet( TestOperations._random_path(),