From 9c3cd49ced5a2fd6219175cbdeeaa4cdf170a21c Mon Sep 17 00:00:00 2001
From: Matthew Keeler
Date: Tue, 28 Oct 2025 12:02:47 -0400
Subject: [PATCH 1/2] chore: Add FDv2-compatible contract test support

---
 contract-tests/client_entity.py                  | 71 ++++++++++++-
 ldclient/impl/datasourcev2/polling.py            | 11 ++-
 ldclient/impl/datasourcev2/streaming.py          | 87 +++++-----
 ldclient/impl/datasystem/__init__.py             |  8 ++
 ldclient/impl/datasystem/config.py               | 25 ++---
 ldclient/impl/datasystem/fdv2.py                 | 28 ++++++
 ldclient/impl/datasystem/store.py                | 99 ++++++++++++++++++-
 .../test_streaming_synchronizer.py               | 48 ++++++---
 8 files changed, 285 insertions(+), 92 deletions(-)

diff --git a/contract-tests/client_entity.py b/contract-tests/client_entity.py
index c0030adb..6b627851 100644
--- a/contract-tests/client_entity.py
+++ b/contract-tests/client_entity.py
@@ -15,6 +15,12 @@
     Stage
 )
 from ldclient.config import BigSegmentsConfig
+from ldclient.impl.datasourcev2.polling import PollingDataSourceBuilder
+from ldclient.impl.datasystem.config import (
+    custom,
+    polling_ds_builder,
+    streaming_ds_builder
+)
 
 
 class ClientEntity:
@@ -29,7 +35,68 @@ def __init__(self, tag, config):
             'version': tags.get('applicationVersion', ''),
         }
 
-        if config.get("streaming") is not None:
+        datasystem_config = config.get('dataSystem')
+        if datasystem_config is not None:
+            datasystem = custom()
+
+            init_configs = datasystem_config.get('initializers')
+            if init_configs is not None:
+                initializers = []
+                for init_config in init_configs:
+                    polling = init_config.get('polling')
+                    if polling is not None:
+                        if polling.get("baseUri") is not None:
+                            opts["base_uri"] = polling["baseUri"]
+                        _set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
+                        initializers.append(polling_ds_builder())
+
+                datasystem.initializers(initializers)
+
+            sync_config = datasystem_config.get('synchronizers')
+            if sync_config is not None:
+                primary = sync_config.get('primary')
+                secondary = sync_config.get('secondary')
+
+                primary_builder = None
+                secondary_builder = None
+
+                if primary is not None:
+                    streaming = primary.get('streaming')
+                    if streaming is not None:
+                        if streaming.get("baseUri") is not None:
+                            opts["stream_uri"] = streaming["baseUri"]
+                        _set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
+                        primary_builder = streaming_ds_builder()
+                    elif primary.get('polling') is not None:
+                        polling = primary.get('polling')
+                        if polling.get("baseUri") is not None:
+                            opts["base_uri"] = polling["baseUri"]
+                        _set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
+                        primary_builder = polling_ds_builder()
+
+                if secondary is not None:
+                    streaming = secondary.get('streaming')
+                    if streaming is not None:
+                        if streaming.get("baseUri") is not None:
+                            opts["stream_uri"] = streaming["baseUri"]
+                        _set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
+                        secondary_builder = streaming_ds_builder()
+                    elif secondary.get('polling') is not None:
+                        polling = secondary.get('polling')
+                        if polling.get("baseUri") is not None:
+                            opts["base_uri"] = polling["baseUri"]
+                        _set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
+                        secondary_builder = polling_ds_builder()
+
+                if primary_builder is not None:
+                    datasystem.synchronizers(primary_builder, secondary_builder)
+
+            if datasystem_config.get("payloadFilter") is not None:
+                opts["payload_filter_key"] = datasystem_config["payloadFilter"]
+
+            opts["datasystem_config"] = datasystem.build()
+
+        elif config.get("streaming") is not None:
             streaming = config["streaming"]
             if streaming.get("baseUri") is not None:
                 opts["stream_uri"] = streaming["baseUri"]
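The dataSystem branch above reduces to a small amount of builder plumbing. A minimal sketch of the same assembly outside the contract test, using the `custom`, `polling_ds_builder`, and `streaming_ds_builder` helpers this patch exposes (the second argument to `synchronizers` is the optional secondary source; the final `opts` key mirrors what the test harness passes through):

```python
from ldclient.impl.datasystem.config import (
    custom,
    polling_ds_builder,
    streaming_ds_builder
)

# Polling initializer for the first payload, then streaming as the primary
# synchronizer with polling as the secondary fallback.
datasystem = custom()
datasystem.initializers([polling_ds_builder()])
datasystem.synchronizers(streaming_ds_builder(), polling_ds_builder())
data_system_config = datasystem.build()  # handed to the SDK as opts["datasystem_config"]
```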
diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py
index c77ff8b4..8a350c82 100644
--- a/ldclient/impl/datasourcev2/polling.py
+++ b/ldclient/impl/datasourcev2/polling.py
@@ -86,6 +86,7 @@ def __init__(
         self._requester = requester
         self._poll_interval = poll_interval
         self._event = Event()
+        self._stop = Event()
         self._task = RepeatingTask(
             "ldclient.datasource.polling", poll_interval, 0, self._poll
         )
@@ -108,7 +109,8 @@ def sync(self) -> Generator[Update, None, None]:
         occurs.
         """
         log.info("Starting PollingDataSourceV2 synchronizer")
-        while True:
+        self._stop.clear()
+        while not self._stop.is_set():
             result = self._requester.fetch(None)
             if isinstance(result, _Fail):
                 if isinstance(result.exception, UnsuccessfulResponseException):
@@ -161,6 +163,13 @@ def sync(self) -> Generator[Update, None, None]:
             if self._event.wait(self._poll_interval):
                 break
 
+    def stop(self):
+        """Stops the synchronizer."""
+        log.info("Stopping PollingDataSourceV2 synchronizer")
+        self._event.set()
+        self._task.stop()
+        self._stop.set()
+
     def _poll(self) -> BasisResult:
         try:
             # TODO(fdv2): Need to pass the selector through
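The stop/sync pairing above is the standard cooperative-shutdown idiom: one `Event` guards the loop while `Event.wait` doubles as an interruptible sleep. A standalone sketch of the pattern (illustrative, not SDK code):

```python
from threading import Event, Thread

stop = Event()

def poll_loop(interval: float) -> None:
    while not stop.is_set():
        # ... fetch and process one poll result here ...
        if stop.wait(interval):  # returns True as soon as stop.set() runs
            break

worker = Thread(target=poll_loop, args=(0.1,))
worker.start()
stop.set()    # what stop() does: wakes the wait and ends the loop promptly
worker.join()
```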
""" - def __init__( - self, config: Config, sse_client_builder: SseClientBuilder = create_sse_client - ): - self._sse_client_builder = sse_client_builder - self._uri = config.stream_base_uri + STREAMING_ENDPOINT - if config.payload_filter_key is not None: - self._uri += "?%s" % parse.urlencode({"filter": config.payload_filter_key}) + def __init__(self, config: Config): + self._sse_client_builder = create_sse_client self._config = config self._sse: Optional[SSEClient] = None + self._running = False @property def name(self) -> str: @@ -142,13 +124,13 @@ def sync(self) -> Generator[Update, None, None]: Update objects until the connection is closed or an unrecoverable error occurs. """ - log.info("Starting StreamingUpdateProcessor connecting to uri: %s", self._uri) self._sse = self._sse_client_builder(self._config) if self._sse is None: log.error("Failed to create SSE client for streaming updates.") return change_set_builder = ChangeSetBuilder() + self._running = True for action in self._sse.all: if isinstance(action, Fault): @@ -177,8 +159,7 @@ def sync(self) -> Generator[Update, None, None]: log.info( "Error while handling stream event; will restart stream: %s", e ) - # TODO(sdk-1409) - # self._sse.interrupt() + self._sse.interrupt() (update, should_continue) = self._handle_error(e) if update is not None: @@ -189,8 +170,7 @@ def sync(self) -> Generator[Update, None, None]: log.info( "Error while handling stream event; will restart stream: %s", e ) - # TODO(sdk-1409) - # self._sse.interrupt() + self._sse.interrupt() yield Update( state=DataSourceState.INTERRUPTED, @@ -210,27 +190,16 @@ def sync(self) -> Generator[Update, None, None]: # DataSourceState.VALID, None # ) - # if not self._ready.is_set(): - # log.info("StreamingUpdateProcessor initialized ok.") - # self._ready.set() - - # TODO(sdk-1409) - # self._sse.close() - - # TODO(sdk-1409) - # def stop(self): - # self.__stop_with_error_info(None) - # - # def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): - # log.info("Stopping StreamingUpdateProcessor") - # self._running = False - # if self._sse: - # self._sse.close() - # - # if self._data_source_update_sink is None: - # return - # - # self._data_source_update_sink.update_status(DataSourceState.OFF, error) + self._sse.close() + + def stop(self): + """ + Stops the streaming synchronizer, closing any open connections. + """ + log.info("Stopping StreamingUpdateProcessor") + self._running = False + if self._sse: + self._sse.close() # pylint: disable=too-many-return-statements def _process_message( @@ -317,8 +286,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: If an update is provided, it should be forward upstream, regardless of whether or not we are going to retry this failure. 
""" - # if not self._running: - # return (False, None) # don't retry if we've been deliberately stopped + if not self._running: + return (None, False) # don't retry if we've been deliberately stopped update: Optional[Update] = None @@ -362,10 +331,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: if not is_recoverable: log.error(http_error_message_result) - # TODO(sdk-1409) - # self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited - # self.__stop_with_error_info(error_info) - # self.stop() + self.stop() return (update, False) log.warning(http_error_message_result) @@ -391,8 +357,7 @@ def __enter__(self): return self def __exit__(self, type, value, traceback): - # self.stop() - pass + self.stop() class StreamingDataSourceBuilder: # disable: pylint: disable=too-few-public-methods diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py index 15b9e8f0..cc6fbba5 100644 --- a/ldclient/impl/datasystem/__init__.py +++ b/ldclient/impl/datasystem/__init__.py @@ -212,3 +212,11 @@ def sync(self) -> Generator[Update, None, None]: occurs. """ raise NotImplementedError + + @abstractmethod + def stop(self): + """ + stop should halt the synchronization process, causing the sync method + to exit as soon as possible. + """ + raise NotImplementedError diff --git a/ldclient/impl/datasystem/config.py b/ldclient/impl/datasystem/config.py index c02ba952..b179ff9f 100644 --- a/ldclient/impl/datasystem/config.py +++ b/ldclient/impl/datasystem/config.py @@ -28,11 +28,13 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods Builder for the data system configuration. """ - _initializers: Optional[List[Builder[Initializer]]] = None - _primary_synchronizer: Optional[Builder[Synchronizer]] = None - _secondary_synchronizer: Optional[Builder[Synchronizer]] = None - _store_mode: DataStoreMode = DataStoreMode.READ_ONLY - _data_store: Optional[FeatureStore] = None + def __init__(self) -> None: + self._initializers: Optional[List[Builder[Initializer]]] = None + self._primary_synchronizer: Optional[Builder[Synchronizer]] = None + self._secondary_synchronizer: Optional[Builder[Synchronizer]] = None + self._fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None + self._store_mode: DataStoreMode = DataStoreMode.READ_ONLY + self._data_store: Optional[FeatureStore] = None def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder": """ @@ -72,12 +74,13 @@ def build(self) -> DataSystemConfig: initializers=self._initializers, primary_synchronizer=self._primary_synchronizer, secondary_synchronizer=self._secondary_synchronizer, + fdv1_fallback_synchronizer=self._fdv1_fallback_synchronizer, data_store_mode=self._store_mode, data_store=self._data_store, ) -def __polling_ds_builder() -> Builder[PollingDataSource]: +def polling_ds_builder() -> Builder[PollingDataSource]: def builder(config: LDConfig) -> PollingDataSource: requester = Urllib3PollingRequester(config) polling_ds = PollingDataSourceBuilder(config) @@ -88,7 +91,7 @@ def builder(config: LDConfig) -> PollingDataSource: return builder -def __streaming_ds_builder() -> Builder[StreamingDataSource]: +def streaming_ds_builder() -> Builder[StreamingDataSource]: def builder(config: LDConfig) -> StreamingDataSource: return StreamingDataSourceBuilder(config).build() @@ -109,8 +112,8 @@ def default() -> ConfigBuilder: for updates. 
""" - polling_builder = __polling_ds_builder() - streaming_builder = __streaming_ds_builder() + polling_builder = polling_ds_builder() + streaming_builder = streaming_ds_builder() builder = ConfigBuilder() builder.initializers([polling_builder]) @@ -126,7 +129,7 @@ def streaming() -> ConfigBuilder: with no additional latency. """ - streaming_builder = __streaming_ds_builder() + streaming_builder = streaming_ds_builder() builder = ConfigBuilder() builder.synchronizers(streaming_builder) @@ -141,7 +144,7 @@ def polling() -> ConfigBuilder: streaming, but may be necessary in some network environments. """ - polling_builder: Builder[Synchronizer] = __polling_ds_builder() + polling_builder: Builder[Synchronizer] = polling_ds_builder() builder = ConfigBuilder() builder.synchronizers(polling_builder) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index e41386e3..01824203 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -205,6 +205,8 @@ def __init__( # Threading self._stop_event = Event() + self._lock = ReadWriteLock() + self._active_synchronizer: Optional[Synchronizer] = None self._threads: List[Thread] = [] # Track configuration @@ -240,10 +242,20 @@ def stop(self): """Stop the FDv2 data system and all associated threads.""" self._stop_event.set() + self._lock.lock() + if self._active_synchronizer is not None: + try: + self._active_synchronizer.stop() + except Exception as e: + log.error("Error stopping active data source: %s", e) + self._lock.unlock() + # Wait for all threads to complete for thread in self._threads: if thread.is_alive(): thread.join(timeout=5.0) # 5 second timeout + if thread.is_alive(): + log.warning("Thread %s did not terminate in time", thread.name) # Close the store self._store.close() @@ -319,7 +331,11 @@ def synchronizer_loop(self: 'FDv2'): while not self._stop_event.is_set() and self._primary_synchronizer_builder is not None: # Try primary synchronizer try: + self._lock.lock() primary_sync = self._primary_synchronizer_builder(self._config) + self._active_synchronizer = primary_sync + self._lock.unlock() + log.info("Primary synchronizer %s is starting", primary_sync.name) remove_sync, fallback_v1 = self._consume_synchronizer_results( @@ -345,9 +361,14 @@ def synchronizer_loop(self: 'FDv2'): if self._secondary_synchronizer_builder is None: continue + if self._stop_event.is_set(): + break + self._lock.lock() secondary_sync = self._secondary_synchronizer_builder(self._config) log.info("Secondary synchronizer %s is starting", secondary_sync.name) + self._active_synchronizer = secondary_sync + self._lock.unlock() remove_sync, fallback_v1 = self._consume_synchronizer_results( secondary_sync, set_on_ready, self._recovery_condition @@ -378,6 +399,11 @@ def synchronizer_loop(self: 'FDv2'): # Ensure we always set the ready event when exiting if not set_on_ready.is_set(): set_on_ready.set() + self._lock.lock() + if self._active_synchronizer is not None: + self._active_synchronizer.stop() + self._active_synchronizer = None + self._lock.unlock() sync_thread = Thread( target=synchronizer_loop, @@ -428,6 +454,8 @@ def _consume_synchronizer_results( except Exception as e: log.error("Error consuming synchronizer results: %s", e) return True, False + finally: + synchronizer.stop() return True, False diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 94f015e7..9686c193 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -7,9 +7,9 @@ """ import 
diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py
index 94f015e7..9686c193 100644
--- a/ldclient/impl/datasystem/store.py
+++ b/ldclient/impl/datasystem/store.py
@@ -7,9 +7,9 @@
 """
 import threading
 
-from typing import Dict, List, Mapping, Optional, Set
+from collections import defaultdict
+from typing import Any, Callable, Dict, List, Mapping, Optional, Set
 
-from ldclient.feature_store import InMemoryFeatureStore
 from ldclient.impl.datasystem.protocolv2 import (
     Change,
     ChangeSet,
@@ -20,15 +20,110 @@
 )
 from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey
 from ldclient.impl.listeners import Listeners
+from ldclient.impl.rwlock import ReadWriteLock
 from ldclient.impl.util import log
 from ldclient.interfaces import (
     DataStoreStatusProvider,
+    DiagnosticDescription,
     FeatureStore,
     FlagChange
 )
 from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind
 
 
+class InMemoryFeatureStore(FeatureStore, DiagnosticDescription):
+    """The default feature store implementation, which holds all data in a thread-safe data structure in memory."""
+
+    def __init__(self):
+        """Constructs an instance of InMemoryFeatureStore."""
+        self._lock = ReadWriteLock()
+        self._initialized = False
+        self._items = defaultdict(dict)
+
+    def is_monitoring_enabled(self) -> bool:
+        return False
+
+    def is_available(self) -> bool:
+        return True
+
+    def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any:
+        """ """
+        try:
+            self._lock.rlock()
+            items_of_kind = self._items[kind]
+            item = items_of_kind.get(key)
+            if item is None:
+                log.debug("Attempted to get missing key %s in '%s', returning None", key, kind.namespace)
+                return callback(None)
+            if 'deleted' in item and item['deleted']:
+                log.debug("Attempted to get deleted key %s in '%s', returning None", key, kind.namespace)
+                return callback(None)
+            return callback(item)
+        finally:
+            self._lock.runlock()
+
+    def all(self, kind, callback):
+        """ """
+        try:
+            self._lock.rlock()
+            items_of_kind = self._items[kind]
+            return callback(dict((k, i) for k, i in items_of_kind.items() if ('deleted' not in i) or not i['deleted']))
+        finally:
+            self._lock.runlock()
+
+    def init(self, all_data):
+        """ """
+        all_decoded = {}
+        for kind, items in all_data.items():
+            items_decoded = {}
+            for key, item in items.items():
+                items_decoded[key] = kind.decode(item)
+            all_decoded[kind] = items_decoded
+        try:
+            self._lock.rlock()
+            self._items.clear()
+            self._items.update(all_decoded)
+            self._initialized = True
+            for k in all_data:
+                log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k]))
+        finally:
+            self._lock.runlock()
+
+    # noinspection PyShadowingNames
+    def delete(self, kind, key: str, version: int):
+        """ """
+        try:
+            self._lock.rlock()
+            items_of_kind = self._items[kind]
+            items_of_kind[key] = {'deleted': True, 'version': version}
+        finally:
+            self._lock.runlock()
+
+    def upsert(self, kind, item):
+        """ """
+        decoded_item = kind.decode(item)
+        key = item['key']
+        try:
+            self._lock.rlock()
+            items_of_kind = self._items[kind]
+            items_of_kind[key] = decoded_item
+            log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version'])
+        finally:
+            self._lock.runlock()
+
+    @property
+    def initialized(self) -> bool:
+        """ """
+        try:
+            self._lock.rlock()
+            return self._initialized
+        finally:
+            self._lock.runlock()
+
+    def describe_configuration(self, config):
+        return 'memory'
+
+
 class Store:
     """
     Store is a dual-mode persistent/in-memory store that serves requests for data from the evaluation
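Note that the in-memory store above records deletions as tombstones rather than removing keys; `get` and `all` then filter them out, so a deleted item reads the same as a missing one. A stripped-down view of that read logic:

```python
items = {
    "flag-a": {"key": "flag-a", "version": 3},
    "flag-b": {"deleted": True, "version": 4},  # tombstone left by delete()
}

def visible(item):
    return item is not None and not item.get("deleted", False)

assert visible(items.get("flag-a"))
assert not visible(items.get("flag-b"))  # tombstone reads as missing
assert not visible(items.get("flag-c"))  # absent key reads the same way
```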
diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py
index 8aa66bbb..d78aac6c 100644
--- a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py
+++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py
@@ -51,6 +51,12 @@ def __init__(
     def all(self) -> Iterable[Action]:
         return self._events
 
+    def interrupt(self):
+        pass
+
+    def close(self):
+        pass
+
 
 class HttpExceptionThrowingSseClient:
     def __init__(self, status_codes: List[int]):  # pylint: disable=redefined-outer-name
@@ -74,16 +80,16 @@ class UnknownTypeOfEvent(Action):
         pass
 
     unknown_named_event = Event(event="Unknown")
-    builder = list_sse_client([UnknownTypeOfEvent(), unknown_named_event])
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = list_sse_client([UnknownTypeOfEvent(), unknown_named_event])
 
     assert len(list(synchronizer.sync())) == 0
 
 
 def test_ignores_faults_without_errors():
     errorless_fault = Fault(error=None)
-    builder = list_sse_client([errorless_fault])
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = list_sse_client([errorless_fault])
 
     assert len(list(synchronizer.sync())) == 0
@@ -160,9 +166,9 @@ def test_handles_no_changes():
         event=EventName.SERVER_INTENT,
         data=json.dumps(server_intent.to_dict()),
     )
-    builder = list_sse_client([intent_event])
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = list_sse_client([intent_event])
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -181,7 +187,8 @@ def test_handles_empty_changeset(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -207,7 +214,8 @@ def test_handles_put_objects(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -238,7 +246,8 @@ def test_handles_delete_objects(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -268,7 +277,8 @@ def test_swallows_goodbye(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -294,7 +304,8 @@ def test_swallows_heartbeat(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -322,7 +333,8 @@ def test_error_resets(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -345,7 +357,8 @@ def test_handles_out_of_order(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -375,7 +388,8 @@ def test_invalid_json_decoding(events):  # pylint: disable=redefined-outer-name
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 2
@@ -407,7 +421,8 @@ def test_stops_on_unrecoverable_status_code(
         ]
     )
 
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 1
@@ -436,7 +451,8 @@ def test_continues_on_recoverable_status_code(
         events[EventName.PAYLOAD_TRANSFERRED],
         ]
     )
-    synchronizer = StreamingDataSource(Config(sdk_key="key"), builder)
+    synchronizer = StreamingDataSource(Config(sdk_key="key"))
+    synchronizer._sse_client_builder = builder
 
     updates = list(synchronizer.sync())
     assert len(updates) == 3
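The tests above all rely on one seam: `StreamingDataSource` obtains its SSE client through `_sse_client_builder`, so a test can overwrite that attribute with a function returning a canned-event stub. A sketch of the shape such a stub must satisfy (`StubSseClient` is a stand-in name; the real helpers live in this test module):

```python
class StubSseClient:
    def __init__(self, actions):
        self._actions = actions

    @property
    def all(self):
        return self._actions  # the actions sync() will iterate over

    def interrupt(self):
        pass  # no real connection to restart

    def close(self):
        pass  # no real connection to close

def stub_builder(actions):
    return lambda config: StubSseClient(actions)
```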
From e87daa0ca4826d9960e8893a28d67c333fb77523 Mon Sep 17 00:00:00 2001
From: Matthew Keeler
Date: Tue, 4 Nov 2025 08:32:30 -0500
Subject: [PATCH 2/2] Fix invalid lock usage

---
 ldclient/impl/datasystem/store.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py
index 9686c193..dabd5d29 100644
--- a/ldclient/impl/datasystem/store.py
+++ b/ldclient/impl/datasystem/store.py
@@ -80,36 +80,36 @@ def init(self, all_data):
                 items_decoded[key] = kind.decode(item)
             all_decoded[kind] = items_decoded
         try:
-            self._lock.rlock()
+            self._lock.lock()
             self._items.clear()
             self._items.update(all_decoded)
             self._initialized = True
             for k in all_data:
                 log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k]))
         finally:
-            self._lock.runlock()
+            self._lock.unlock()
 
     # noinspection PyShadowingNames
     def delete(self, kind, key: str, version: int):
         """ """
         try:
-            self._lock.rlock()
+            self._lock.lock()
             items_of_kind = self._items[kind]
             items_of_kind[key] = {'deleted': True, 'version': version}
         finally:
-            self._lock.runlock()
+            self._lock.unlock()
 
     def upsert(self, kind, item):
         """ """
         decoded_item = kind.decode(item)
         key = item['key']
         try:
-            self._lock.rlock()
+            self._lock.lock()
             items_of_kind = self._items[kind]
             items_of_kind[key] = decoded_item
             log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version'])
         finally:
-            self._lock.runlock()
+            self._lock.unlock()
 
     @property
     def initialized(self) -> bool:
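Patch 2's point, in isolation: the mutating methods had been taking the shared read lock, so `init`, `delete`, and `upsert` could run concurrently with readers. The corrected discipline pairs `rlock`/`runlock` around reads and `lock`/`unlock` around writes (a sketch against the same `ReadWriteLock` used here):

```python
from ldclient.impl.rwlock import ReadWriteLock

_lock = ReadWriteLock()
_data = {}

def read(key):
    _lock.rlock()  # shared: many readers may hold it concurrently
    try:
        return _data.get(key)
    finally:
        _lock.runlock()

def write(key, value):
    _lock.lock()  # exclusive: blocks readers and other writers
    try:
        _data[key] = value
    finally:
        _lock.unlock()
```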