Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for no synchronization callbacks #1478

Merged
merged 27 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9b111e2
Added NeptuneObject interface and stop synchronization util
Raalsky Sep 29, 2023
3c642a3
Parameters added to neptune objects
Raalsky Sep 29, 2023
e96eb9f
Changelog updated
Raalsky Sep 29, 2023
bf65d14
Fixed imports
Raalsky Sep 29, 2023
9a7ceb5
add arg descriptions
normandy7 Sep 29, 2023
45543ba
tweak stop_synchronization_callback docstring
normandy7 Sep 29, 2023
887dff5
tweak callback error message
normandy7 Sep 29, 2023
9d1155d
tweak changelog entry
normandy7 Sep 29, 2023
128159d
Support for no progress callback
Raalsky Oct 2, 2023
020f33a
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
f1b8afa
Added support for lag callback
Raalsky Oct 2, 2023
572ee61
Auto review
Raalsky Oct 2, 2023
df28866
Auto review
Raalsky Oct 2, 2023
4c4a2c1
Auto review 2
Raalsky Oct 2, 2023
7543e37
Auto review 3
Raalsky Oct 2, 2023
1df03d7
Auto review 4
Raalsky Oct 2, 2023
4bcdcb2
Fixes
Raalsky Oct 2, 2023
d63465f
Auto review 5
Raalsky Oct 2, 2023
aaa20b7
Auto review 6
Raalsky Oct 2, 2023
0d80cd0
Auto review 7
Raalsky Oct 2, 2023
b9709d9
clarify arg descrptions
normandy7 Oct 2, 2023
8f95267
Fixes
Raalsky Oct 2, 2023
a5e69ea
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
cfcdce2
Code review
Raalsky Oct 2, 2023
5961f7d
Merge branch 'master' into rj/async-callbacks
Raalsky Oct 2, 2023
94881c6
Code review
Raalsky Oct 2, 2023
69276c8
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ stream.bin

# zenml e2e
.zen

# mocks
MagicMock/
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## [UNRELEASED] neptune 1.7.1
## [UNRELEASED] neptune 1.8.0

### Features
- Added support for callbacks that stop the synchronization if the lag or lack of progress exceeds a certain threshold ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478))

### Fixes
- Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473))
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
"NEPTUNE_REQUEST_TIMEOUT",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK",
]

from neptune.common.envs import (
Expand Down Expand Up @@ -59,4 +61,8 @@

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK"

NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
12 changes: 11 additions & 1 deletion src/neptune/internal/init/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["DEFAULT_FLUSH_PERIOD", "DEFAULT_NAME", "OFFLINE_PROJECT_QUALIFIED_NAME"]
__all__ = [
"DEFAULT_FLUSH_PERIOD",
"DEFAULT_NAME",
"OFFLINE_PROJECT_QUALIFIED_NAME",
"ASYNC_LAG_THRESHOLD",
"ASYNC_NO_PROGRESS_THRESHOLD",
"DEFAULT_STOP_TIMEOUT",
]

DEFAULT_FLUSH_PERIOD = 5
DEFAULT_NAME = "Untitled"
OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder"
# Default for the `async_lag_threshold` parameter; the async processor compares it
# with a `monotonic()` difference, so the unit is seconds.
ASYNC_LAG_THRESHOLD = 1800.0
# Default for the `async_no_progress_threshold` parameter; also compared with a
# `monotonic()` difference (seconds).
ASYNC_NO_PROGRESS_THRESHOLD = 300.0
# Fallback used by the async processor when the NEPTUNE_SYNC_AFTER_STOP_TIMEOUT
# environment variable is not set.
DEFAULT_STOP_TIMEOUT = 60.0
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
time,
)
from typing import (
Callable,
List,
Optional,
)
Expand All @@ -36,6 +37,11 @@
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
DEFAULT_STOP_TIMEOUT,
)
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import (
Expand All @@ -50,7 +56,7 @@

class AsyncOperationProcessor(OperationProcessor):
STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300"))
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT))

def __init__(
self,
Expand All @@ -60,6 +66,10 @@ def __init__(
lock: threading.RLock,
sleep_time: float = 5,
batch_size: int = 1000,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
):
self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type))

Expand All @@ -74,9 +84,18 @@ def __init__(
self._container_type = container_type
self._backend = backend
self._batch_size = batch_size
self._async_lag_callback = async_lag_callback
self._async_lag_threshold = async_lag_threshold
self._async_no_progress_callback = async_no_progress_callback
self._async_no_progress_threshold = async_no_progress_threshold
self._last_version = 0
self._consumed_version = 0
self._consumer = self.ConsumerThread(self, sleep_time, batch_size)
self._lock = lock
self._last_ack = None
self._lag_exceeded = False
self._should_call_lag_callback = False
self._should_call_no_progress_callback = False

# Caller is responsible for taking this lock
self._waiting_cond = threading.Condition(lock=lock)
Expand All @@ -89,8 +108,18 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
    """Put an operation on the disk queue and run the per-enqueue checks.

    Steps, in order:
      1. Store the operation and remember the queue version it was given.
      2. If the time since the last acknowledged batch exceeds
         ``async_lag_threshold``, mark the lag as exceeded and invoke the
         lag callback (once, until the consumer resets ``_lag_exceeded``).
      3. Wake the consumer early when the queue holds more than half a batch.
      4. Run any callback the consumer thread has flagged as pending.
      5. Block until the queue is synchronized when ``wait`` is true.

    Args:
        op: Operation to enqueue.
        wait: When true, call ``self.wait()`` before returning.
    """
    self._last_version = self._queue.put(op)

    lag_detected = (
        not self._lag_exceeded
        and self._last_ack
        and monotonic() - self._last_ack > self._async_lag_threshold
    )
    if lag_detected:
        with self._lock:
            # Re-check under the lock so that two threads enqueueing at the
            # same time cannot both fire the callback.
            if not self._lag_exceeded:
                self._lag_exceeded = True
                # The lag condition must trigger the *lag* callback; the
                # no-progress callback is handled by _check_for_callbacks().
                if self._async_lag_callback:
                    self._async_lag_callback()

    if self._queue.size() > self._batch_size / 2:
        self._consumer.wake_up()

    self._check_for_callbacks()

    if wait:
        self.wait()

Expand All @@ -107,6 +136,13 @@ def wait(self):
if not self._consumer.is_running():
raise NeptuneSynchronizationAlreadyStoppedException()

def _check_for_callbacks(self):
if self._should_call_no_progress_callback:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rewrite it slightly so that it's less indented?

if not self._should_call_no_progress_callback:
    return
with self._lock:
    if self._should_call_no_progress_callback:
        self._async_no_progress_callback()
        self._should_call_no_progress_callback = False

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, is it necessary to check this condition twice?
if self._should_call_no_progress_callback

Copy link
Contributor Author

@Raalsky Raalsky Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a common pattern, as acquiring a lock is a heavy operation: https://en.wikipedia.org/wiki/Double-checked_locking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

with self._lock:
self._should_call_no_progress_callback = False
if self._async_no_progress_callback:
self._async_no_progress_callback()

def flush(self):
self._queue.flush()

Expand Down Expand Up @@ -225,6 +261,8 @@ def __init__(
self._processor = processor
self._batch_size = batch_size
self._last_flush = 0
self._last_ack = 0
self._no_progress_exceeded = False

def run(self):
try:
Expand All @@ -246,6 +284,13 @@ def work(self) -> None:
return
self.process_batch([element.obj for element in batch], batch[-1].ver)

def _check_for_network_interruptions_callbacks(self):
    """Flag the no-progress callback as pending once the threshold is passed.

    Does nothing if the flag was already raised for the current stall or if
    the processor has no no-progress callback. Otherwise, when the time since
    this thread's last acknowledged progress exceeds the processor's
    ``_async_no_progress_threshold``, it records that locally and sets the
    processor's ``_should_call_no_progress_callback`` while holding its lock.
    """
    if self._no_progress_exceeded:
        return

    owner = self._processor
    if not owner._async_no_progress_callback:
        return

    stalled_for = monotonic() - self._last_ack
    if stalled_for > owner._async_no_progress_threshold:
        self._no_progress_exceeded = True
        with owner._lock:
            owner._should_call_no_progress_callback = True

@Daemon.ConnectionRetryWrapper(
kill_message=(
"Killing Neptune asynchronous thread. All data is safe on disk and can be later"
Expand All @@ -257,14 +302,26 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
version_to_ack = version - expected_count
while True:
# TODO: Handle Metadata errors
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
try:
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
except Exception as e:
self._check_for_network_interruptions_callbacks()
raise e from e

version_to_ack += processed_count
batch = batch[processed_count:]

if processed_count > 0:
self._last_ack = monotonic()
self._no_progress_exceeded = False
else:
self._check_for_network_interruptions_callbacks()

with self._processor._waiting_cond:
self._processor._queue.ack(version_to_ack)

Expand All @@ -275,6 +332,8 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
)

self._processor._consumed_version = version_to_ack
self._processor._last_ack = monotonic()
self._processor._lag_exceeded = False

if version_to_ack == version:
self._processor._waiting_cond.notify_all()
Expand Down
24 changes: 20 additions & 4 deletions src/neptune/internal/operation_processors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
__all__ = ["get_operation_processor"]

import threading
from typing import (
Callable,
Optional,
)

from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
)
from neptune.types.mode import Mode

from .async_operation_processor import AsyncOperationProcessor
Expand All @@ -37,14 +45,22 @@ def get_operation_processor(
backend: NeptuneBackend,
lock: threading.RLock,
flush_period: float,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
) -> OperationProcessor:
if mode == Mode.ASYNC:
return AsyncOperationProcessor(
container_id,
container_type,
backend,
lock,
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
)
elif mode == Mode.SYNC:
return SyncOperationProcessor(container_id, container_type, backend)
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/internal/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"is_string_like",
"is_stringify_value",
"verify_collection_type",
"verify_optional_callable",
"is_collection",
"base64_encode",
"base64_decode",
Expand Down Expand Up @@ -133,6 +134,11 @@ def verify_collection_type(var_name: str, var, expected_type: Union[type, tuple]
verify_type("elements of collection '{}'".format(var_name), value, expected_type)


def verify_optional_callable(var_name: str, var):
    """Validate that an optional argument is either ``None`` or a callable.

    Args:
        var_name: Name of the argument, used in the error message.
        var: Value to validate.

    Raises:
        TypeError: If ``var`` is not ``None`` and is not callable.
    """
    # Compare against None explicitly: a truthiness test would let falsy
    # non-callables such as 0, "" or [] slip through validation.
    if var is not None and not callable(var):
        raise TypeError("{} must be a callable (was {})".format(var_name, type(var)))


def is_collection(var) -> bool:
    """Tell whether *var* is an instance of ``list``, ``set`` or ``tuple``."""
    collection_types = (list, set, tuple)
    return isinstance(var, collection_types)

Expand Down
18 changes: 16 additions & 2 deletions src/neptune/metadata_containers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["SupportsNamespaces"]
__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"]

from abc import (
ABC,
abstractmethod,
)
from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Callable,
Optional,
Union,
)

if TYPE_CHECKING:
from neptune.handler import Handler
Expand Down Expand Up @@ -64,3 +69,12 @@ def __delitem__(self, path) -> None:
@abstractmethod
def get_root_object(self) -> "SupportsNamespaces":
...


class NeptuneObject(SupportsNamespaces, ABC):
    """Abstract interface for a namespace-supporting object that can be stopped."""

    @abstractmethod
    def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None:
        """Stop the object.

        Args:
            seconds: Optional keyword-only number.
                NOTE(review): presumably a timeout in seconds to wait while
                stopping; confirm against the concrete implementations.
        """
        ...


# Type of a callback that takes a single NeptuneObject and returns nothing.
NeptuneObjectCallback = Callable[[NeptuneObject], None]