Skip to content

Commit

Permalink
Add processor id to ProcessorStopSignalData (#1625)
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksanderWWW committed Jan 30, 2024
1 parent f2e877f commit df946da
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Flag added for cleaning internal data ([#1589](https://github.com/neptune-ai/neptune-client/pull/1589))
- Handle logging in the `AsyncOperationProcessor` with `OperationLogger` and signal queue ([#1610](https://github.com/neptune-ai/neptune-client/pull/1610))
- Stringify `Handler` paths ([#1623](https://github.com/neptune-ai/neptune-client/pull/1623))
- Added processor id to `ProcessorStopSignalData` ([#1625](https://github.com/neptune-ai/neptune-client/pull/1625))


## 1.8.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ def _wait_for_queue_empty(
time_elapsed: float = 0.0
max_reconnect_wait_time: float = self.STOP_QUEUE_MAX_TIME_NO_CONNECTION_SECONDS if seconds is None else seconds
op_logger = ProcessorStopLogger(
signal_queue=signal_queue, logger=logger, should_print_logs=self._should_print_logs
processor_id=id(self),
signal_queue=signal_queue,
logger=logger,
should_print_logs=self._should_print_logs,
)
if initial_queue_size > 0:
if self._consumer.last_backoff_time > 0:
Expand Down
20 changes: 15 additions & 5 deletions src/neptune/internal/operation_processors/operation_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ProcessorStopSignalType(Enum):

@dataclass
class ProcessorStopSignalData:
processor_id: int = 0
size_remaining: int = 0
already_synced: int = 0
already_synced_proc: float = 0.0
Expand All @@ -85,10 +86,12 @@ class ProcessorStopSignal:
class ProcessorStopLogger:
def __init__(
self,
processor_id: int,
signal_queue: Optional["Queue[ProcessorStopSignal]"],
logger: logging.Logger,
should_print_logs: bool = True,
) -> None:
self._id = processor_id
self._signal_queue = signal_queue
self._logger = logger
self._should_print_logs = should_print_logs
Expand All @@ -98,7 +101,9 @@ def log_connection_interruption(self, max_reconnect_wait_time: float) -> None:
self._signal_queue.put(
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.CONNECTION_INTERRUPTED,
data=ProcessorStopSignalData(max_reconnect_wait_time=max_reconnect_wait_time),
data=ProcessorStopSignalData(
processor_id=self._id, max_reconnect_wait_time=max_reconnect_wait_time
),
)
)
else:
Expand All @@ -112,7 +117,7 @@ def log_remaining_operations(self, size_remaining: int) -> None:
self._signal_queue.put(
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.WAITING_FOR_OPERATIONS,
data=ProcessorStopSignalData(size_remaining=size_remaining),
data=ProcessorStopSignalData(processor_id=self._id, size_remaining=size_remaining),
)
)
else:
Expand All @@ -126,7 +131,8 @@ def log_success(self, ops_synced: int) -> None:
if self._signal_queue is not None:
self._signal_queue.put(
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.SUCCESS, data=ProcessorStopSignalData(already_synced=ops_synced)
signal_type=ProcessorStopSignalType.SUCCESS,
data=ProcessorStopSignalData(processor_id=self._id, already_synced=ops_synced),
)
)
else:
Expand All @@ -137,7 +143,8 @@ def log_sync_failure(self, seconds: float, size_remaining: int) -> None:
if self._signal_queue is not None:
self._signal_queue.put(
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.SYNC_FAILURE, data=ProcessorStopSignalData(seconds=seconds)
signal_type=ProcessorStopSignalType.SYNC_FAILURE,
data=ProcessorStopSignalData(processor_id=self._id, seconds=seconds),
)
)
else:
Expand All @@ -154,7 +161,9 @@ def log_reconnect_failure(self, max_reconnect_wait_time: float, size_remaining:
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.RECONNECT_FAILURE,
data=ProcessorStopSignalData(
max_reconnect_wait_time=max_reconnect_wait_time, size_remaining=size_remaining
processor_id=self._id,
max_reconnect_wait_time=max_reconnect_wait_time,
size_remaining=size_remaining,
),
)
)
Expand All @@ -172,6 +181,7 @@ def log_still_waiting(self, size_remaining: int, already_synced: int, already_sy
ProcessorStopSignal(
signal_type=ProcessorStopSignalType.STILL_WAITING,
data=ProcessorStopSignalData(
processor_id=self._id,
size_remaining=size_remaining,
already_synced=already_synced,
already_synced_proc=already_synced_proc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

class TestOperationLoggerNoQueue(unittest.TestCase):
def setUp(self):
self.logger = ProcessorStopLogger(signal_queue=None, logger=Mock())
self.logger = ProcessorStopLogger(processor_id=0, signal_queue=None, logger=Mock())

def test_log_connection_interruption(self):
self.logger.log_connection_interruption(10)
Expand Down Expand Up @@ -73,7 +73,7 @@ def test_log_still_waiting(self):

class TestOperationLoggerWithQueue(unittest.TestCase):
def setUp(self):
self.logger = ProcessorStopLogger(signal_queue=Mock(), logger=Mock())
self.logger = ProcessorStopLogger(processor_id=0, signal_queue=Mock(), logger=Mock())

def test_log_connection_interruption(self):
self.logger.log_connection_interruption(10)
Expand Down

0 comments on commit df946da

Please sign in to comment.