Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix syncing offline runs with file upload #1211

Merged
merged 12 commits into from
Feb 13, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Fixes
- Fix handling connection errors when refreshing oauth token ([#1204](https://github.com/neptune-ai/neptune-client/pull/1204))
- Fix syncing offline runs with file upload ([#1211](https://github.com/neptune-ai/neptune-client/pull/1211))

## neptune-client 0.16.17

Expand Down
2 changes: 1 addition & 1 deletion src/neptune/new/attributes/atoms/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def assign(self, value: FileVal, wait: bool = False) -> None:
operation = UploadFile.of_file(
value=value,
attribute_path=self._path,
upload_path=self._container._op_processor._operation_storage.upload_path,
operation_storage=self._container._op_processor._operation_storage,
)

with self._container.lock():
Expand Down
5 changes: 4 additions & 1 deletion src/neptune/new/cli/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
UniqueId,
)
from neptune.new.internal.operation import Operation
from neptune.new.internal.operation_processors.operation_storage import OperationStorage
from neptune.new.internal.utils.logger import logger

retries_timeout = int(os.getenv(NEPTUNE_SYNC_BATCH_TIMEOUT_ENV, "3600"))
Expand All @@ -80,8 +81,9 @@ def sync_execution(
container_id: UniqueId,
container_type: ContainerType,
) -> None:
operation_storage = OperationStorage(execution_path)
with DiskQueue(
dir_path=execution_path,
dir_path=operation_storage.data_path,
to_dict=lambda x: x.to_dict(),
from_dict=Operation.from_dict,
lock=threading.RLock(),
Expand All @@ -102,6 +104,7 @@ def sync_execution(
container_id=container_id,
container_type=container_type,
operations=batch,
operation_storage=operation_storage,
)
version_to_ack += processed_count
batch = batch[processed_count:]
Expand Down
13 changes: 10 additions & 3 deletions src/neptune/new/internal/backends/hosted_neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
UploadFileContent,
UploadFileSet,
)
from neptune.new.internal.operation_processors.operation_storage import OperationStorage
from neptune.new.internal.utils import base64_decode
from neptune.new.internal.utils.generic_attribute_mapper import map_attribute_result_to_value
from neptune.new.internal.utils.paths import path_to_str
Expand Down Expand Up @@ -443,6 +444,7 @@ def execute_operations(
container_id: UniqueId,
container_type: ContainerType,
operations: List[Operation],
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
errors = []

Expand All @@ -466,6 +468,7 @@ def execute_operations(
container_id=container_id,
container_type=container_type,
upload_operations=preprocessed_operations.upload_operations,
operation_storage=operation_storage,
)
)

Expand All @@ -490,7 +493,7 @@ def execute_operations(
assign_artifact_operations,
preprocessed_operations.other_operations,
):
op.clean()
op.clean(operation_storage=operation_storage)

return (
operations_preprocessor.processed_ops_count + dropped_count,
Expand All @@ -502,6 +505,7 @@ def _execute_upload_operations(
container_id: str,
container_type: ContainerType,
upload_operations: List[Operation],
operation_storage: OperationStorage,
) -> List[NeptuneException]:
errors = list()

Expand All @@ -522,7 +526,7 @@ def _execute_upload_operations(
swagger_client=self.leaderboard_client,
container_id=container_id,
attribute=path_to_str(op.path),
source=op.file_path,
source=op.get_absolute_path(operation_storage),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like both - clear and get_absolute_path should be defined in OperationStorage. Operation should be a simple serializable dataclass with minimum logic. And it would be better to pass operation object instead of raw path as an argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll give it a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we define clear and get_absolute_path in OperationStorage we'll have to pass Operation (in particular UploadFile) to those functions because we would like to clear or get path of particular operation.

At the same time we need upload_path of OperationStorage when creating operation (when we create UploadFile of in-memory file we also save file to .neptune catalog).

It'll require OperationStorage to be some kind of proxy on UploadFile operation irresponsible also for creating it.

If you're ok with that I can do that, what do you think?

ext=op.ext,
multipart_config=multipart_config,
)
Expand Down Expand Up @@ -560,10 +564,13 @@ def _execute_upload_operations_with_400_retry(
container_id: str,
container_type: ContainerType,
upload_operations: List[Operation],
operation_storage: OperationStorage,
) -> List[NeptuneException]:
while True:
try:
return self._execute_upload_operations(container_id, container_type, upload_operations)
return self._execute_upload_operations(
container_id, container_type, upload_operations, operation_storage
)
except ClientHttpError as ex:
if "Length of stream does not match given range" not in ex.response:
raise ex
Expand Down
2 changes: 2 additions & 0 deletions src/neptune/new/internal/backends/neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
UniqueId,
)
from neptune.new.internal.operation import Operation
from neptune.new.internal.operation_processors.operation_storage import OperationStorage
from neptune.new.internal.websockets.websockets_factory import WebsocketsFactory
from neptune.new.types.atoms import GitRef

Expand Down Expand Up @@ -138,6 +139,7 @@ def execute_operations(
container_id: UniqueId,
container_type: ContainerType,
operations: List[Operation],
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
pass

Expand Down
17 changes: 12 additions & 5 deletions src/neptune/new/internal/backends/neptune_backend_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
UploadFileContent,
UploadFileSet,
)
from neptune.new.internal.operation_processors.operation_storage import OperationStorage
from neptune.new.internal.operation_visitor import OperationVisitor
from neptune.new.internal.types.file_types import FileType
from neptune.new.internal.utils import base64_decode
Expand Down Expand Up @@ -282,24 +283,27 @@ def execute_operations(
container_id: UniqueId,
container_type: ContainerType,
operations: List[Operation],
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
result = []
for op in operations:
try:
self._execute_operation(container_id, container_type, op)
self._execute_operation(container_id, container_type, op, operation_storage)
except NeptuneException as e:
result.append(e)
return len(operations), result

def _execute_operation(self, container_id: UniqueId, container_type: ContainerType, op: Operation) -> None:
def _execute_operation(
self, container_id: UniqueId, container_type: ContainerType, op: Operation, operation_storage: OperationStorage
) -> None:
run = self._get_container(container_id, container_type)
val = run.get(op.path)
if val is not None and not isinstance(val, Value):
if isinstance(val, dict):
raise MetadataInconsistency("{} is a namespace, not an attribute".format(op.path))
else:
raise InternalClientError("{} is a {}".format(op.path, type(val)))
visitor = NeptuneBackendMock.NewValueOpVisitor(self, op.path, val)
visitor = NeptuneBackendMock.NewValueOpVisitor(self, op.path, val, operation_storage)
new_val = visitor.visit(op)
if new_val is not None:
run.set(op.path, new_val)
Expand Down Expand Up @@ -589,11 +593,14 @@ def copy_value(self, source_type: Type[Attribute], source_path: List[str]) -> At
raise NotImplementedError

class NewValueOpVisitor(OperationVisitor[Optional[Value]]):
def __init__(self, backend, path: List[str], current_value: Optional[Value]):
def __init__(
self, backend, path: List[str], current_value: Optional[Value], operation_storage: OperationStorage
):
self._backend = backend
self._path = path
self._current_value = current_value
self._artifact_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
self._operation_storage = operation_storage

def visit_assign_float(self, op: AssignFloat) -> Optional[Value]:
if self._current_value is not None and not isinstance(self._current_value, Float):
Expand Down Expand Up @@ -640,7 +647,7 @@ def visit_clear_artifact(self, _: ClearArtifact) -> Optional[Value]:
def visit_upload_file(self, op: UploadFile) -> Optional[Value]:
if self._current_value is not None and not isinstance(self._current_value, File):
raise self._create_type_error("save", File.__name__)
return File.from_path(path=op.file_path, extension=op.ext)
return File.from_path(path=op.get_absolute_path(self._operation_storage), extension=op.ext)

def visit_upload_file_content(self, op: UploadFileContent) -> Optional[Value]:
if self._current_value is not None and not isinstance(self._current_value, File):
Expand Down
53 changes: 34 additions & 19 deletions src/neptune/new/internal/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import (
TYPE_CHECKING,
Generic,
Expand All @@ -29,9 +28,13 @@
TypeVar,
)

from neptune.common.exceptions import InternalClientError
from neptune.common.exceptions import (
InternalClientError,
NeptuneException,
)
from neptune.new.exceptions import MalformedOperation
from neptune.new.internal.container_type import ContainerType
from neptune.new.internal.operation_processors.operation_storage import OperationStorage
from neptune.new.internal.types.file_types import FileType
from neptune.new.types.atoms.file import File

Expand All @@ -57,7 +60,7 @@ class Operation(abc.ABC):
def accept(self, visitor: "OperationVisitor[Ret]") -> Ret:
pass

def clean(self):
def clean(self, operation_storage: OperationStorage):
pass

def to_dict(self) -> dict:
Expand Down Expand Up @@ -185,33 +188,30 @@ def from_dict(data: dict) -> "AssignArtifact":
class UploadFile(Operation):

ext: str
file_path: str
file_path: str = None
tmp_file_name: str = None
# `clean_after_upload` is for backward compatibility and should be removed in the future
aniezurawski marked this conversation as resolved.
Show resolved Hide resolved
clean_after_upload: bool = False

@classmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe pass OperationStorage instead of upload_path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, why not.

def of_file(cls, value: File, attribute_path: List[str], upload_path: Path):
def of_file(cls, value: File, attribute_path: List[str], operation_storage: OperationStorage):
if value.file_type is FileType.LOCAL_FILE:
operation = UploadFile(
path=attribute_path,
ext=value.extension,
file_path=os.path.abspath(value.path),
)
elif value.file_type in (FileType.IN_MEMORY, FileType.STREAM):
tmp_file_path = cls.get_upload_path(attribute_path, value.extension, upload_path)
value._save(tmp_file_path)
operation = UploadFile(
path=attribute_path,
ext=value.extension,
file_path=os.path.abspath(tmp_file_path),
clean_after_upload=True,
)
tmp_file_name = cls.get_tmp_file_name(attribute_path, value.extension)
value._save(operation_storage.upload_path / tmp_file_name)
operation = UploadFile(path=attribute_path, ext=value.extension, tmp_file_name=tmp_file_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clean_after_upload is missing. Is it intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Now we determine if operation should be cleaned by existence of tmp_file_name value.

clean_after_upload is deprecated flag leaved only for handling of old format operations if user invoke sync on old data.

else:
raise ValueError(f"Unexpected FileType: {value.file_type}")
return operation

def clean(self):
if self.clean_after_upload:
os.remove(self.file_path)
def clean(self, operation_storage: OperationStorage):
if self.clean_after_upload or self.tmp_file_name:
os.remove(self.get_absolute_path(operation_storage))

def accept(self, visitor: "OperationVisitor[Ret]") -> Ret:
return visitor.visit_upload_file(self)
Expand All @@ -220,20 +220,35 @@ def to_dict(self) -> dict:
ret = super().to_dict()
ret["ext"] = self.ext
ret["file_path"] = self.file_path
ret["tmp_file_name"] = self.tmp_file_name
ret["clean_after_upload"] = self.clean_after_upload
return ret

@staticmethod
def from_dict(data: dict) -> "UploadFile":
return UploadFile(data["path"], data["ext"], data["file_path"], data.get("clean_after_upload", False))
return UploadFile(
data["path"],
data["ext"],
data.get("file_path"),
data.get("tmp_file_name"),
data.get("clean_after_upload", False),
)

@staticmethod
def get_upload_path(attribute_path: List[str], extension: str, upload_path: Path):
def get_tmp_file_name(attribute_path: List[str], extension: str):
now = datetime.now()
tmp_file_name = (
f"{'_'.join(attribute_path)}-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}.{extension}"
)
return upload_path / tmp_file_name
return tmp_file_name

def get_absolute_path(self, operation_storage: OperationStorage) -> str:
if self.file_path:
return self.file_path
elif self.tmp_file_name:
return str(operation_storage.upload_path / self.tmp_file_name)

raise NeptuneException("Expected 'file_path' or 'tmp_file_name' to be filled.")


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
version_to_ack += processed_count
batch = batch[processed_count:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import shutil
from pathlib import Path
from typing import Union

from neptune.new.constants import NEPTUNE_DATA_DIRECTORY
from neptune.new.internal.container_type import ContainerType
Expand All @@ -28,8 +29,11 @@


class OperationStorage:
def __init__(self, data_path: str):
self._data_path = Path(data_path)
def __init__(self, data_path: Union[str, Path]):
if isinstance(data_path, Path):
self._data_path = data_path
else:
self._data_path = Path(data_path)

# initialize directories
os.makedirs(self.data_path, exist_ok=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType):
return data_path

def enqueue_operation(self, op: Operation, wait: bool) -> None:
_, errors = self._backend.execute_operations(self._container_id, self._container_type, [op])
_, errors = self._backend.execute_operations(
container_id=self._container_id,
container_type=self._container_type,
operations=[op],
operation_storage=self._operation_storage,
)
if errors:
raise errors[0]

Expand Down
9 changes: 9 additions & 0 deletions tests/e2e/standard/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import neptune.new as neptune
from neptune.common.exceptions import NeptuneException
from neptune.new.cli import sync
from neptune.new.types import File
from src.neptune.new.cli.commands import clear
from tests.e2e.base import (
AVAILABLE_CONTAINERS,
Expand Down Expand Up @@ -128,6 +129,11 @@ def test_offline_sync(self, environment):
val = fake.word()
run[key] = val

# and some file
key2 = self.gen_key()
val2 = File.from_content(b"dummybytes")
run[key2].upload(val2)

# and stop it
run.stop()

Expand All @@ -142,6 +148,9 @@ def test_offline_sync(self, environment):

run2 = neptune.init_run(with_id=sys_id, project=environment.project)
assert run2[key].fetch() == val
run2[key2].download()
with open(f"{tmp}/{key2.split('/')[-1]}.bin", "rb") as file:
assert file.read() == b"dummybytes"
run2.stop()

@pytest.mark.parametrize("container_type", ["run"])
Expand Down