From 9b111e2190a901a8b0284faf8753cafa27002e02 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Fri, 29 Sep 2023 10:44:00 +0200 Subject: [PATCH 01/23] Added NeptuneObject interface and stop synchronization util --- src/neptune/metadata_containers/abstract.py | 14 ++++++++-- .../metadata_containers/metadata_container.py | 4 +-- src/neptune/typing.py | 7 +++-- src/neptune/utils.py | 27 ++++++++++++++++++- 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/neptune/metadata_containers/abstract.py b/src/neptune/metadata_containers/abstract.py index c1263d05e..e7e42defd 100644 --- a/src/neptune/metadata_containers/abstract.py +++ b/src/neptune/metadata_containers/abstract.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces"] +__all__ = ["SupportsNamespaces", "NeptuneObject"] from abc import ( ABC, abstractmethod, ) -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, + Optional, + Union, +) if TYPE_CHECKING: from neptune.handler import Handler @@ -64,3 +68,9 @@ def __delitem__(self, path) -> None: @abstractmethod def get_root_object(self) -> "SupportsNamespaces": ... + + +class NeptuneObject(SupportsNamespaces, ABC): + @abstractmethod + def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None: + ... diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 1aa03ce10..203763c50 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -73,7 +73,7 @@ from neptune.internal.utils.paths import parse_path from neptune.internal.utils.uncaught_exception_handler import instance as uncaught_exception_handler from neptune.internal.value_to_attribute_visitor import ValueToAttributeVisitor -from neptune.metadata_containers.abstract import SupportsNamespaces +from neptune.metadata_containers.abstract import NeptuneObject from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode from neptune.types.type_casting import cast_value @@ -88,7 +88,7 @@ def inner_fun(self: "MetadataContainer", *args, **kwargs): return inner_fun -class MetadataContainer(AbstractContextManager, SupportsNamespaces): +class MetadataContainer(AbstractContextManager, NeptuneObject): container_type: ContainerType LEGACY_METHODS = set() diff --git a/src/neptune/typing.py b/src/neptune/typing.py index 267baa88b..46d2f97e5 100644 --- a/src/neptune/typing.py +++ b/src/neptune/typing.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces"] +__all__ = ["SupportsNamespaces", "NeptuneObject"] -from neptune.metadata_containers.abstract import SupportsNamespaces +from neptune.metadata_containers.abstract import ( + NeptuneObject, + SupportsNamespaces, +) diff --git a/src/neptune/utils.py b/src/neptune/utils.py index 198e262f0..e8a83d0fa 100644 --- a/src/neptune/utils.py +++ b/src/neptune/utils.py @@ -14,7 +14,7 @@ # limitations under the License. # """Utility functions to support ML metadata logging with neptune.ai.""" -__all__ = ["stringify_unsupported"] +__all__ = ["stringify_unsupported", "stop_synchronization_callback"] from typing import ( Any, @@ -24,6 +24,10 @@ ) from neptune.internal.types.stringify_value import StringifyValue +from neptune.internal.utils.logger import logger +from neptune.typing import NeptuneObject + +DEFAULT_STOP_TIMEOUT = 60.0 def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: @@ -48,3 +52,24 @@ def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: return {str(k): stringify_unsupported(v) for k, v in value.items()} return StringifyValue(value=value) + + +def stop_synchronization_callback(neptune_object: NeptuneObject) -> None: + """ + Callback function that stops the synchronization of the experiment with Neptune. + + Args: + neptune_object (NeptuneObject): A Neptune object (run, model, model version, and project). + + Example: + >>> import neptune + >>> from neptune.utils import stop_synchronization_callback + >>> run = neptune.init_run( + ... async_no_progress_callback = stop_synchronization_callback + ... ) + + For more information, see: + https://docs.neptune.ai/? + """ + logger.error("Stopping synchronization using the stop synchronization callback.") + neptune_object.stop(seconds=DEFAULT_STOP_TIMEOUT) From 3c642a372dd474f55bfe7b431247d712ed7f320c Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Fri, 29 Sep 2023 11:03:19 +0200 Subject: [PATCH 02/23] Parameters added to neptune objects --- src/neptune/internal/init/parameters.py | 10 +++++- src/neptune/metadata_containers/__init__.py | 6 +++- .../metadata_containers/metadata_container.py | 23 ++++++++++++-- src/neptune/metadata_containers/model.py | 27 ++++++++++++++-- .../metadata_containers/model_version.py | 27 ++++++++++++++-- src/neptune/metadata_containers/project.py | 31 +++++++++++++++++-- src/neptune/metadata_containers/run.py | 27 ++++++++++++++-- src/neptune/typing.py | 3 +- 8 files changed, 140 insertions(+), 14 deletions(-) diff --git a/src/neptune/internal/init/parameters.py b/src/neptune/internal/init/parameters.py index db6e85b82..4ae0b8618 100644 --- a/src/neptune/internal/init/parameters.py +++ b/src/neptune/internal/init/parameters.py @@ -13,8 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["DEFAULT_FLUSH_PERIOD", "DEFAULT_NAME", "OFFLINE_PROJECT_QUALIFIED_NAME"] +__all__ = [ + "DEFAULT_FLUSH_PERIOD", + "DEFAULT_NAME", + "OFFLINE_PROJECT_QUALIFIED_NAME", + "ASYNC_LAG_THRESHOLD", + "ASYNC_NO_PROGRESS_THRESHOLD", +] DEFAULT_FLUSH_PERIOD = 5 DEFAULT_NAME = "Untitled" OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder" +ASYNC_LAG_THRESHOLD = 1800.0 +ASYNC_NO_PROGRESS_THRESHOLD = 300.0 diff --git a/src/neptune/metadata_containers/__init__.py b/src/neptune/metadata_containers/__init__.py index 930140042..d20696e47 100644 --- a/src/neptune/metadata_containers/__init__.py +++ b/src/neptune/metadata_containers/__init__.py @@ -15,13 +15,17 @@ # __all__ = [ "MetadataContainer", + "NeptuneObjectCallback", "Model", "ModelVersion", "Project", "Run", ] -from neptune.metadata_containers.metadata_container import MetadataContainer +from neptune.metadata_containers.metadata_container import ( + MetadataContainer, + NeptuneObjectCallback, +) from neptune.metadata_containers.model import Model from neptune.metadata_containers.model_version import ModelVersion from neptune.metadata_containers.project import Project diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 203763c50..224d42c88 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["MetadataContainer"] +__all__ = ["MetadataContainer", "NeptuneObjectCallback"] import abc import atexit @@ -25,7 +25,9 @@ from contextlib import AbstractContextManager from functools import wraps from typing import ( + TYPE_CHECKING, Any, + Callable, Dict, Iterable, List, @@ -63,7 +65,11 @@ UniqueId, conform_optional, ) -from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_FLUSH_PERIOD, +) from neptune.internal.operation import DeleteAttribute from neptune.internal.operation_processors.factory import get_operation_processor from neptune.internal.operation_processors.operation_processor import OperationProcessor @@ -78,6 +84,9 @@ from neptune.types.mode import Mode from neptune.types.type_casting import cast_value +if TYPE_CHECKING: + NeptuneObjectCallback = Callable[["MetadataContainer"], None] + def ensure_not_stopped(fun): @wraps(fun) @@ -101,12 +110,22 @@ def __init__( mode: Mode = Mode.ASYNC, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): verify_type("project", project, (str, type(None))) verify_type("api_token", api_token, (str, type(None))) verify_type("mode", mode, Mode) verify_type("flush_period", flush_period, (int, float)) verify_type("proxies", proxies, (dict, type(None))) + verify_type("async_lag_threshold", async_lag_threshold, (int, float)) + verify_type("async_lag_callback", async_lag_callback, (NeptuneObjectCallback, type(None))) + verify_type("async_no_progress_threshold", async_no_progress_threshold, (int, float)) + verify_type("async_no_progress_callback", async_no_progress_callback, (NeptuneObjectCallback, type(None))) + + # TODO: Save/pass further all async lag parameters self._mode: Mode = mode self._flush_period = flush_period diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 20eafa965..6a29ffd5a 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -43,6 +43,8 @@ from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -50,7 +52,10 @@ from neptune.internal.state import ContainerState from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers import ( + MetadataContainer, + NeptuneObjectCallback, +) from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode @@ -71,6 +76,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): """Initializes a Model object from an existing or new model. @@ -105,6 +114,10 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: ? # TODO + async_lag_threshold: ? # TODO + async_no_progress_callback: ? # TODO + async_no_progress_threshold: ? # TODO Returns: Model object that is used to manage the model and log metadata to it. @@ -166,7 +179,17 @@ def __init__( if mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index 2f43e722a..08d1a6597 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -35,6 +35,8 @@ from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -43,7 +45,10 @@ from neptune.internal.state import ContainerState from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers import ( + MetadataContainer, + NeptuneObjectCallback, +) from neptune.types.mode import Mode from neptune.types.model_version_stage import ModelVersionStage @@ -64,6 +69,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ) -> None: """Initializes a ModelVersion object from an existing or new model version. @@ -99,6 +108,10 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: ? # TODO + async_lag_threshold: ? # TODO + async_no_progress_callback: ? # TODO + async_no_progress_threshold: ? # TODO Returns: ModelVersion object that is used to manage the model version and log metadata to it. @@ -162,7 +175,17 @@ def __init__( if mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index a6f7605a5..680eb0101 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -34,14 +34,21 @@ NQLQueryAttribute, ) from neptune.internal.container_type import ContainerType -from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_FLUSH_PERIOD, +) from neptune.internal.state import ContainerState from neptune.internal.utils import ( as_list, verify_type, ) from neptune.internal.utils.run_state import RunState -from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers import ( + MetadataContainer, + NeptuneObjectCallback, +) from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode @@ -59,6 +66,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): """Starts a connection to an existing Neptune project. @@ -88,6 +99,10 @@ def __init__( Defaults to 5 (every 5 seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: ? # TODO + async_lag_threshold: ? # TODO + async_no_progress_callback: ? # TODO + async_no_progress_threshold: ? # TODO Returns: Project object that can be used to interact with the project as a whole, @@ -128,7 +143,17 @@ def __init__( if mode == Mode.OFFLINE: raise NeptuneException("Project can't be initialized in OFFLINE mode") - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: return ApiExperiment( diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index 7c2ab1056..abfd1a880 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -56,6 +56,8 @@ from neptune.internal.hardware.hardware_metric_reporting_job import HardwareMetricReportingJob from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -88,7 +90,10 @@ from neptune.internal.utils.source_code import upload_source_code from neptune.internal.utils.traceback_job import TracebackJob from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob -from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers import ( + MetadataContainer, + NeptuneObjectCallback, +) from neptune.types import ( GitRef, StringSeries, @@ -147,6 +152,10 @@ def __init__( capture_traceback: bool = True, git_ref: Optional[Union[GitRef, GitRefDisabled, bool]] = None, dependencies: Optional[Union[str, os.PathLike]] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, **kwargs, ): """Starts a new tracked run that logs ML model-building metadata to neptune.ai. @@ -229,6 +238,10 @@ def __init__( dependencies: If you pass `"infer"`, Neptune logs dependencies installed in the current environment. You can also pass a path to your dependency file directly. If left empty, no dependency file is uploaded. + async_lag_callback: ? # TODO + async_lag_threshold: ? # TODO + async_no_progress_callback: ? # TODO + async_no_progress_threshold: ? # TODO Returns: Run object that is used to manage the tracked run and log metadata to it. @@ -382,7 +395,17 @@ def __init__( if mode == Mode.OFFLINE or mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/typing.py b/src/neptune/typing.py index 46d2f97e5..98f0de639 100644 --- a/src/neptune/typing.py +++ b/src/neptune/typing.py @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces", "NeptuneObject"] +__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"] +from neptune.metadata_containers import NeptuneObjectCallback from neptune.metadata_containers.abstract import ( NeptuneObject, SupportsNamespaces, From e96eb9fce94110dfb3e38018cee5f4fec7237e0c Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Fri, 29 Sep 2023 11:05:20 +0200 Subject: [PATCH 03/23] Changelog updated --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cfa5e031..6e41c9bde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -## [UNRELEASED] neptune 1.7.1 +## [UNRELEASED] neptune 1.8.0 + +### Features +- Added support for no synchronization callbacks ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478)) ### Fixes - Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473)) From bf65d14e1f40545275e47594d5ddcdad8ed1df56 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Fri, 29 Sep 2023 11:38:02 +0200 Subject: [PATCH 04/23] Fixed imports --- .gitignore | 3 +++ src/neptune/internal/utils/__init__.py | 6 ++++++ src/neptune/metadata_containers/__init__.py | 6 +----- src/neptune/metadata_containers/abstract.py | 6 +++++- .../metadata_containers/metadata_container.py | 21 ++++++++++--------- src/neptune/metadata_containers/model.py | 6 ++---- .../metadata_containers/model_version.py | 6 ++---- src/neptune/metadata_containers/project.py | 6 ++---- src/neptune/metadata_containers/run.py | 6 ++---- src/neptune/typing.py | 2 +- 10 files changed, 35 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 216e5aff6..642e27645 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ stream.bin # zenml e2e .zen + +# mocks +MagicMock/ diff --git a/src/neptune/internal/utils/__init__.py b/src/neptune/internal/utils/__init__.py index c19882b63..cc8845f93 100644 --- a/src/neptune/internal/utils/__init__.py +++ b/src/neptune/internal/utils/__init__.py @@ -26,6 +26,7 @@ "is_string_like", "is_stringify_value", "verify_collection_type", + "verify_optional_callable", "is_collection", "base64_encode", "base64_decode", @@ -133,6 +134,11 @@ def verify_collection_type(var_name: str, var, expected_type: Union[type, tuple] verify_type("elements of collection '{}'".format(var_name), value, expected_type) +def verify_optional_callable(var_name: str, var): + if var and not callable(var): + raise TypeError("{} must be a callable (was {})".format(var_name, type(var))) + + def is_collection(var) -> bool: return isinstance(var, (list, set, tuple)) diff --git a/src/neptune/metadata_containers/__init__.py b/src/neptune/metadata_containers/__init__.py index d20696e47..930140042 100644 --- a/src/neptune/metadata_containers/__init__.py +++ b/src/neptune/metadata_containers/__init__.py @@ -15,17 +15,13 @@ # __all__ = [ "MetadataContainer", - "NeptuneObjectCallback", "Model", "ModelVersion", "Project", "Run", ] -from neptune.metadata_containers.metadata_container import ( - MetadataContainer, - NeptuneObjectCallback, -) +from neptune.metadata_containers.metadata_container import MetadataContainer from neptune.metadata_containers.model import Model from neptune.metadata_containers.model_version import ModelVersion from neptune.metadata_containers.project import Project diff --git a/src/neptune/metadata_containers/abstract.py b/src/neptune/metadata_containers/abstract.py index e7e42defd..29a15ba42 100644 --- a/src/neptune/metadata_containers/abstract.py +++ b/src/neptune/metadata_containers/abstract.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces", "NeptuneObject"] +__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"] from abc import ( ABC, @@ -21,6 +21,7 @@ ) from typing import ( TYPE_CHECKING, + Callable, Optional, Union, ) @@ -74,3 +75,6 @@ class NeptuneObject(SupportsNamespaces, ABC): @abstractmethod def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None: ... + + +NeptuneObjectCallback = Callable[[NeptuneObject], None] diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 224d42c88..64678eb3e 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["MetadataContainer", "NeptuneObjectCallback"] +__all__ = ["MetadataContainer"] import abc import atexit @@ -25,9 +25,7 @@ from contextlib import AbstractContextManager from functools import wraps from typing import ( - TYPE_CHECKING, Any, - Callable, Dict, Iterable, List, @@ -74,19 +72,22 @@ from neptune.internal.operation_processors.factory import get_operation_processor from neptune.internal.operation_processors.operation_processor import OperationProcessor from neptune.internal.state import ContainerState -from neptune.internal.utils import verify_type +from neptune.internal.utils import ( + verify_optional_callable, + verify_type, +) from neptune.internal.utils.logger import logger from neptune.internal.utils.paths import parse_path from neptune.internal.utils.uncaught_exception_handler import instance as uncaught_exception_handler from neptune.internal.value_to_attribute_visitor import ValueToAttributeVisitor -from neptune.metadata_containers.abstract import NeptuneObject +from neptune.metadata_containers.abstract import ( + NeptuneObject, + NeptuneObjectCallback, +) from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode from neptune.types.type_casting import cast_value -if TYPE_CHECKING: - NeptuneObjectCallback = Callable[["MetadataContainer"], None] - def ensure_not_stopped(fun): @wraps(fun) @@ -121,9 +122,9 @@ def __init__( verify_type("flush_period", flush_period, (int, float)) verify_type("proxies", proxies, (dict, type(None))) verify_type("async_lag_threshold", async_lag_threshold, (int, float)) - verify_type("async_lag_callback", async_lag_callback, (NeptuneObjectCallback, type(None))) + verify_optional_callable("async_lag_callback", async_lag_callback) verify_type("async_no_progress_threshold", async_no_progress_threshold, (int, float)) - verify_type("async_no_progress_callback", async_no_progress_callback, (NeptuneObjectCallback, type(None))) + verify_optional_callable("async_no_progress_callback", async_no_progress_callback) # TODO: Save/pass further all async lag parameters diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 6a29ffd5a..7a008d621 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -52,10 +52,8 @@ from neptune.internal.state import ContainerState from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import ( - MetadataContainer, - NeptuneObjectCallback, -) +from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index 08d1a6597..e3305e214 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -45,10 +45,8 @@ from neptune.internal.state import ContainerState from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import ( - MetadataContainer, - NeptuneObjectCallback, -) +from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.types.mode import Mode from neptune.types.model_version_stage import ModelVersionStage diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index 680eb0101..f549ed890 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -45,10 +45,8 @@ verify_type, ) from neptune.internal.utils.run_state import RunState -from neptune.metadata_containers import ( - MetadataContainer, - NeptuneObjectCallback, -) +from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index abfd1a880..2193c0781 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -90,10 +90,8 @@ from neptune.internal.utils.source_code import upload_source_code from neptune.internal.utils.traceback_job import TracebackJob from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob -from neptune.metadata_containers import ( - MetadataContainer, - NeptuneObjectCallback, -) +from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.types import ( GitRef, StringSeries, diff --git a/src/neptune/typing.py b/src/neptune/typing.py index 98f0de639..d319d4d28 100644 --- a/src/neptune/typing.py +++ b/src/neptune/typing.py @@ -15,8 +15,8 @@ # __all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"] -from neptune.metadata_containers import NeptuneObjectCallback from neptune.metadata_containers.abstract import ( NeptuneObject, + NeptuneObjectCallback, SupportsNamespaces, ) From 9a7ceb57426b55ff5a47799a0ab889e920ac9488 Mon Sep 17 00:00:00 2001 From: Sabine Date: Fri, 29 Sep 2023 14:27:56 +0300 Subject: [PATCH 05/23] add arg descriptions --- src/neptune/metadata_containers/model.py | 21 +++++++++++++++---- .../metadata_containers/model_version.py | 21 +++++++++++++++---- src/neptune/metadata_containers/project.py | 21 +++++++++++++++---- src/neptune/metadata_containers/run.py | 21 +++++++++++++++---- 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 7a008d621..30142fd57 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -112,10 +112,23 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. - async_lag_callback: ? # TODO - async_lag_threshold: ? # TODO - async_no_progress_callback: ? # TODO - async_no_progress_threshold: ? # TODO + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Model object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback + (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` + argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a Model object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: For how long there has been no synchronization progress since the object was + initialized. If a no-progress callback (default callback enabled via environment variable or custom + callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when + this duration is exceeded. Returns: Model object that is used to manage the model and log metadata to it. diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index e3305e214..b399cc658 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -106,10 +106,23 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. - async_lag_callback: ? # TODO - async_lag_threshold: ? # TODO - async_no_progress_callback: ? # TODO - async_no_progress_threshold: ? # TODO + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a ModelVersion object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback + (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` + argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a ModelVersion object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: For how long there has been no synchronization progress since the object was + initialized. If a no-progress callback (default callback enabled via environment variable or custom + callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when + this duration is exceeded. Returns: ModelVersion object that is used to manage the model version and log metadata to it. diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index f549ed890..b9802c9eb 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -97,10 +97,23 @@ def __init__( Defaults to 5 (every 5 seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. - async_lag_callback: ? # TODO - async_lag_threshold: ? # TODO - async_no_progress_callback: ? # TODO - async_no_progress_threshold: ? # TODO + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Project object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback + (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` + argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a Project object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: For how long there has been no synchronization progress since the object was + initialized. If a no-progress callback (default callback enabled via environment variable or custom + callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when + this duration is exceeded. Returns: Project object that can be used to interact with the project as a whole, diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index 2193c0781..f9f67dc32 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -236,10 +236,23 @@ def __init__( dependencies: If you pass `"infer"`, Neptune logs dependencies installed in the current environment. You can also pass a path to your dependency file directly. If left empty, no dependency file is uploaded. - async_lag_callback: ? # TODO - async_lag_threshold: ? # TODO - async_no_progress_callback: ? # TODO - async_no_progress_threshold: ? # TODO + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Run object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback + (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` + argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a Run object as the argument and may use the `stop()` method to stop the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: For how long there has been no synchronization progress since the object was + initialized. If a no-progress callback (default callback enabled via environment variable or custom + callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when + this duration is exceeded. Returns: Run object that is used to manage the tracked run and log metadata to it. From 45543ba6fff6791c1a0aed6af457066d8b2c7c38 Mon Sep 17 00:00:00 2001 From: Sabine Date: Fri, 29 Sep 2023 14:44:23 +0300 Subject: [PATCH 06/23] tweak stop_synchronization_callback docstring --- src/neptune/utils.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/neptune/utils.py b/src/neptune/utils.py index e8a83d0fa..159388225 100644 --- a/src/neptune/utils.py +++ b/src/neptune/utils.py @@ -55,11 +55,10 @@ def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: def stop_synchronization_callback(neptune_object: NeptuneObject) -> None: - """ - Callback function that stops the synchronization of the experiment with Neptune. + """Default callback function that stops a Neptune object's synchronization with the server. Args: - neptune_object (NeptuneObject): A Neptune object (run, model, model version, and project). + neptune_object: A Neptune object (Run, Model, ModelVersion, or Project) to be stopped. Example: >>> import neptune @@ -69,7 +68,7 @@ def stop_synchronization_callback(neptune_object: NeptuneObject) -> None: ... ) For more information, see: - https://docs.neptune.ai/? + https://docs.neptune.ai/api/utils/stop_synchronization_callback/ """ logger.error("Stopping synchronization using the stop synchronization callback.") neptune_object.stop(seconds=DEFAULT_STOP_TIMEOUT) From 887dff554a6537304a547d8939c57c77e41d229f Mon Sep 17 00:00:00 2001 From: Sabine Date: Fri, 29 Sep 2023 14:45:12 +0300 Subject: [PATCH 07/23] tweak callback error message --- src/neptune/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/neptune/utils.py b/src/neptune/utils.py index 159388225..80439ff78 100644 --- a/src/neptune/utils.py +++ b/src/neptune/utils.py @@ -70,5 +70,7 @@ def stop_synchronization_callback(neptune_object: NeptuneObject) -> None: For more information, see: https://docs.neptune.ai/api/utils/stop_synchronization_callback/ """ - logger.error("Stopping synchronization using the stop synchronization callback.") + logger.error( + "Threshold for disrupted synchronization exceeded. Stopping the synchronization using the default callback." + ) neptune_object.stop(seconds=DEFAULT_STOP_TIMEOUT) From 9d1155df32efb93818532c6e84421aa5f1a45eac Mon Sep 17 00:00:00 2001 From: Sabine Date: Fri, 29 Sep 2023 14:49:06 +0300 Subject: [PATCH 08/23] tweak changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e41c9bde..62b273048 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## [UNRELEASED] neptune 1.8.0 ### Features -- Added support for no synchronization callbacks ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478)) +- Added support for callbacks that stop the synchronization if the lag or lack of progress exceeds a certain threshold ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478)) ### Fixes - Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473)) From 128159d9cdf28d0220846d947d5d99787f031f61 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 12:45:30 +0200 Subject: [PATCH 09/23] Support for no progress callback --- src/neptune/envs.py | 6 ++ .../async_operation_processor.py | 58 +++++++++++++++++-- .../internal/operation_processors/factory.py | 24 ++++++-- src/neptune/internal/threading/daemon.py | 2 +- .../metadata_containers/metadata_container.py | 36 +++++++++++- 5 files changed, 113 insertions(+), 13 deletions(-) diff --git a/src/neptune/envs.py b/src/neptune/envs.py index 86a9222bc..b01b94d41 100644 --- a/src/neptune/envs.py +++ b/src/neptune/envs.py @@ -28,6 +28,8 @@ "NEPTUNE_FETCH_TABLE_STEP_SIZE", "NEPTUNE_SYNC_AFTER_STOP_TIMEOUT", "NEPTUNE_REQUEST_TIMEOUT", + "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK", + "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK", ] from neptune.common.envs import ( @@ -59,4 +61,8 @@ NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT" +NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK" + +NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK" + S3_ENDPOINT_URL = "S3_ENDPOINT_URL" diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 2ab814cea..49844ed8c 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -25,6 +25,7 @@ time, ) from typing import ( + Callable, List, Optional, ) @@ -36,6 +37,10 @@ from neptune.internal.container_type import ContainerType from neptune.internal.disk_queue import DiskQueue from neptune.internal.id_formats import UniqueId +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, +) from neptune.internal.operation import Operation from neptune.internal.operation_processors.operation_processor import OperationProcessor from neptune.internal.operation_processors.operation_storage import ( @@ -60,6 +65,10 @@ def __init__( lock: threading.RLock, sleep_time: float = 5, batch_size: int = 1000, + async_lag_callback: Optional[Callable[[], None]] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[Callable[[], None]] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type)) @@ -74,9 +83,15 @@ def __init__( self._container_type = container_type self._backend = backend self._batch_size = batch_size + self._async_lag_callback = async_lag_callback + self._async_lag_threshold = async_lag_threshold + self._async_no_progress_callback = async_no_progress_callback + self._async_no_progress_threshold = async_no_progress_threshold self._last_version = 0 self._consumed_version = 0 self._consumer = self.ConsumerThread(self, sleep_time, batch_size) + self._lock = lock + self._should_call_no_progress_callback = False # Caller is responsible for taking this lock self._waiting_cond = threading.Condition(lock=lock) @@ -91,6 +106,9 @@ def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._last_version = self._queue.put(op) if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() + + self._check_for_callbacks() + if wait: self.wait() @@ -107,6 +125,13 @@ def wait(self): if not self._consumer.is_running(): raise NeptuneSynchronizationAlreadyStoppedException() + def _check_for_callbacks(self): + if self._should_call_no_progress_callback: + with self._lock: + self._should_call_no_progress_callback = False + if self._async_no_progress_callback: + self._async_no_progress_callback() + def flush(self): self._queue.flush() @@ -225,6 +250,8 @@ def __init__( self._processor = processor self._batch_size = batch_size self._last_flush = 0 + self._last_ack = 0 + self._no_progress_exceeded = False def run(self): try: @@ -246,6 +273,13 @@ def work(self) -> None: return self.process_batch([element.obj for element in batch], batch[-1].ver) + def _check_for_network_interruptions_callbacks(self): + if not self._no_progress_exceeded and self._processor._async_no_progress_callback: + if monotonic() - self._last_ack > self._processor._async_no_progress_threshold: + self._no_progress_exceeded = True + with self._processor._lock: + self._processor._should_call_no_progress_callback = True + @Daemon.ConnectionRetryWrapper( kill_message=( "Killing Neptune asynchronous thread. All data is safe on disk and can be later" @@ -257,14 +291,26 @@ def process_batch(self, batch: List[Operation], version: int) -> None: version_to_ack = version - expected_count while True: # TODO: Handle Metadata errors - processed_count, errors = self._processor._backend.execute_operations( - container_id=self._processor._container_id, - container_type=self._processor._container_type, - operations=batch, - operation_storage=self._processor._operation_storage, - ) + try: + processed_count, errors = self._processor._backend.execute_operations( + container_id=self._processor._container_id, + container_type=self._processor._container_type, + operations=batch, + operation_storage=self._processor._operation_storage, + ) + except Exception as e: + self._check_for_network_interruptions_callbacks() + raise e from e + version_to_ack += processed_count batch = batch[processed_count:] + + if processed_count > 0: + self._last_ack = monotonic() + self._no_progress_exceeded = False + else: + self._check_for_network_interruptions_callbacks() + with self._processor._waiting_cond: self._processor._queue.ack(version_to_ack) diff --git a/src/neptune/internal/operation_processors/factory.py b/src/neptune/internal/operation_processors/factory.py index d3f5167a5..bf6a38707 100644 --- a/src/neptune/internal/operation_processors/factory.py +++ b/src/neptune/internal/operation_processors/factory.py @@ -17,10 +17,18 @@ __all__ = ["get_operation_processor"] import threading +from typing import ( + Callable, + Optional, +) from neptune.internal.backends.neptune_backend import NeptuneBackend from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import UniqueId +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, +) from neptune.types.mode import Mode from .async_operation_processor import AsyncOperationProcessor @@ -37,14 +45,22 @@ def get_operation_processor( backend: NeptuneBackend, lock: threading.RLock, flush_period: float, + async_lag_callback: Optional[Callable[[], None]] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[Callable[[], None]] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ) -> OperationProcessor: if mode == Mode.ASYNC: return AsyncOperationProcessor( - container_id, - container_type, - backend, - lock, + container_id=container_id, + container_type=container_type, + backend=backend, + lock=lock, sleep_time=flush_period, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, ) elif mode == Mode.SYNC: return SyncOperationProcessor(container_id, container_type, backend) diff --git a/src/neptune/internal/threading/daemon.py b/src/neptune/internal/threading/daemon.py index 4beb6b18d..e4a8c5c79 100644 --- a/src/neptune/internal/threading/daemon.py +++ b/src/neptune/internal/threading/daemon.py @@ -107,7 +107,7 @@ def work(self): class ConnectionRetryWrapper: INITIAL_RETRY_BACKOFF = 2 - MAX_RETRY_BACKOFF = 120 + MAX_RETRY_BACKOFF = 10 def __init__(self, kill_message): self.kill_message = kill_message diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 64678eb3e..878cf5de4 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -40,6 +40,10 @@ from neptune.common.exceptions import UNIX_STYLES from neptune.common.utils import reset_internal_ssl_state from neptune.common.warnings import warn_about_unsupported_type +from neptune.envs import ( + NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK, + NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK, +) from neptune.exceptions import ( MetadataInconsistency, NeptunePossibleLegacyUsageException, @@ -87,6 +91,7 @@ from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode from neptune.types.type_casting import cast_value +from neptune.utils import stop_synchronization_callback def ensure_not_stopped(fun): @@ -126,8 +131,6 @@ def __init__( verify_type("async_no_progress_threshold", async_no_progress_threshold, (int, float)) verify_optional_callable("async_no_progress_callback", async_no_progress_callback) - # TODO: Save/pass further all async lag parameters - self._mode: Mode = mode self._flush_period = flush_period self._lock: threading.RLock = threading.RLock() @@ -149,6 +152,15 @@ def __init__( self._workspace: str = self._api_object.workspace self._project_name: str = self._api_object.project_name + self._async_lag_callback = self._get_callback( + provided_by_user=async_lag_callback, + env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK, + ) + self._async_no_progress_callback = self._get_callback( + provided_by_user=async_no_progress_callback, + env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK, + ) + self._op_processor: OperationProcessor = get_operation_processor( mode=mode, container_id=self._id, @@ -156,6 +168,10 @@ def __init__( backend=self._backend, lock=self._lock, flush_period=flush_period, + async_lag_callback=self._async_lag_callback_method, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=self._async_no_progress_callback_method, + async_no_progress_threshold=async_no_progress_threshold, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) @@ -186,6 +202,22 @@ def __init__( On Linux it looks like it does not help much but does not break anything either. """ + def _async_lag_callback_method(self): + if self._async_lag_callback is not None: + self._async_lag_callback(self) + + def _async_no_progress_callback_method(self): + if self._async_no_progress_callback is not None: + self._async_no_progress_callback(self) + + @staticmethod + def _get_callback(provided_by_user, env_name): + if provided_by_user is not None: + return provided_by_user + if env_name in os.environ and os.getenv(env_name) == "TRUE": + return stop_synchronization_callback + return None + def _handle_fork_in_parent(self): reset_internal_ssl_state() if self._state == ContainerState.STARTED: From f1b8afa007efd6d6726f796e3aea1a6a0b8158ee Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 13:21:00 +0200 Subject: [PATCH 10/23] Added support for lag callback --- .../async_operation_processor.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 49844ed8c..3f5dc8a7c 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -91,6 +91,9 @@ def __init__( self._consumed_version = 0 self._consumer = self.ConsumerThread(self, sleep_time, batch_size) self._lock = lock + self._last_ack = None + self._lag_exceeded = False + self._should_call_lag_callback = False self._should_call_no_progress_callback = False # Caller is responsible for taking this lock @@ -104,6 +107,13 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._last_version = self._queue.put(op) + + if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: + with self._lock: + self._lag_exceeded = True + if self._async_no_progress_callback: + self._async_no_progress_callback() + if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() @@ -129,8 +139,8 @@ def _check_for_callbacks(self): if self._should_call_no_progress_callback: with self._lock: self._should_call_no_progress_callback = False - if self._async_no_progress_callback: - self._async_no_progress_callback() + if self._async_no_progress_callback: + self._async_no_progress_callback() def flush(self): self._queue.flush() @@ -321,6 +331,8 @@ def process_batch(self, batch: List[Operation], version: int) -> None: ) self._processor._consumed_version = version_to_ack + self._processor._last_ack = monotonic() + self._processor._lag_exceeded = False if version_to_ack == version: self._processor._waiting_cond.notify_all() From 572ee61929a229e7c2d68dc910448d99148d3927 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 14:28:56 +0200 Subject: [PATCH 11/23] Auto review --- src/neptune/internal/init/parameters.py | 2 ++ .../internal/operation_processors/async_operation_processor.py | 3 ++- src/neptune/internal/threading/daemon.py | 2 +- src/neptune/utils.py | 3 +-- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/neptune/internal/init/parameters.py b/src/neptune/internal/init/parameters.py index 4ae0b8618..4e897957b 100644 --- a/src/neptune/internal/init/parameters.py +++ b/src/neptune/internal/init/parameters.py @@ -19,6 +19,7 @@ "OFFLINE_PROJECT_QUALIFIED_NAME", "ASYNC_LAG_THRESHOLD", "ASYNC_NO_PROGRESS_THRESHOLD", + "DEFAULT_STOP_TIMEOUT", ] DEFAULT_FLUSH_PERIOD = 5 @@ -26,3 +27,4 @@ OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder" ASYNC_LAG_THRESHOLD = 1800.0 ASYNC_NO_PROGRESS_THRESHOLD = 300.0 +DEFAULT_STOP_TIMEOUT = 60.0 diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 3f5dc8a7c..905c7530a 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -40,6 +40,7 @@ from neptune.internal.init.parameters import ( ASYNC_LAG_THRESHOLD, ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_STOP_TIMEOUT, ) from neptune.internal.operation import Operation from neptune.internal.operation_processors.operation_processor import OperationProcessor @@ -55,7 +56,7 @@ class AsyncOperationProcessor(OperationProcessor): STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30 - STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300")) + STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT)) def __init__( self, diff --git a/src/neptune/internal/threading/daemon.py b/src/neptune/internal/threading/daemon.py index e4a8c5c79..4beb6b18d 100644 --- a/src/neptune/internal/threading/daemon.py +++ b/src/neptune/internal/threading/daemon.py @@ -107,7 +107,7 @@ def work(self): class ConnectionRetryWrapper: INITIAL_RETRY_BACKOFF = 2 - MAX_RETRY_BACKOFF = 10 + MAX_RETRY_BACKOFF = 120 def __init__(self, kill_message): self.kill_message = kill_message diff --git a/src/neptune/utils.py b/src/neptune/utils.py index 80439ff78..298a5b41e 100644 --- a/src/neptune/utils.py +++ b/src/neptune/utils.py @@ -23,12 +23,11 @@ Union, ) +from neptune.internal.init.parameters import DEFAULT_STOP_TIMEOUT from neptune.internal.types.stringify_value import StringifyValue from neptune.internal.utils.logger import logger from neptune.typing import NeptuneObject -DEFAULT_STOP_TIMEOUT = 60.0 - def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: """Helper function that converts unsupported values in a collection or dictionary to strings. From df28866490d555be1f7505469d12e6162c7621a3 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:11:21 +0200 Subject: [PATCH 12/23] Auto review --- .../metadata_containers/metadata_container.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 878cf5de4..0c1022c71 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -152,12 +152,14 @@ def __init__( self._workspace: str = self._api_object.workspace self._project_name: str = self._api_object.project_name + self._async_lag_threshold = async_lag_threshold self._async_lag_callback = self._get_callback( - provided_by_user=async_lag_callback, + provided=async_lag_callback, env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK, ) + self._async_no_progress_threshold = async_no_progress_threshold self._async_no_progress_callback = self._get_callback( - provided_by_user=async_no_progress_callback, + provided=async_no_progress_callback, env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK, ) @@ -169,9 +171,9 @@ def __init__( lock=self._lock, flush_period=flush_period, async_lag_callback=self._async_lag_callback_method, - async_lag_threshold=async_lag_threshold, + async_lag_threshold=self.async_lag_threshold, async_no_progress_callback=self._async_no_progress_callback_method, - async_no_progress_threshold=async_no_progress_threshold, + async_no_progress_threshold=self.async_no_progress_threshold, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) @@ -202,18 +204,18 @@ def __init__( On Linux it looks like it does not help much but does not break anything either. """ - def _async_lag_callback_method(self): + def _async_lag_callback_method(self) -> None: if self._async_lag_callback is not None: self._async_lag_callback(self) - def _async_no_progress_callback_method(self): + def _async_no_progress_callback_method(self) -> None: if self._async_no_progress_callback is not None: self._async_no_progress_callback(self) @staticmethod - def _get_callback(provided_by_user, env_name): - if provided_by_user is not None: - return provided_by_user + def _get_callback(provided: Optional[NeptuneObjectCallback], env_name: str) -> Optional[NeptuneObjectCallback]: + if provided is not None: + return provided if env_name in os.environ and os.getenv(env_name) == "TRUE": return stop_synchronization_callback return None @@ -239,6 +241,10 @@ def _handle_fork_in_child(self): backend=self._backend, lock=self._lock, flush_period=self._flush_period, + async_lag_callback=self._async_lag_callback_method, + async_lag_threshold=self.async_lag_threshold, + async_no_progress_callback=self._async_no_progress_callback_method, + async_no_progress_threshold=self.async_no_progress_threshold, ) # TODO: Every implementation of background job should handle fork by itself. From 4c4a2c1f9ce2241e4f170588abc2eeeef7343a2c Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:17:24 +0200 Subject: [PATCH 13/23] Auto review 2 --- .../async_operation_processor.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 905c7530a..6b728823e 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -84,9 +84,9 @@ def __init__( self._container_type = container_type self._backend = backend self._batch_size = batch_size - self._async_lag_callback = async_lag_callback + self._async_lag_callback = async_lag_callback or (lambda: None) self._async_lag_threshold = async_lag_threshold - self._async_no_progress_callback = async_no_progress_callback + self._async_no_progress_callback = async_no_progress_callback or (lambda: None) self._async_no_progress_threshold = async_no_progress_threshold self._last_version = 0 self._consumed_version = 0 @@ -94,7 +94,6 @@ def __init__( self._lock = lock self._last_ack = None self._lag_exceeded = False - self._should_call_lag_callback = False self._should_call_no_progress_callback = False # Caller is responsible for taking this lock @@ -112,13 +111,12 @@ def enqueue_operation(self, op: Operation, *, wait: bool) -> None: if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: with self._lock: self._lag_exceeded = True - if self._async_no_progress_callback: - self._async_no_progress_callback() + self._async_no_progress_callback() if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() - self._check_for_callbacks() + self._check_no_progress_callback() if wait: self.wait() @@ -136,12 +134,11 @@ def wait(self): if not self._consumer.is_running(): raise NeptuneSynchronizationAlreadyStoppedException() - def _check_for_callbacks(self): - if self._should_call_no_progress_callback: - with self._lock: + def _check_no_progress_callback(self): + with self._lock: + if self._should_call_no_progress_callback: + self._async_no_progress_callback() self._should_call_no_progress_callback = False - if self._async_no_progress_callback: - self._async_no_progress_callback() def flush(self): self._queue.flush() From 7543e371d2faad14aba5eb2cec88de8fff3cfc8d Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:21:58 +0200 Subject: [PATCH 14/23] Auto review 3 --- .../async_operation_processor.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 6b728823e..57bb97df0 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -106,18 +106,12 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, process_path) def enqueue_operation(self, op: Operation, *, wait: bool) -> None: - self._last_version = self._queue.put(op) - - if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: - with self._lock: - self._lag_exceeded = True - self._async_no_progress_callback() + self._check_lag() + self._check_no_progress() + self._last_version = self._queue.put(op) if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() - - self._check_no_progress_callback() - if wait: self.wait() @@ -134,7 +128,13 @@ def wait(self): if not self._consumer.is_running(): raise NeptuneSynchronizationAlreadyStoppedException() - def _check_no_progress_callback(self): + def _check_lag(self): + with self._lock: + if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: + self._lag_exceeded = True + self._async_no_progress_callback() + + def _check_no_progress(self): with self._lock: if self._should_call_no_progress_callback: self._async_no_progress_callback() From 1df03d7724c8d3fc47c6351687c8b8a7e2ec72d9 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:27:28 +0200 Subject: [PATCH 15/23] Auto review 4 --- .../async_operation_processor.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 57bb97df0..38771f095 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -281,11 +281,11 @@ def work(self) -> None: return self.process_batch([element.obj for element in batch], batch[-1].ver) - def _check_for_network_interruptions_callbacks(self): - if not self._no_progress_exceeded and self._processor._async_no_progress_callback: - if monotonic() - self._last_ack > self._processor._async_no_progress_threshold: - self._no_progress_exceeded = True + def _check_no_progress(self): + if not self._no_progress_exceeded: + if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold: with self._processor._lock: + self._no_progress_exceeded = True self._processor._should_call_no_progress_callback = True @Daemon.ConnectionRetryWrapper( @@ -307,17 +307,19 @@ def process_batch(self, batch: List[Operation], version: int) -> None: operation_storage=self._processor._operation_storage, ) except Exception as e: - self._check_for_network_interruptions_callbacks() + self._check_no_progress() raise e from e version_to_ack += processed_count batch = batch[processed_count:] if processed_count > 0: - self._last_ack = monotonic() - self._no_progress_exceeded = False + with self._processor._lock: + self._processor._last_ack = monotonic() + self._processor._lag_exceeded = False + self._no_progress_exceeded = False else: - self._check_for_network_interruptions_callbacks() + self._check_no_progress() with self._processor._waiting_cond: self._processor._queue.ack(version_to_ack) @@ -329,8 +331,6 @@ def process_batch(self, batch: List[Operation], version: int) -> None: ) self._processor._consumed_version = version_to_ack - self._processor._last_ack = monotonic() - self._processor._lag_exceeded = False if version_to_ack == version: self._processor._waiting_cond.notify_all() From 4bcdcb2a1ecc9a31b80be096f100893c3f4d24bd Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:30:30 +0200 Subject: [PATCH 16/23] Fixes --- src/neptune/metadata_containers/metadata_container.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 0c1022c71..be53b38d9 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -171,9 +171,9 @@ def __init__( lock=self._lock, flush_period=flush_period, async_lag_callback=self._async_lag_callback_method, - async_lag_threshold=self.async_lag_threshold, + async_lag_threshold=self._async_lag_threshold, async_no_progress_callback=self._async_no_progress_callback_method, - async_no_progress_threshold=self.async_no_progress_threshold, + async_no_progress_threshold=self._async_no_progress_threshold, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) From d63465f755ed074e5d8ffa6c1290ea31d30acf79 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:42:00 +0200 Subject: [PATCH 17/23] Auto review 5 --- .../async_operation_processor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 38771f095..f4d95ff8c 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -129,16 +129,18 @@ def wait(self): raise NeptuneSynchronizationAlreadyStoppedException() def _check_lag(self): - with self._lock: - if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: - self._lag_exceeded = True - self._async_no_progress_callback() + if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: + with self._lock: + if not self._lag_exceeded: + self._async_no_progress_callback() + self._lag_exceeded = True def _check_no_progress(self): - with self._lock: - if self._should_call_no_progress_callback: - self._async_no_progress_callback() - self._should_call_no_progress_callback = False + if self._should_call_no_progress_callback: + with self._lock: + if self._should_call_no_progress_callback: + self._async_no_progress_callback() + self._should_call_no_progress_callback = False def flush(self): self._queue.flush() @@ -258,7 +260,6 @@ def __init__( self._processor = processor self._batch_size = batch_size self._last_flush = 0 - self._last_ack = 0 self._no_progress_exceeded = False def run(self): From aaa20b776c7392ca05272ef57f48272ba836ae51 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:46:02 +0200 Subject: [PATCH 18/23] Auto review 6 --- .../async_operation_processor.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index f4d95ff8c..3e419bb60 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -285,9 +285,8 @@ def work(self) -> None: def _check_no_progress(self): if not self._no_progress_exceeded: if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold: - with self._processor._lock: - self._no_progress_exceeded = True - self._processor._should_call_no_progress_callback = True + self._no_progress_exceeded = True + self._processor._should_call_no_progress_callback = True @Daemon.ConnectionRetryWrapper( kill_message=( @@ -309,20 +308,17 @@ def process_batch(self, batch: List[Operation], version: int) -> None: ) except Exception as e: self._check_no_progress() + # Let default retry logic handle this raise e from e version_to_ack += processed_count batch = batch[processed_count:] - if processed_count > 0: - with self._processor._lock: - self._processor._last_ack = monotonic() - self._processor._lag_exceeded = False - self._no_progress_exceeded = False - else: - self._check_no_progress() - with self._processor._waiting_cond: + self._processor._last_ack = monotonic() + self._processor._lag_exceeded = False + self._no_progress_exceeded = False + self._processor._queue.ack(version_to_ack) for error in errors: From 0d80cd040f47efa04e8d886c306a209763e2cbd6 Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 15:49:46 +0200 Subject: [PATCH 19/23] Auto review 7 --- .../operation_processors/async_operation_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 3e419bb60..6419c2179 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -311,15 +311,15 @@ def process_batch(self, batch: List[Operation], version: int) -> None: # Let default retry logic handle this raise e from e + self._no_progress_exceeded = False + version_to_ack += processed_count batch = batch[processed_count:] with self._processor._waiting_cond: + self._processor._queue.ack(version_to_ack) self._processor._last_ack = monotonic() self._processor._lag_exceeded = False - self._no_progress_exceeded = False - - self._processor._queue.ack(version_to_ack) for error in errors: _logger.error( From b9709d98b7157ff89a625b42764ea064d0abfecb Mon Sep 17 00:00:00 2001 From: Sabine Date: Mon, 2 Oct 2023 17:59:00 +0300 Subject: [PATCH 20/23] clarify arg descrptions --- src/neptune/metadata_containers/model.py | 21 +++++++++--------- .../metadata_containers/model_version.py | 22 ++++++++++--------- src/neptune/metadata_containers/project.py | 20 +++++++++-------- src/neptune/metadata_containers/run.py | 20 +++++++++-------- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 30142fd57..69a17bce4 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -114,21 +114,22 @@ def __init__( For more information about proxies, see the Requests documentation. async_lag_callback: Custom callback which is called if the lag between a queued operation and its synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback - should take a Model object as the argument and may use the `stop()` method to stop the object. + should take a Model object as the argument and can contain any custom code, such as calling `stop()` on + the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. - async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback - (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` - argument) is enabled, the callback is called when this duration is exceeded. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. async_no_progress_callback: Custom callback which is called if there has been no synchronization progress - whatsoever for the duration defined by `async_no_progress_threshold`. The callback - should take a Model object as the argument and may use the `stop()` method to stop the object. + whatsoever for the duration defined by `async_no_progress_threshold`. The callback should take a Model + object as the argument and can contain any custom code, such as calling `stop()` on the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. - async_no_progress_threshold: For how long there has been no synchronization progress since the object was - initialized. If a no-progress callback (default callback enabled via environment variable or custom - callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when - this duration is exceeded. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Model object that is used to manage the model and log metadata to it. diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index b399cc658..2ac34c846 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -108,21 +108,23 @@ def __init__( For more information about proxies, see the Requests documentation. async_lag_callback: Custom callback which is called if the lag between a queued operation and its synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback - should take a ModelVersion object as the argument and may use the `stop()` method to stop the object. + should take a ModelVersion object as the argument and can contain any custom code, such as calling + `stop()` on the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. - async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback - (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` - argument) is enabled, the callback is called when this duration is exceeded. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. async_no_progress_callback: Custom callback which is called if there has been no synchronization progress - whatsoever for the duration defined by `async_no_progress_threshold`. The callback - should take a ModelVersion object as the argument and may use the `stop()` method to stop the object. + whatsoever for the duration defined by `async_no_progress_threshold`. The callback should take a + ModelVersion object as the argument and can contain any custom code, such as calling `stop()` on the + object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. - async_no_progress_threshold: For how long there has been no synchronization progress since the object was - initialized. If a no-progress callback (default callback enabled via environment variable or custom - callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when - this duration is exceeded. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: ModelVersion object that is used to manage the model version and log metadata to it. diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index b9802c9eb..87c914b33 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -99,21 +99,23 @@ def __init__( For more information about proxies, see the Requests documentation. async_lag_callback: Custom callback which is called if the lag between a queued operation and its synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback - should take a Project object as the argument and may use the `stop()` method to stop the object. + should take a Project object as the argument and can contain any custom code, such as calling `stop()` + on the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. - async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback - (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` - argument) is enabled, the callback is called when this duration is exceeded. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. async_no_progress_callback: Custom callback which is called if there has been no synchronization progress whatsoever for the duration defined by `async_no_progress_threshold`. The callback - should take a Project object as the argument and may use the `stop()` method to stop the object. + should take a Project object as the argument and can contain any custom code, such as calling `stop()` + on the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. - async_no_progress_threshold: For how long there has been no synchronization progress since the object was - initialized. If a no-progress callback (default callback enabled via environment variable or custom - callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when - this duration is exceeded. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Project object that can be used to interact with the project as a whole, diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index f9f67dc32..6033a3b24 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -238,21 +238,23 @@ def __init__( If left empty, no dependency file is uploaded. async_lag_callback: Custom callback which is called if the lag between a queued operation and its synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback - should take a Run object as the argument and may use the `stop()` method to stop the object. + should take a Run object as the argument and can contain any custom code, such as calling `stop()` on + the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. - async_lag_threshold: Duration between the queueing and synchronization of an operation. If a lag callback - (default callback enabled via environment variable or custom callback passed to the `async_lag_callback` - argument) is enabled, the callback is called when this duration is exceeded. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. async_no_progress_callback: Custom callback which is called if there has been no synchronization progress whatsoever for the duration defined by `async_no_progress_threshold`. The callback - should take a Run object as the argument and may use the `stop()` method to stop the object. + should take a Run object as the argument and can contain any custom code, such as calling `stop()` on + the object. Note: Instead of using this argument, you can use Neptune's default callback by setting the `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. - async_no_progress_threshold: For how long there has been no synchronization progress since the object was - initialized. If a no-progress callback (default callback enabled via environment variable or custom - callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called when - this duration is exceeded. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Run object that is used to manage the tracked run and log metadata to it. From 8f9526769f4c14841600c0fe19b6f480042f950f Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 18:11:21 +0200 Subject: [PATCH 21/23] Fixes --- .../internal/operation_processors/async_operation_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 6419c2179..f9bc4668a 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -106,10 +106,11 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, process_path) def enqueue_operation(self, op: Operation, *, wait: bool) -> None: + self._last_version = self._queue.put(op) + self._check_lag() self._check_no_progress() - self._last_version = self._queue.put(op) if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() if wait: From cfcdce25311728750120d17babfe5d9e3177747a Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 18:23:57 +0200 Subject: [PATCH 22/23] Code review --- .../metadata_containers/metadata_container.py | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index be53b38d9..9ca07ad2b 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -23,7 +23,10 @@ import time import traceback from contextlib import AbstractContextManager -from functools import wraps +from functools import ( + partial, + wraps, +) from typing import ( Any, Dict, @@ -170,9 +173,11 @@ def __init__( backend=self._backend, lock=self._lock, flush_period=flush_period, - async_lag_callback=self._async_lag_callback_method, + async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, async_lag_threshold=self._async_lag_threshold, - async_no_progress_callback=self._async_no_progress_callback_method, + async_no_progress_callback=partial(self._async_no_progress_callback, self) + if self._async_no_progress_callback + else None, async_no_progress_threshold=self._async_no_progress_threshold, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() @@ -204,19 +209,11 @@ def __init__( On Linux it looks like it does not help much but does not break anything either. """ - def _async_lag_callback_method(self) -> None: - if self._async_lag_callback is not None: - self._async_lag_callback(self) - - def _async_no_progress_callback_method(self) -> None: - if self._async_no_progress_callback is not None: - self._async_no_progress_callback(self) - @staticmethod def _get_callback(provided: Optional[NeptuneObjectCallback], env_name: str) -> Optional[NeptuneObjectCallback]: if provided is not None: return provided - if env_name in os.environ and os.getenv(env_name) == "TRUE": + if os.getenv(env_name, "") == "TRUE": return stop_synchronization_callback return None @@ -241,10 +238,12 @@ def _handle_fork_in_child(self): backend=self._backend, lock=self._lock, flush_period=self._flush_period, - async_lag_callback=self._async_lag_callback_method, - async_lag_threshold=self.async_lag_threshold, - async_no_progress_callback=self._async_no_progress_callback_method, - async_no_progress_threshold=self.async_no_progress_threshold, + async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_callback=partial(self._async_no_progress_callback, self) + if self._async_no_progress_callback + else None, + async_no_progress_threshold=self._async_no_progress_threshold, ) # TODO: Every implementation of background job should handle fork by itself. From 94881c6edb62a479e36e87cd20059ec535b0148c Mon Sep 17 00:00:00 2001 From: Rafal Jankowski Date: Mon, 2 Oct 2023 22:33:16 +0200 Subject: [PATCH 23/23] Code review --- .../async_operation_processor.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index f9bc4668a..1e03bab05 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -130,18 +130,22 @@ def wait(self): raise NeptuneSynchronizationAlreadyStoppedException() def _check_lag(self): - if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: - with self._lock: - if not self._lag_exceeded: - self._async_no_progress_callback() - self._lag_exceeded = True + if self._lag_exceeded or not self._last_ack or monotonic() - self._last_ack <= self._async_lag_threshold: + return + + with self._lock: + if not self._lag_exceeded: + self._async_no_progress_callback() + self._lag_exceeded = True def _check_no_progress(self): - if self._should_call_no_progress_callback: - with self._lock: - if self._should_call_no_progress_callback: - self._async_no_progress_callback() - self._should_call_no_progress_callback = False + if not self._should_call_no_progress_callback: + return + + with self._lock: + if self._should_call_no_progress_callback: + self._async_no_progress_callback() + self._should_call_no_progress_callback = False def flush(self): self._queue.flush()