-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for no synchronization callbacks #1478
Changes from 20 commits
9b111e2
3c642a3
e96eb9f
bf65d14
9a7ceb5
45543ba
887dff5
9d1155d
128159d
020f33a
f1b8afa
572ee61
df28866
4c4a2c1
7543e37
1df03d7
4bcdcb2
d63465f
aaa20b7
0d80cd0
b9709d9
8f95267
a5e69ea
cfcdce2
5961f7d
94881c6
69276c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -130,3 +130,6 @@ stream.bin | |
|
||
# zenml e2e | ||
.zen | ||
|
||
# mocks | ||
MagicMock/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
time, | ||
) | ||
from typing import ( | ||
Callable, | ||
List, | ||
Optional, | ||
) | ||
|
@@ -36,6 +37,11 @@ | |
from neptune.internal.container_type import ContainerType | ||
from neptune.internal.disk_queue import DiskQueue | ||
from neptune.internal.id_formats import UniqueId | ||
from neptune.internal.init.parameters import ( | ||
ASYNC_LAG_THRESHOLD, | ||
ASYNC_NO_PROGRESS_THRESHOLD, | ||
DEFAULT_STOP_TIMEOUT, | ||
) | ||
from neptune.internal.operation import Operation | ||
from neptune.internal.operation_processors.operation_processor import OperationProcessor | ||
from neptune.internal.operation_processors.operation_storage import ( | ||
|
@@ -50,7 +56,7 @@ | |
|
||
class AsyncOperationProcessor(OperationProcessor): | ||
STOP_QUEUE_STATUS_UPDATE_FREQ_SECONDS = 30 | ||
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, "300")) | ||
STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS = int(os.getenv(NEPTUNE_SYNC_AFTER_STOP_TIMEOUT, DEFAULT_STOP_TIMEOUT)) | ||
|
||
def __init__( | ||
self, | ||
|
@@ -60,6 +66,10 @@ def __init__( | |
lock: threading.RLock, | ||
sleep_time: float = 5, | ||
batch_size: int = 1000, | ||
async_lag_callback: Optional[Callable[[], None]] = None, | ||
async_lag_threshold: float = ASYNC_LAG_THRESHOLD, | ||
async_no_progress_callback: Optional[Callable[[], None]] = None, | ||
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, | ||
): | ||
self._operation_storage = OperationStorage(self._init_data_path(container_id, container_type)) | ||
|
||
|
@@ -74,9 +84,17 @@ def __init__( | |
self._container_type = container_type | ||
self._backend = backend | ||
self._batch_size = batch_size | ||
self._async_lag_callback = async_lag_callback or (lambda: None) | ||
self._async_lag_threshold = async_lag_threshold | ||
self._async_no_progress_callback = async_no_progress_callback or (lambda: None) | ||
self._async_no_progress_threshold = async_no_progress_threshold | ||
self._last_version = 0 | ||
self._consumed_version = 0 | ||
self._consumer = self.ConsumerThread(self, sleep_time, batch_size) | ||
self._lock = lock | ||
self._last_ack = None | ||
self._lag_exceeded = False | ||
self._should_call_no_progress_callback = False | ||
|
||
# Caller is responsible for taking this lock | ||
self._waiting_cond = threading.Condition(lock=lock) | ||
|
@@ -88,6 +106,9 @@ def _init_data_path(container_id: UniqueId, container_type: ContainerType) -> Pa | |
return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, process_path) | ||
|
||
def enqueue_operation(self, op: Operation, *, wait: bool) -> None: | ||
self._check_lag() | ||
self._check_no_progress() | ||
|
||
self._last_version = self._queue.put(op) | ||
if self._queue.size() > self._batch_size / 2: | ||
self._consumer.wake_up() | ||
|
@@ -107,6 +128,20 @@ def wait(self): | |
if not self._consumer.is_running(): | ||
raise NeptuneSynchronizationAlreadyStoppedException() | ||
|
||
def _check_lag(self): | ||
if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: | ||
with self._lock: | ||
if not self._lag_exceeded: | ||
self._async_no_progress_callback() | ||
self._lag_exceeded = True | ||
|
||
def _check_no_progress(self): | ||
if self._should_call_no_progress_callback: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we rewrite it slightly so that it's less indented?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, is it necessary to check this condition twice? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a common pattern, as lock acquiring is a heavy operation: https://en.wikipedia.org/wiki/Double-checked_locking There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done |
||
with self._lock: | ||
if self._should_call_no_progress_callback: | ||
self._async_no_progress_callback() | ||
self._should_call_no_progress_callback = False | ||
|
||
def flush(self): | ||
self._queue.flush() | ||
|
||
|
@@ -225,6 +260,7 @@ def __init__( | |
self._processor = processor | ||
self._batch_size = batch_size | ||
self._last_flush = 0 | ||
self._no_progress_exceeded = False | ||
|
||
def run(self): | ||
try: | ||
|
@@ -246,6 +282,12 @@ def work(self) -> None: | |
return | ||
self.process_batch([element.obj for element in batch], batch[-1].ver) | ||
|
||
def _check_no_progress(self): | ||
if not self._no_progress_exceeded: | ||
if monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold: | ||
self._no_progress_exceeded = True | ||
self._processor._should_call_no_progress_callback = True | ||
|
||
@Daemon.ConnectionRetryWrapper( | ||
kill_message=( | ||
"Killing Neptune asynchronous thread. All data is safe on disk and can be later" | ||
|
@@ -257,16 +299,27 @@ def process_batch(self, batch: List[Operation], version: int) -> None: | |
version_to_ack = version - expected_count | ||
while True: | ||
# TODO: Handle Metadata errors | ||
processed_count, errors = self._processor._backend.execute_operations( | ||
container_id=self._processor._container_id, | ||
container_type=self._processor._container_type, | ||
operations=batch, | ||
operation_storage=self._processor._operation_storage, | ||
) | ||
try: | ||
processed_count, errors = self._processor._backend.execute_operations( | ||
container_id=self._processor._container_id, | ||
container_type=self._processor._container_type, | ||
operations=batch, | ||
operation_storage=self._processor._operation_storage, | ||
) | ||
except Exception as e: | ||
self._check_no_progress() | ||
# Let default retry logic handle this | ||
raise e from e | ||
|
||
self._no_progress_exceeded = False | ||
|
||
version_to_ack += processed_count | ||
batch = batch[processed_count:] | ||
|
||
with self._processor._waiting_cond: | ||
self._processor._queue.ack(version_to_ack) | ||
self._processor._last_ack = monotonic() | ||
self._processor._lag_exceeded = False | ||
|
||
for error in errors: | ||
_logger.error( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here a de-dent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you wish 😉