diff --git a/.gitignore b/.gitignore index 216e5aff6..642e27645 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ stream.bin # zenml e2e .zen + +# mocks +MagicMock/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 7901c7406..9624f5a26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## [UNRELEASED] neptune 1.8.0 +### Features +- Added support for callbacks that stop the synchronization if the lag or lack of progress exceeds a certain threshold ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478)) + ### Fixes - Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473)) - Clarify `NeptuneLimitExceedException` error message ([#1480](https://github.com/neptune-ai/neptune-client/pull/1480)) diff --git a/src/neptune/envs.py b/src/neptune/envs.py index 86a9222bc..b01b94d41 100644 --- a/src/neptune/envs.py +++ b/src/neptune/envs.py @@ -28,6 +28,8 @@ "NEPTUNE_FETCH_TABLE_STEP_SIZE", "NEPTUNE_SYNC_AFTER_STOP_TIMEOUT", "NEPTUNE_REQUEST_TIMEOUT", + "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK", + "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK", ] from neptune.common.envs import ( @@ -59,4 +61,8 @@ NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT" +NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK" + +NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK" + S3_ENDPOINT_URL = "S3_ENDPOINT_URL" diff --git a/src/neptune/internal/init/parameters.py b/src/neptune/internal/init/parameters.py index db6e85b82..4e897957b 100644 --- a/src/neptune/internal/init/parameters.py +++ b/src/neptune/internal/init/parameters.py @@ -13,8 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["DEFAULT_FLUSH_PERIOD", "DEFAULT_NAME", "OFFLINE_PROJECT_QUALIFIED_NAME"] +__all__ = [ + "DEFAULT_FLUSH_PERIOD", + "DEFAULT_NAME", + "OFFLINE_PROJECT_QUALIFIED_NAME", + "ASYNC_LAG_THRESHOLD", + "ASYNC_NO_PROGRESS_THRESHOLD", + "DEFAULT_STOP_TIMEOUT", +] DEFAULT_FLUSH_PERIOD = 5 DEFAULT_NAME = "Untitled" OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder" +ASYNC_LAG_THRESHOLD = 1800.0 +ASYNC_NO_PROGRESS_THRESHOLD = 300.0 +DEFAULT_STOP_TIMEOUT = 60.0 diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 2ab814cea..1e03bab05 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -25,6 +25,7 @@ time, ) from typing import ( + Callable, List, Optional, ) @@ -36,6 +37,11 @@ from neptune.internal.container_type import ContainerType from neptune.internal.disk_queue import DiskQueue from neptune.internal.id_formats import UniqueId +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_STOP_TIMEOUT, +) from neptune.internal.operation import Operation from neptune.internal.operation_processors.operation_processor import OperationProcessor from neptune.internal.operation_processors.operation_storage import ( @@ -50,7 +56,7 @@ class AsyncOperationProcessor(OperationProcessor): STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30 - STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300")) + STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT)) def __init__( self, @@ -60,6 +66,10 @@ def __init__( lock: threading.RLock, sleep_time: float = 5, batch_size: int = 1000, + async_lag_callback: Optional[Callable[[], None]] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[Callable[[], None]] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type)) @@ -74,9 +84,17 @@ def __init__( self._container_type = container_type self._backend = backend self._batch_size = batch_size + self._async_lag_callback = async_lag_callback or (lambda: None) + self._async_lag_threshold = async_lag_threshold + self._async_no_progress_callback = async_no_progress_callback or (lambda: None) + self._async_no_progress_threshold = async_no_progress_threshold self._last_version = 0 self._consumed_version = 0 self._consumer = self.ConsumerThread(self, sleep_time, batch_size) + self._lock = lock + self._last_ack = None + self._lag_exceeded = False + self._should_call_no_progress_callback = False # Caller is responsible for taking this lock self._waiting_cond = threading.Condition(lock=lock) @@ -89,6 +107,10 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa def enqueue_operation(self, op: Operation, *, wait: bool) -> None: self._last_version = self._queue.put(op) + + self._check_lag() + self._check_no_progress() + if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() if wait: @@ -107,6 +129,24 @@ def wait(self): if not self._consumer.is_running(): raise NeptuneSynchronizationAlreadyStoppedException() + def _check_lag(self): + if self._lag_exceeded or not self._last_ack or monotonic() - self._last_ack <= self._async_lag_threshold: + return + + with self._lock: + if not self._lag_exceeded: + self._async_no_progress_callback() + self._lag_exceeded = True + + def _check_no_progress(self): + if not self._should_call_no_progress_callback: + return + + with self._lock: + if self._should_call_no_progress_callback: + self._async_no_progress_callback() + self._should_call_no_progress_callback = False + def flush(self): self._queue.flush() @@ -225,6 +265,7 @@ def __init__( self._processor = processor self._batch_size = batch_size self._last_flush = 0 + self._no_progress_exceeded = False def run(self): try: @@ -246,6 +287,12 @@ def work(self) -> None: return self.process_batch([element.obj for element in batch], batch[-1].ver) + def _check_no_progress(self): + if not self._no_progress_exceeded: + if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold: + self._no_progress_exceeded = True + self._processor._should_call_no_progress_callback = True + @Daemon.ConnectionRetryWrapper( kill_message=( "Killing Neptune asynchronous thread. All data is safe on disk and can be later" @@ -257,16 +304,27 @@ def process_batch(self, batch: List[Operation], version: int) -> None: version_to_ack = version - expected_count while True: # TODO: Handle Metadata errors - processed_count, errors = self._processor._backend.execute_operations( - container_id=self._processor._container_id, - container_type=self._processor._container_type, - operations=batch, - operation_storage=self._processor._operation_storage, - ) + try: + processed_count, errors = self._processor._backend.execute_operations( + container_id=self._processor._container_id, + container_type=self._processor._container_type, + operations=batch, + operation_storage=self._processor._operation_storage, + ) + except Exception as e: + self._check_no_progress() + # Let default retry logic handle this + raise e from e + + self._no_progress_exceeded = False + version_to_ack += processed_count batch = batch[processed_count:] + with self._processor._waiting_cond: self._processor._queue.ack(version_to_ack) + self._processor._last_ack = monotonic() + self._processor._lag_exceeded = False for error in errors: _logger.error( diff --git a/src/neptune/internal/operation_processors/factory.py b/src/neptune/internal/operation_processors/factory.py index d3f5167a5..bf6a38707 100644 --- a/src/neptune/internal/operation_processors/factory.py +++ b/src/neptune/internal/operation_processors/factory.py @@ -17,10 +17,18 @@ __all__ = ["get_operation_processor"] import threading +from typing import ( + Callable, + Optional, +) from neptune.internal.backends.neptune_backend import NeptuneBackend from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import UniqueId +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, +) from neptune.types.mode import Mode from .async_operation_processor import AsyncOperationProcessor @@ -37,14 +45,22 @@ def get_operation_processor( backend: NeptuneBackend, lock: threading.RLock, flush_period: float, + async_lag_callback: Optional[Callable[[], None]] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[Callable[[], None]] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ) -> OperationProcessor: if mode == Mode.ASYNC: return AsyncOperationProcessor( - container_id, - container_type, - backend, - lock, + container_id=container_id, + container_type=container_type, + backend=backend, + lock=lock, sleep_time=flush_period, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, ) elif mode == Mode.SYNC: return SyncOperationProcessor(container_id, container_type, backend) diff --git a/src/neptune/internal/utils/__init__.py b/src/neptune/internal/utils/__init__.py index c19882b63..cc8845f93 100644 --- a/src/neptune/internal/utils/__init__.py +++ b/src/neptune/internal/utils/__init__.py @@ -26,6 +26,7 @@ "is_string_like", "is_stringify_value", "verify_collection_type", + "verify_optional_callable", "is_collection", "base64_encode", "base64_decode", @@ -133,6 +134,11 @@ def verify_collection_type(var_name: str, var, expected_type: Union[type, tuple] verify_type("elements of collection '{}'".format(var_name), value, expected_type) +def verify_optional_callable(var_name: str, var): + if var and not callable(var): + raise TypeError("{} must be a callable (was {})".format(var_name, type(var))) + + def is_collection(var) -> bool: return isinstance(var, (list, set, tuple)) diff --git a/src/neptune/metadata_containers/abstract.py b/src/neptune/metadata_containers/abstract.py index c1263d05e..29a15ba42 100644 --- a/src/neptune/metadata_containers/abstract.py +++ b/src/neptune/metadata_containers/abstract.py @@ -13,13 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces"] +__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"] from abc import ( ABC, abstractmethod, ) -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, + Callable, + Optional, + Union, +) if TYPE_CHECKING: from neptune.handler import Handler @@ -64,3 +69,12 @@ def __delitem__(self, path) -> None: @abstractmethod def get_root_object(self) -> "SupportsNamespaces": ... + + +class NeptuneObject(SupportsNamespaces, ABC): + @abstractmethod + def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None: + ... + + +NeptuneObjectCallback = Callable[[NeptuneObject], None] diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 1aa03ce10..9ca07ad2b 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -23,7 +23,10 @@ import time import traceback from contextlib import AbstractContextManager -from functools import wraps +from functools import ( + partial, + wraps, +) from typing import ( Any, Dict, @@ -40,6 +43,10 @@ from neptune.common.exceptions import UNIX_STYLES from neptune.common.utils import reset_internal_ssl_state from neptune.common.warnings import warn_about_unsupported_type +from neptune.envs import ( + NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK, + NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK, +) from neptune.exceptions import ( MetadataInconsistency, NeptunePossibleLegacyUsageException, @@ -63,20 +70,31 @@ UniqueId, conform_optional, ) -from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_FLUSH_PERIOD, +) from neptune.internal.operation import DeleteAttribute from neptune.internal.operation_processors.factory import get_operation_processor from neptune.internal.operation_processors.operation_processor import OperationProcessor from neptune.internal.state import ContainerState -from neptune.internal.utils import verify_type +from neptune.internal.utils import ( + verify_optional_callable, + verify_type, +) from neptune.internal.utils.logger import logger from neptune.internal.utils.paths import parse_path from neptune.internal.utils.uncaught_exception_handler import instance as uncaught_exception_handler from neptune.internal.value_to_attribute_visitor import ValueToAttributeVisitor -from neptune.metadata_containers.abstract import SupportsNamespaces +from neptune.metadata_containers.abstract import ( + NeptuneObject, + NeptuneObjectCallback, +) from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode from neptune.types.type_casting import cast_value +from neptune.utils import stop_synchronization_callback def ensure_not_stopped(fun): @@ -88,7 +106,7 @@ def inner_fun(self: "MetadataContainer", *args, **kwargs): return inner_fun -class MetadataContainer(AbstractContextManager, SupportsNamespaces): +class MetadataContainer(AbstractContextManager, NeptuneObject): container_type: ContainerType LEGACY_METHODS = set() @@ -101,12 +119,20 @@ def __init__( mode: Mode = Mode.ASYNC, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): verify_type("project", project, (str, type(None))) verify_type("api_token", api_token, (str, type(None))) verify_type("mode", mode, Mode) verify_type("flush_period", flush_period, (int, float)) verify_type("proxies", proxies, (dict, type(None))) + verify_type("async_lag_threshold", async_lag_threshold, (int, float)) + verify_optional_callable("async_lag_callback", async_lag_callback) + verify_type("async_no_progress_threshold", async_no_progress_threshold, (int, float)) + verify_optional_callable("async_no_progress_callback", async_no_progress_callback) self._mode: Mode = mode self._flush_period = flush_period @@ -129,6 +155,17 @@ def __init__( self._workspace: str = self._api_object.workspace self._project_name: str = self._api_object.project_name + self._async_lag_threshold = async_lag_threshold + self._async_lag_callback = self._get_callback( + provided=async_lag_callback, + env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK, + ) + self._async_no_progress_threshold = async_no_progress_threshold + self._async_no_progress_callback = self._get_callback( + provided=async_no_progress_callback, + env_name=NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK, + ) + self._op_processor: OperationProcessor = get_operation_processor( mode=mode, container_id=self._id, @@ -136,6 +173,12 @@ def __init__( backend=self._backend, lock=self._lock, flush_period=flush_period, + async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_callback=partial(self._async_no_progress_callback, self) + if self._async_no_progress_callback + else None, + async_no_progress_threshold=self._async_no_progress_threshold, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) @@ -166,6 +209,14 @@ def __init__( On Linux it looks like it does not help much but does not break anything either. """ + @staticmethod + def _get_callback(provided: Optional[NeptuneObjectCallback], env_name: str) -> Optional[NeptuneObjectCallback]: + if provided is not None: + return provided + if os.getenv(env_name, "") == "TRUE": + return stop_synchronization_callback + return None + def _handle_fork_in_parent(self): reset_internal_ssl_state() if self._state == ContainerState.STARTED: @@ -187,6 +238,12 @@ def _handle_fork_in_child(self): backend=self._backend, lock=self._lock, flush_period=self._flush_period, + async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_callback=partial(self._async_no_progress_callback, self) + if self._async_no_progress_callback + else None, + async_no_progress_threshold=self._async_no_progress_threshold, ) # TODO: Every implementation of background job should handle fork by itself. diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 20eafa965..69a17bce4 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -43,6 +43,8 @@ from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -51,6 +53,7 @@ from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode @@ -71,6 +74,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): """Initializes a Model object from an existing or new model. @@ -105,6 +112,24 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Model object as the argument and can contain any custom code, such as calling `stop()` on + the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback should take a Model + object as the argument and can contain any custom code, such as calling `stop()` on the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Model object that is used to manage the model and log metadata to it. @@ -166,7 +191,17 @@ def __init__( if mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index 2f43e722a..2ac34c846 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -35,6 +35,8 @@ from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -44,6 +46,7 @@ from neptune.internal.utils import verify_type from neptune.internal.utils.ping_background_job import PingBackgroundJob from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.types.mode import Mode from neptune.types.model_version_stage import ModelVersionStage @@ -64,6 +67,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ) -> None: """Initializes a ModelVersion object from an existing or new model version. @@ -99,6 +106,25 @@ def __init__( (in seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a ModelVersion object as the argument and can contain any custom code, such as calling + `stop()` on the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback should take a + ModelVersion object as the argument and can contain any custom code, such as calling `stop()` on the + object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: ModelVersion object that is used to manage the model version and log metadata to it. @@ -162,7 +188,17 @@ def __init__( if mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index a6f7605a5..87c914b33 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -34,7 +34,11 @@ NQLQueryAttribute, ) from neptune.internal.container_type import ContainerType -from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD +from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, + DEFAULT_FLUSH_PERIOD, +) from neptune.internal.state import ContainerState from neptune.internal.utils import ( as_list, @@ -42,6 +46,7 @@ ) from neptune.internal.utils.run_state import RunState from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode @@ -59,6 +64,10 @@ def __init__( mode: Optional[str] = None, flush_period: float = DEFAULT_FLUSH_PERIOD, proxies: Optional[dict] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, ): """Starts a connection to an existing Neptune project. @@ -88,6 +97,25 @@ def __init__( Defaults to 5 (every 5 seconds). proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. For more information about proxies, see the Requests documentation. + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Project object as the argument and can contain any custom code, such as calling `stop()` + on the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a Project object as the argument and can contain any custom code, such as calling `stop()` + on the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Project object that can be used to interact with the project as a whole, @@ -128,7 +156,17 @@ def __init__( if mode == Mode.OFFLINE: raise NeptuneException("Project can't be initialized in OFFLINE mode") - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: return ApiExperiment( diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index 7c2ab1056..6033a3b24 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -56,6 +56,8 @@ from neptune.internal.hardware.hardware_metric_reporting_job import HardwareMetricReportingJob from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( + ASYNC_LAG_THRESHOLD, + ASYNC_NO_PROGRESS_THRESHOLD, DEFAULT_FLUSH_PERIOD, DEFAULT_NAME, OFFLINE_PROJECT_QUALIFIED_NAME, @@ -89,6 +91,7 @@ from neptune.internal.utils.traceback_job import TracebackJob from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob from neptune.metadata_containers import MetadataContainer +from neptune.metadata_containers.abstract import NeptuneObjectCallback from neptune.types import ( GitRef, StringSeries, @@ -147,6 +150,10 @@ def __init__( capture_traceback: bool = True, git_ref: Optional[Union[GitRef, GitRefDisabled, bool]] = None, dependencies: Optional[Union[str, os.PathLike]] = None, + async_lag_callback: Optional[NeptuneObjectCallback] = None, + async_lag_threshold: float = ASYNC_LAG_THRESHOLD, + async_no_progress_callback: Optional[NeptuneObjectCallback] = None, + async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, **kwargs, ): """Starts a new tracked run that logs ML model-building metadata to neptune.ai. @@ -229,6 +236,25 @@ def __init__( dependencies: If you pass `"infer"`, Neptune logs dependencies installed in the current environment. You can also pass a path to your dependency file directly. If left empty, no dependency file is uploaded. + async_lag_callback: Custom callback which is called if the lag between a queued operation and its + synchronization with the server exceeds the duration defined by `async_lag_threshold`. The callback + should take a Run object as the argument and can contain any custom code, such as calling `stop()` on + the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK` environment variable to `TRUE`. + async_lag_threshold: In seconds, duration between the queueing and synchronization of an operation. + If a lag callback (default callback enabled via environment variable or custom callback passed to the + `async_lag_callback` argument) is enabled, the callback is called when this duration is exceeded. + async_no_progress_callback: Custom callback which is called if there has been no synchronization progress + whatsoever for the duration defined by `async_no_progress_threshold`. The callback + should take a Run object as the argument and can contain any custom code, such as calling `stop()` on + the object. + Note: Instead of using this argument, you can use Neptune's default callback by setting the + `NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK` environment variable to `TRUE`. + async_no_progress_threshold: In seconds, for how long there has been no synchronization progress since the + object was initialized. If a no-progress callback (default callback enabled via environment variable or + custom callback passed to the `async_no_progress_callback` argument) is enabled, the callback is called + when this duration is exceeded. Returns: Run object that is used to manage the tracked run and log metadata to it. @@ -382,7 +408,17 @@ def __init__( if mode == Mode.OFFLINE or mode == Mode.DEBUG: project = OFFLINE_PROJECT_QUALIFIED_NAME - super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + super().__init__( + project=project, + api_token=api_token, + mode=mode, + flush_period=flush_period, + proxies=proxies, + async_lag_callback=async_lag_callback, + async_lag_threshold=async_lag_threshold, + async_no_progress_callback=async_no_progress_callback, + async_no_progress_threshold=async_no_progress_threshold, + ) def _get_or_create_api_object(self) -> ApiExperiment: project_workspace = self._project_api_object.workspace diff --git a/src/neptune/typing.py b/src/neptune/typing.py index 267baa88b..d319d4d28 100644 --- a/src/neptune/typing.py +++ b/src/neptune/typing.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = ["SupportsNamespaces"] +__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"] -from neptune.metadata_containers.abstract import SupportsNamespaces +from neptune.metadata_containers.abstract import ( + NeptuneObject, + NeptuneObjectCallback, + SupportsNamespaces, +) diff --git a/src/neptune/utils.py b/src/neptune/utils.py index 198e262f0..298a5b41e 100644 --- a/src/neptune/utils.py +++ b/src/neptune/utils.py @@ -14,7 +14,7 @@ # limitations under the License. # """Utility functions to support ML metadata logging with neptune.ai.""" -__all__ = ["stringify_unsupported"] +__all__ = ["stringify_unsupported", "stop_synchronization_callback"] from typing import ( Any, @@ -23,7 +23,10 @@ Union, ) +from neptune.internal.init.parameters import DEFAULT_STOP_TIMEOUT from neptune.internal.types.stringify_value import StringifyValue +from neptune.internal.utils.logger import logger +from neptune.typing import NeptuneObject def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: @@ -48,3 +51,25 @@ def stringify_unsupported(value: Any) -> Union[StringifyValue, Mapping]: return {str(k): stringify_unsupported(v) for k, v in value.items()} return StringifyValue(value=value) + + +def stop_synchronization_callback(neptune_object: NeptuneObject) -> None: + """Default callback function that stops a Neptune object's synchronization with the server. + + Args: + neptune_object: A Neptune object (Run, Model, ModelVersion, or Project) to be stopped. + + Example: + >>> import neptune + >>> from neptune.utils import stop_synchronization_callback + >>> run = neptune.init_run( + ... async_no_progress_callback = stop_synchronization_callback + ... ) + + For more information, see: + https://docs.neptune.ai/api/utils/stop_synchronization_callback/ + """ + logger.error( + "Threshold for disrupted synchronization exceeded. Stopping the synchronization using the default callback." + ) + neptune_object.stop(seconds=DEFAULT_STOP_TIMEOUT)