From 36d221f6b71f003877bb5fda1a00963a8f380966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Nie=C5=BCurawski?= Date: Fri, 16 Jul 2021 07:38:43 +0200 Subject: [PATCH] Fix race condition in disk queue (#626) --- CHANGELOG.md | 4 ++++ neptune/new/internal/containers/disk_queue.py | 11 +++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5570d0d5a..391e286db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ### Features - Added NEPTUNE_MONITORING_NAMEPSACE environment variable ([#623](https://github.com/neptune-ai/neptune-client/pull/623)) +### Fixes +- Use absolute path for operations queue([#624](https://github.com/neptune-ai/neptune-client/pull/624)) +- Fix race condition in operations queue([#626](https://github.com/neptune-ai/neptune-client/pull/626)) + ## neptune-client 0.10.1 ### Features diff --git a/neptune/new/internal/containers/disk_queue.py b/neptune/new/internal/containers/disk_queue.py index 30373174d..95f2c8f46 100644 --- a/neptune/new/internal/containers/disk_queue.py +++ b/neptune/new/internal/containers/disk_queue.py @@ -83,18 +83,20 @@ def put(self, obj: T) -> int: def get(self) -> Tuple[Optional[T], int]: if self._should_skip_to_ack: - self._should_skip_to_ack = False return self._skip_and_get() else: return self._get() def _skip_and_get(self) -> Tuple[Optional[T], int]: ack_version = self._last_ack_file.read_local() + ver = -1 while True: - obj, ver = self._get() + obj, next_ver = self._get() if obj is None: return None, ver + ver = next_ver if ver > ack_version: + self._should_skip_to_ack = False if ver > ack_version + 1: _logger.warning("Possible data loss. Last acknowledged operation version: %d, next: %d", ack_version, ver) @@ -104,7 +106,7 @@ def _get(self) -> Tuple[Optional[T], int]: _json = self._reader.get() if not _json: if self._read_file_version >= self._write_file_version: - return None, self._last_put_file.read_local() + return None, -1 self._reader.close() self._read_file_version = self._next_log_file_version(self._read_file_version) self._reader = JsonFileSplitter(self._get_log_file(self._read_file_version)) @@ -121,9 +123,10 @@ def get_batch(self, size: int) -> Tuple[List[T], int]: return [], ver ret = [first] for _ in range(0, size - 1): - obj, ver = self._get() + obj, next_ver = self._get() if not obj: break + ver = next_ver ret.append(obj) return ret, ver