From 2af9207723e33c8e626f21212eb14d123a9271bb Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 25 May 2026 00:43:35 +0300 Subject: [PATCH] refactor(watcher): extract _WatchedFieldBase to consolidate sync and async field logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two WatchedField classes shared ~90% of their implementation but had no common base, causing drift (the sync version acquired a lock in _update but then read _value outside it for the callback; the async version had no lock at all). Extract _WatchedFieldBase with _apply_raw and _fire_callbacks helpers. Both subclasses call super().__init__, inherit path/on_change/__bool__, and delegate value-mutation to the base—with or without a lock as appropriate. The three _RECONNECT_* constants are also consolidated in the new module. Closes #60 Co-Authored-By: Claude --- sdk/src/opendecree/_watcher_base.py | 66 ++++++++++++++++++++++++++++ sdk/src/opendecree/async_watcher.py | 67 +++++++---------------------- sdk/src/opendecree/watcher.py | 66 ++++++---------------------- 3 files changed, 95 insertions(+), 104 deletions(-) create mode 100644 sdk/src/opendecree/_watcher_base.py diff --git a/sdk/src/opendecree/_watcher_base.py b/sdk/src/opendecree/_watcher_base.py new file mode 100644 index 0000000..eee17c4 --- /dev/null +++ b/sdk/src/opendecree/_watcher_base.py @@ -0,0 +1,66 @@ +"""Shared base class for WatchedField and AsyncWatchedField.""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import Generic, TypeVar + +from opendecree._convert import convert_value + +T = TypeVar("T") + +_logger = logging.getLogger("opendecree.watcher") + +_RECONNECT_INITIAL = 1.0 +_RECONNECT_MAX = 30.0 +_RECONNECT_MULTIPLIER = 2.0 + + +class _WatchedFieldBase(Generic[T]): + """Common state and helpers shared by WatchedField and AsyncWatchedField.""" + + def __init__(self, path: str, type_: type[T], default: T) -> None: + self._path = path + self._type = type_ + self._default = default + self._value: T = default + self._is_set = False + self._callbacks: list[Callable[[T, T], None]] = [] + + @property + def path(self) -> str: + """The field path this value tracks.""" + return self._path + + def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]: + """Register a callback for value changes. Can be used as a decorator. + + The callback receives (old_value, new_value). + """ + self._callbacks.append(fn) + return fn + + def __bool__(self) -> bool: + """Truthy based on the current value. False for False, 0, '', None.""" + return bool(self._value) + + def _apply_raw(self, raw_value: str | None) -> tuple[T, T]: + """Set _value/_is_set from a raw string. Returns (old, new). Caller must lock if needed.""" + old = self._value + if raw_value is not None: + self._value = convert_value(raw_value, self._type) # type: ignore[assignment] + self._is_set = True + else: + self._value = self._default + self._is_set = False + return old, self._value + + def _fire_callbacks(self, old: T, new: T) -> None: + """Invoke registered callbacks when the value changes.""" + if old != new: + for cb in self._callbacks: + try: + cb(old, new) + except Exception: + _logger.exception("Error in on_change callback for %s", self._path) diff --git a/sdk/src/opendecree/async_watcher.py b/sdk/src/opendecree/async_watcher.py index 65decaf..09bc2b3 100644 --- a/sdk/src/opendecree/async_watcher.py +++ b/sdk/src/opendecree/async_watcher.py @@ -21,66 +21,44 @@ import asyncio import logging import random -from collections.abc import AsyncIterator, Callable -from typing import Any, Generic, TypeVar +from collections.abc import AsyncIterator +from typing import Any, TypeVar import grpc.aio -from opendecree._convert import convert_value, typed_value_to_string +from opendecree._convert import typed_value_to_string from opendecree._stubs import process_get_all_response +from opendecree._watcher_base import ( + _RECONNECT_INITIAL, + _RECONNECT_MAX, + _RECONNECT_MULTIPLIER, + _WatchedFieldBase, +) from opendecree.types import Change logger = logging.getLogger("opendecree.async_watcher") T = TypeVar("T") -# Default reconnect backoff parameters. -_RECONNECT_INITIAL = 1.0 -_RECONNECT_MAX = 30.0 -_RECONNECT_MULTIPLIER = 2.0 - -class AsyncWatchedField(Generic[T]): - """A live, thread-safe configuration field with a typed value (async variant). +class AsyncWatchedField(_WatchedFieldBase[T]): + """A live configuration field with a typed value (async variant). Updated automatically by the watcher's asyncio task. """ def __init__(self, path: str, type_: type[T], default: T) -> None: - self._path = path - self._type = type_ - self._default = default - self._value: T = default - self._is_set = False - self._callbacks: list[Callable[[T, T], None]] = [] + super().__init__(path, type_, default) self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue() - @property - def path(self) -> str: - """The field path this value tracks.""" - return self._path - @property def value(self) -> T: """The current value — always fresh.""" return self._value - def __bool__(self) -> bool: - """Truthy based on the current value. False for False, 0, '', None.""" - return bool(self._value) - def __repr__(self) -> str: return f"AsyncWatchedField({self._path!r}, value={self._value!r})" - def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]: - """Register a callback for value changes. Can be used as a decorator. - - The callback receives (old_value, new_value) and is called from the - watcher's asyncio task. - """ - self._callbacks.append(fn) - return fn - async def changes(self) -> AsyncIterator[Change]: """Async iterator that yields Change events for this field. @@ -94,28 +72,13 @@ async def changes(self) -> AsyncIterator[Change]: def _update(self, raw_value: str | None, change: Change) -> None: """Update the field value from a raw string. Called by the watcher task.""" - old = self._value - if raw_value is not None: - self._value = convert_value(raw_value, self._type) # type: ignore[assignment] - self._is_set = True - else: - self._value = self._default - self._is_set = False - - new = self._value - if old != new: - for cb in self._callbacks: - try: - cb(old, new) - except Exception: - logger.exception("Error in on_change callback for %s", self._path) - + old, new = self._apply_raw(raw_value) + self._fire_callbacks(old, new) self._change_queue.put_nowait(change) def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. No callbacks fired.""" - self._value = convert_value(raw_value, self._type) # type: ignore[assignment] - self._is_set = True + self._apply_raw(raw_value) def _stop(self) -> None: """Signal the changes() iterator to stop.""" diff --git a/sdk/src/opendecree/watcher.py b/sdk/src/opendecree/watcher.py index 7c9b04f..f7c2cc2 100644 --- a/sdk/src/opendecree/watcher.py +++ b/sdk/src/opendecree/watcher.py @@ -20,68 +20,46 @@ import random import threading import time -from collections.abc import Callable, Iterator -from typing import Any, Generic, TypeVar +from collections.abc import Iterator +from typing import Any, TypeVar import grpc -from opendecree._convert import convert_value, typed_value_to_string +from opendecree._convert import typed_value_to_string from opendecree._stubs import process_get_all_response +from opendecree._watcher_base import ( + _RECONNECT_INITIAL, + _RECONNECT_MAX, + _RECONNECT_MULTIPLIER, + _WatchedFieldBase, +) from opendecree.types import Change logger = logging.getLogger("opendecree.watcher") T = TypeVar("T") -# Default reconnect backoff parameters. -_RECONNECT_INITIAL = 1.0 -_RECONNECT_MAX = 30.0 -_RECONNECT_MULTIPLIER = 2.0 - -class WatchedField(Generic[T]): +class WatchedField(_WatchedFieldBase[T]): """A live, thread-safe configuration field with a typed value. Attributes are updated automatically by the watcher's background thread. """ def __init__(self, path: str, type_: type[T], default: T) -> None: - self._path = path - self._type = type_ - self._default = default - self._value: T = default - self._is_set = False + super().__init__(path, type_, default) self._lock = threading.Lock() - self._callbacks: list[Callable[[T, T], None]] = [] self._change_queue: queue.Queue[Change] = queue.Queue() - @property - def path(self) -> str: - """The field path this value tracks.""" - return self._path - @property def value(self) -> T: """The current value — always fresh, thread-safe.""" with self._lock: return self._value - def __bool__(self) -> bool: - """Truthy based on the current value. False for False, 0, '', None.""" - return bool(self.value) - def __repr__(self) -> str: return f"WatchedField({self._path!r}, value={self.value!r})" - def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]: - """Register a callback for value changes. Can be used as a decorator. - - The callback receives (old_value, new_value) and is called from the - watcher's background thread. - """ - self._callbacks.append(fn) - return fn - def changes(self) -> Iterator[Change]: """Blocking iterator that yields Change events for this field. @@ -100,30 +78,14 @@ def changes(self) -> Iterator[Change]: def _update(self, raw_value: str | None, change: Change) -> None: """Update the field value from a raw string. Called by the watcher thread.""" with self._lock: - old = self._value - if raw_value is not None: - self._value = convert_value(raw_value, self._type) # type: ignore[assignment] - self._is_set = True - else: - self._value = self._default - self._is_set = False - - # Notify callbacks (outside the lock to avoid deadlocks). - new = self._value - if old != new: - for cb in self._callbacks: - try: - cb(old, new) - except Exception: - logger.exception("Error in on_change callback for %s", self._path) - + old, new = self._apply_raw(raw_value) + self._fire_callbacks(old, new) self._change_queue.put(change) def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. No callbacks fired.""" with self._lock: - self._value = convert_value(raw_value, self._type) # type: ignore[assignment] - self._is_set = True + self._apply_raw(raw_value) def _stop(self) -> None: """Signal the changes() iterator to stop."""