Skip to content

Commit

Permalink
Added support for no synchronization callbacks (#1478)
Browse files Browse the repository at this point in the history
Co-authored-by: Sabine <sabine.nyholm@neptune.ai>
  • Loading branch information
Raalsky and normandy7 committed Oct 3, 2023
1 parent cbb85bf commit 463ce96
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ stream.bin

# zenml e2e
.zen

# mocks
MagicMock/
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## [UNRELEASED] neptune 1.8.0

### Features
- Added support for callbacks that stop the synchronization if the lag or lack of progress exceeds a certain threshold ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478))

### Fixes
- Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473))
- Clarify `NeptuneLimitExceedException` error message ([#1480](https://github.com/neptune-ai/neptune-client/pull/1480))
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
"NEPTUNE_REQUEST_TIMEOUT",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK",
]

from neptune.common.envs import (
Expand Down Expand Up @@ -59,4 +61,8 @@

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK"

NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
12 changes: 11 additions & 1 deletion src/neptune/internal/init/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["DEFAULT_FLUSH_PERIOD", "DEFAULT_NAME", "OFFLINE_PROJECT_QUALIFIED_NAME"]
__all__ = [
"DEFAULT_FLUSH_PERIOD",
"DEFAULT_NAME",
"OFFLINE_PROJECT_QUALIFIED_NAME",
"ASYNC_LAG_THRESHOLD",
"ASYNC_NO_PROGRESS_THRESHOLD",
"DEFAULT_STOP_TIMEOUT",
]

DEFAULT_FLUSH_PERIOD = 5
DEFAULT_NAME = "Untitled"
OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder"
ASYNC_LAG_THRESHOLD = 1800.0
ASYNC_NO_PROGRESS_THRESHOLD = 300.0
DEFAULT_STOP_TIMEOUT = 60.0
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
time,
)
from typing import (
Callable,
List,
Optional,
)
Expand All @@ -36,6 +37,11 @@
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
DEFAULT_STOP_TIMEOUT,
)
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import (
Expand All @@ -50,7 +56,7 @@

class AsyncOperationProcessor(OperationProcessor):
STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300"))
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT))

def __init__(
self,
Expand All @@ -60,6 +66,10 @@ def __init__(
lock: threading.RLock,
sleep_time: float = 5,
batch_size: int = 1000,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
):
self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type))

Expand All @@ -74,9 +84,17 @@ def __init__(
self._container_type = container_type
self._backend = backend
self._batch_size = batch_size
self._async_lag_callback = async_lag_callback or (lambda: None)
self._async_lag_threshold = async_lag_threshold
self._async_no_progress_callback = async_no_progress_callback or (lambda: None)
self._async_no_progress_threshold = async_no_progress_threshold
self._last_version = 0
self._consumed_version = 0
self._consumer = self.ConsumerThread(self, sleep_time, batch_size)
self._lock = lock
self._last_ack = None
self._lag_exceeded = False
self._should_call_no_progress_callback = False

# Caller is responsible for taking this lock
self._waiting_cond = threading.Condition(lock=lock)
Expand All @@ -89,6 +107,10 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._last_version = self._queue.put(op)

self._check_lag()
self._check_no_progress()

if self._queue.size() > self._batch_size / 2:
self._consumer.wake_up()
if wait:
Expand All @@ -107,6 +129,24 @@ def wait(self):
if not self._consumer.is_running():
raise NeptuneSynchronizationAlreadyStoppedException()

def _check_lag(self):
if self._lag_exceeded or not self._last_ack or monotonic() - self._last_ack <= self._async_lag_threshold:
return

with self._lock:
if not self._lag_exceeded:
self._async_no_progress_callback()
self._lag_exceeded = True

def _check_no_progress(self):
if not self._should_call_no_progress_callback:
return

with self._lock:
if self._should_call_no_progress_callback:
self._async_no_progress_callback()
self._should_call_no_progress_callback = False

def flush(self):
self._queue.flush()

Expand Down Expand Up @@ -225,6 +265,7 @@ def __init__(
self._processor = processor
self._batch_size = batch_size
self._last_flush = 0
self._no_progress_exceeded = False

def run(self):
try:
Expand All @@ -246,6 +287,12 @@ def work(self) -> None:
return
self.process_batch([element.obj for element in batch], batch[-1].ver)

def _check_no_progress(self):
if not self._no_progress_exceeded:
if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold:
self._no_progress_exceeded = True
self._processor._should_call_no_progress_callback = True

@Daemon.ConnectionRetryWrapper(
kill_message=(
"Killing Neptune asynchronous thread. All data is safe on disk and can be later"
Expand All @@ -257,16 +304,27 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
version_to_ack = version - expected_count
while True:
# TODO: Handle Metadata errors
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
try:
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
except Exception as e:
self._check_no_progress()
# Let default retry logic handle this
raise e from e

self._no_progress_exceeded = False

version_to_ack += processed_count
batch = batch[processed_count:]

with self._processor._waiting_cond:
self._processor._queue.ack(version_to_ack)
self._processor._last_ack = monotonic()
self._processor._lag_exceeded = False

for error in errors:
_logger.error(
Expand Down
24 changes: 20 additions & 4 deletions src/neptune/internal/operation_processors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
__all__ = ["get_operation_processor"]

import threading
from typing import (
Callable,
Optional,
)

from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
)
from neptune.types.mode import Mode

from .async_operation_processor import AsyncOperationProcessor
Expand All @@ -37,14 +45,22 @@ def get_operation_processor(
backend: NeptuneBackend,
lock: threading.RLock,
flush_period: float,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
) -> OperationProcessor:
if mode == Mode.ASYNC:
return AsyncOperationProcessor(
container_id,
container_type,
backend,
lock,
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
)
elif mode == Mode.SYNC:
return SyncOperationProcessor(container_id, container_type, backend)
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/internal/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"is_string_like",
"is_stringify_value",
"verify_collection_type",
"verify_optional_callable",
"is_collection",
"base64_encode",
"base64_decode",
Expand Down Expand Up @@ -133,6 +134,11 @@ def verify_collection_type(var_name: str, var, expected_type: Union[type, tuple]
verify_type("elements of collection '{}'".format(var_name), value, expected_type)


def verify_optional_callable(var_name: str, var):
if var and not callable(var):
raise TypeError("{} must be a callable (was {})".format(var_name, type(var)))


def is_collection(var) -> bool:
return isinstance(var, (list, set, tuple))

Expand Down
18 changes: 16 additions & 2 deletions src/neptune/metadata_containers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["SupportsNamespaces"]
__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"]

from abc import (
ABC,
abstractmethod,
)
from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Callable,
Optional,
Union,
)

if TYPE_CHECKING:
from neptune.handler import Handler
Expand Down Expand Up @@ -64,3 +69,12 @@ def __delitem__(self, path) -> None:
@abstractmethod
def get_root_object(self) -> "SupportsNamespaces":
...


class NeptuneObject(SupportsNamespaces, ABC):
@abstractmethod
def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None:
...


NeptuneObjectCallback = Callable[[NeptuneObject], None]

0 comments on commit 463ce96

Please sign in to comment.