Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for no synchronization callbacks #1478

Merged
merged 27 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9b111e2
Added NeptuneObject interface and stop synchronization util
Raalsky Sep 29, 2023
3c642a3
Parameters added to neptune objects
Raalsky Sep 29, 2023
e96eb9f
Changelog updated
Raalsky Sep 29, 2023
bf65d14
Fixed imports
Raalsky Sep 29, 2023
9a7ceb5
add arg descriptions
normandy7 Sep 29, 2023
45543ba
tweak stop_synchronization_callback docstring
normandy7 Sep 29, 2023
887dff5
tweak callback error message
normandy7 Sep 29, 2023
9d1155d
tweak changelog entry
normandy7 Sep 29, 2023
128159d
Support for no progress callback
Raalsky Oct 2, 2023
020f33a
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
f1b8afa
Added support for lag callback
Raalsky Oct 2, 2023
572ee61
Auto review
Raalsky Oct 2, 2023
df28866
Auto review
Raalsky Oct 2, 2023
4c4a2c1
Auto review 2
Raalsky Oct 2, 2023
7543e37
Auto review 3
Raalsky Oct 2, 2023
1df03d7
Auto review 4
Raalsky Oct 2, 2023
4bcdcb2
Fixes
Raalsky Oct 2, 2023
d63465f
Auto review 5
Raalsky Oct 2, 2023
aaa20b7
Auto review 6
Raalsky Oct 2, 2023
0d80cd0
Auto review 7
Raalsky Oct 2, 2023
b9709d9
clarify arg descrptions
normandy7 Oct 2, 2023
8f95267
Fixes
Raalsky Oct 2, 2023
a5e69ea
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
cfcdce2
Code review
Raalsky Oct 2, 2023
5961f7d
Merge branch 'master' into rj/async-callbacks
Raalsky Oct 2, 2023
94881c6
Code review
Raalsky Oct 2, 2023
69276c8
Merge branch 'rj/async-callbacks' of github.com:neptune-ai/neptune-cl…
Raalsky Oct 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ stream.bin

# zenml e2e
.zen

# mocks
MagicMock/
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## [UNRELEASED] neptune 1.7.1
## [UNRELEASED] neptune 1.8.0

### Features
- Added support for callbacks that stop the synchronization if the lag or lack of progress exceeds a certain threshold ([#1478](https://github.com/neptune-ai/neptune-client/pull/1478))

### Fixes
- Add newline at the end of generated `.patch` while tracking uncommitted changes ([#1473](https://github.com/neptune-ai/neptune-client/pull/1473))
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
"NEPTUNE_REQUEST_TIMEOUT",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK",
"NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK",
]

from neptune.common.envs import (
Expand Down Expand Up @@ -59,4 +61,8 @@

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK"

NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK"

S3_ENDPOINT_URL = "S3_ENDPOINT_URL"
12 changes: 11 additions & 1 deletion src/neptune/internal/init/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["DEFAULT_FLUSH_PERIOD", "DEFAULT_NAME", "OFFLINE_PROJECT_QUALIFIED_NAME"]
__all__ = [
"DEFAULT_FLUSH_PERIOD",
"DEFAULT_NAME",
"OFFLINE_PROJECT_QUALIFIED_NAME",
"ASYNC_LAG_THRESHOLD",
"ASYNC_NO_PROGRESS_THRESHOLD",
"DEFAULT_STOP_TIMEOUT",
]

DEFAULT_FLUSH_PERIOD = 5
DEFAULT_NAME = "Untitled"
OFFLINE_PROJECT_QUALIFIED_NAME = "offline/project-placeholder"
ASYNC_LAG_THRESHOLD = 1800.0
ASYNC_NO_PROGRESS_THRESHOLD = 300.0
DEFAULT_STOP_TIMEOUT = 60.0
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
time,
)
from typing import (
Callable,
List,
Optional,
)
Expand All @@ -36,6 +37,11 @@
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
DEFAULT_STOP_TIMEOUT,
)
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
from neptune.internal.operation_processors.operation_storage import (
Expand All @@ -50,7 +56,7 @@

class AsyncOperationProcessor(OperationProcessor):
STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300"))
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT))

def __init__(
self,
Expand All @@ -60,6 +66,10 @@ def __init__(
lock: threading.RLock,
sleep_time: float = 5,
batch_size: int = 1000,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
):
self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type))

Expand All @@ -74,9 +84,17 @@ def __init__(
self._container_type = container_type
self._backend = backend
self._batch_size = batch_size
self._async_lag_callback = async_lag_callback or (lambda: None)
self._async_lag_threshold = async_lag_threshold
self._async_no_progress_callback = async_no_progress_callback or (lambda: None)
self._async_no_progress_threshold = async_no_progress_threshold
self._last_version = 0
self._consumed_version = 0
self._consumer = self.ConsumerThread(self, sleep_time, batch_size)
self._lock = lock
self._last_ack = None
self._lag_exceeded = False
self._should_call_no_progress_callback = False

# Caller is responsible for taking this lock
self._waiting_cond = threading.Condition(lock=lock)
Expand All @@ -89,6 +107,10 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._last_version = self._queue.put(op)

self._check_lag()
self._check_no_progress()

if self._queue.size() > self._batch_size / 2:
self._consumer.wake_up()
if wait:
Expand All @@ -107,6 +129,20 @@ def wait(self):
if not self._consumer.is_running():
raise NeptuneSynchronizationAlreadyStoppedException()

def _check_lag(self):
if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here a de-dent?

        if (
            self._lag_exceeded or 
            not self._last_ack or 
            monotonic() - self._last_ack <= self._async_lag_threshold
        ):
            return
        
        with self._lock:
            if not self._lag_exceeded:
                self._async_no_progress_callback()
                self._lag_exceeded = True

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you wish 😉

with self._lock:
if not self._lag_exceeded:
self._async_no_progress_callback()
self._lag_exceeded = True

def _check_no_progress(self):
if self._should_call_no_progress_callback:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rewrite it slightly so that it's less indented?

if not self._should_call_no_progress_callback:
    return
with self._lock:
    if self._should_call_no_progress_callback:
        self._async_no_progress_callback()
        self._should_call_no_progress_callback = False

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, is it necessary to check this condition twice?
if self._should_call_no_progress_callback

Copy link
Contributor Author

@Raalsky Raalsky Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common pattern as lock acquiring is heavy operation: https://en.wikipedia.org/wiki/Double-checked_locking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

with self._lock:
if self._should_call_no_progress_callback:
self._async_no_progress_callback()
self._should_call_no_progress_callback = False

def flush(self):
self._queue.flush()

Expand Down Expand Up @@ -225,6 +261,7 @@ def __init__(
self._processor = processor
self._batch_size = batch_size
self._last_flush = 0
self._no_progress_exceeded = False

def run(self):
try:
Expand All @@ -246,6 +283,12 @@ def work(self) -> None:
return
self.process_batch([element.obj for element in batch], batch[-1].ver)

def _check_no_progress(self):
if not self._no_progress_exceeded:
if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold:
self._no_progress_exceeded = True
self._processor._should_call_no_progress_callback = True

@Daemon.ConnectionRetryWrapper(
kill_message=(
"Killing Neptune asynchronous thread. All data is safe on disk and can be later"
Expand All @@ -257,16 +300,27 @@ def process_batch(self, batch: List[Operation], version: int) -> None:
version_to_ack = version - expected_count
while True:
# TODO: Handle Metadata errors
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
try:
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
except Exception as e:
self._check_no_progress()
# Let default retry logic handle this
raise e from e

self._no_progress_exceeded = False

version_to_ack += processed_count
batch = batch[processed_count:]

with self._processor._waiting_cond:
self._processor._queue.ack(version_to_ack)
self._processor._last_ack = monotonic()
self._processor._lag_exceeded = False

for error in errors:
_logger.error(
Expand Down
24 changes: 20 additions & 4 deletions src/neptune/internal/operation_processors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
__all__ = ["get_operation_processor"]

import threading
from typing import (
Callable,
Optional,
)

from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
)
from neptune.types.mode import Mode

from .async_operation_processor import AsyncOperationProcessor
Expand All @@ -37,14 +45,22 @@ def get_operation_processor(
backend: NeptuneBackend,
lock: threading.RLock,
flush_period: float,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
) -> OperationProcessor:
if mode == Mode.ASYNC:
return AsyncOperationProcessor(
container_id,
container_type,
backend,
lock,
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
)
elif mode == Mode.SYNC:
return SyncOperationProcessor(container_id, container_type, backend)
Expand Down
6 changes: 6 additions & 0 deletions src/neptune/internal/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"is_string_like",
"is_stringify_value",
"verify_collection_type",
"verify_optional_callable",
"is_collection",
"base64_encode",
"base64_decode",
Expand Down Expand Up @@ -133,6 +134,11 @@ def verify_collection_type(var_name: str, var, expected_type: Union[type, tuple]
verify_type("elements of collection '{}'".format(var_name), value, expected_type)


def verify_optional_callable(var_name: str, var):
if var and not callable(var):
raise TypeError("{} must be a callable (was {})".format(var_name, type(var)))


def is_collection(var) -> bool:
return isinstance(var, (list, set, tuple))

Expand Down
18 changes: 16 additions & 2 deletions src/neptune/metadata_containers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__all__ = ["SupportsNamespaces"]
__all__ = ["SupportsNamespaces", "NeptuneObject", "NeptuneObjectCallback"]

from abc import (
ABC,
abstractmethod,
)
from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Callable,
Optional,
Union,
)

if TYPE_CHECKING:
from neptune.handler import Handler
Expand Down Expand Up @@ -64,3 +69,12 @@ def __delitem__(self, path) -> None:
@abstractmethod
def get_root_object(self) -> "SupportsNamespaces":
...


class NeptuneObject(SupportsNamespaces, ABC):
@abstractmethod
def stop(self, *, seconds: Optional[Union[float, int]] = None) -> None:
...


NeptuneObjectCallback = Callable[[NeptuneObject], None]