Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion contract-tests/client_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
Stage
)
from ldclient.config import BigSegmentsConfig
from ldclient.impl.datasourcev2.polling import PollingDataSourceBuilder
from ldclient.impl.datasystem.config import (
custom,
polling_ds_builder,
streaming_ds_builder
)


class ClientEntity:
Expand All @@ -29,7 +35,70 @@ def __init__(self, tag, config):
'version': tags.get('applicationVersion', ''),
}

if config.get("streaming") is not None:
datasystem_config = config.get('dataSystem')
if datasystem_config is not None:
datasystem = custom()

init_configs = datasystem_config.get('initializers')
if init_configs is not None:
initializers = []
for init_config in init_configs:
polling = init_config.get('polling')
if polling is not None:
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
polling = polling_ds_builder()
initializers.append(polling)

datasystem.initializers(initializers)
sync_config = datasystem_config.get('synchronizers')
if sync_config is not None:
primary = sync_config.get('primary')
secondary = sync_config.get('secondary')

primary_builder = None
secondary_builder = None

if primary is not None:
streaming = primary.get('streaming')
if streaming is not None:
primary_builder = streaming_ds_builder()
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
_set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
primary_builder = streaming_ds_builder()
elif primary.get('polling') is not None:
polling = primary.get('polling')
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
primary_builder = polling_ds_builder()

if secondary is not None:
streaming = secondary.get('streaming')
if streaming is not None:
secondary_builder = streaming_ds_builder()
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
_set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
secondary_builder = streaming_ds_builder()
elif secondary.get('polling') is not None:
polling = secondary.get('polling')
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
secondary_builder = polling_ds_builder()

if primary_builder is not None:
datasystem.synchronizers(primary_builder, secondary_builder)

if datasystem_config.get("payloadFilter") is not None:
opts["payload_filter_key"] = datasystem_config["payloadFilter"]

opts["datasystem_config"] = datasystem.build()

elif config.get("streaming") is not None:
streaming = config["streaming"]
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
Expand Down
11 changes: 10 additions & 1 deletion ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(
self._requester = requester
self._poll_interval = poll_interval
self._event = Event()
self._stop = Event()
self._task = RepeatingTask(
"ldclient.datasource.polling", poll_interval, 0, self._poll
)
Expand All @@ -108,7 +109,8 @@ def sync(self) -> Generator[Update, None, None]:
occurs.
"""
log.info("Starting PollingDataSourceV2 synchronizer")
while True:
self._stop.clear()
while self._stop.is_set() is False:
result = self._requester.fetch(None)
if isinstance(result, _Fail):
if isinstance(result.exception, UnsuccessfulResponseException):
Expand Down Expand Up @@ -161,6 +163,13 @@ def sync(self) -> Generator[Update, None, None]:
if self._event.wait(self._poll_interval):
break

def stop(self):
    """Stops the synchronizer.

    Signals the sync() loop to exit as soon as possible:

    - sets ``self._event`` so a sync() loop blocked in
      ``self._event.wait(self._poll_interval)`` returns True and breaks
      out immediately instead of sleeping out the poll interval;
    - stops the underlying RepeatingTask;
    - sets ``self._stop``, the flag checked by sync()'s
      ``while self._stop.is_set() is False`` loop condition.

    NOTE(review): ``_event`` is set before ``_stop``; a loop iteration
    that wakes between the two calls will observe ``_stop`` unset for
    one more pass -- confirm this ordering is intentional.
    """
    log.info("Stopping PollingDataSourceV2 synchronizer")
    self._event.set()
    self._task.stop()
    self._stop.set()

def _poll(self) -> BasisResult:
try:
# TODO(fdv2): Need to pass the selector through
Expand Down
87 changes: 26 additions & 61 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple
from urllib import parse

from ld_eventsource import SSEClient as SSEClientImpl
from ld_eventsource import SSEClient
from ld_eventsource.actions import Action, Event, Fault
from ld_eventsource.config import (
ConnectStrategy,
Expand Down Expand Up @@ -54,33 +54,19 @@
STREAMING_ENDPOINT = "/sdk/stream"


class SSEClient(Protocol): # pylint: disable=too-few-public-methods
"""
SSEClient is a protocol that defines the interface for a client that can
connect to a Server-Sent Events (SSE) stream and provide an iterable of
actions received from that stream.
"""

@property
@abstractmethod
def all(self) -> Iterable[Action]:
"""
Returns an iterable of all actions received from the SSE stream.
"""
raise NotImplementedError


SseClientBuilder = Callable[[Config], SSEClient]


# TODO(sdk-1391): Pass a selector-retrieving function through so it can
# re-connect with the last known status.
def create_sse_client(config: Config) -> SSEClientImpl:
def create_sse_client(config: Config) -> SSEClient:
""" "
create_sse_client creates an SSEClientImpl instance configured to connect
create_sse_client creates an SSEClient instance configured to connect
to the LaunchDarkly streaming endpoint.
"""
uri = config.stream_base_uri + STREAMING_ENDPOINT
if config.payload_filter_key is not None:
uri += "?%s" % parse.urlencode({"filter": config.payload_filter_key})

# We don't want the stream to use the same read timeout as the rest of the SDK.
http_factory = _http_factory(config)
Expand All @@ -90,7 +76,7 @@ def create_sse_client(config: Config) -> SSEClientImpl:
override_read_timeout=STREAM_READ_TIMEOUT,
)

return SSEClientImpl(
return SSEClient(
connect=ConnectStrategy.http(
url=uri,
headers=http_factory.base_headers,
Expand Down Expand Up @@ -119,15 +105,11 @@ class StreamingDataSource(Synchronizer):
from the streaming data source.
"""

def __init__(
self, config: Config, sse_client_builder: SseClientBuilder = create_sse_client
):
self._sse_client_builder = sse_client_builder
self._uri = config.stream_base_uri + STREAMING_ENDPOINT
if config.payload_filter_key is not None:
self._uri += "?%s" % parse.urlencode({"filter": config.payload_filter_key})
def __init__(self, config: Config):
    """Create a streaming synchronizer for the given SDK configuration.

    No network activity happens here; the SSE connection is only
    established once sync() is called.
    """
    # Configuration is held for later use by sync().
    self._config = config
    # sync() toggles this on while the stream is being consumed.
    self._running = False
    # Live SSE connection; remains None until sync() builds one.
    self._sse: Optional[SSEClient] = None
    # Factory invoked by sync() to construct the SSE client.
    self._sse_client_builder = create_sse_client

@property
def name(self) -> str:
Expand All @@ -142,13 +124,13 @@ def sync(self) -> Generator[Update, None, None]:
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
log.info("Starting StreamingUpdateProcessor connecting to uri: %s", self._uri)
self._sse = self._sse_client_builder(self._config)
if self._sse is None:
log.error("Failed to create SSE client for streaming updates.")
return

change_set_builder = ChangeSetBuilder()
self._running = True

for action in self._sse.all:
if isinstance(action, Fault):
Expand Down Expand Up @@ -177,8 +159,7 @@ def sync(self) -> Generator[Update, None, None]:
log.info(
"Error while handling stream event; will restart stream: %s", e
)
# TODO(sdk-1409)
# self._sse.interrupt()
self._sse.interrupt()

(update, should_continue) = self._handle_error(e)
if update is not None:
Expand All @@ -189,8 +170,7 @@ def sync(self) -> Generator[Update, None, None]:
log.info(
"Error while handling stream event; will restart stream: %s", e
)
# TODO(sdk-1409)
# self._sse.interrupt()
self._sse.interrupt()

yield Update(
state=DataSourceState.INTERRUPTED,
Expand All @@ -210,27 +190,16 @@ def sync(self) -> Generator[Update, None, None]:
# DataSourceState.VALID, None
# )

# if not self._ready.is_set():
# log.info("StreamingUpdateProcessor initialized ok.")
# self._ready.set()

# TODO(sdk-1409)
# self._sse.close()

# TODO(sdk-1409)
# def stop(self):
# self.__stop_with_error_info(None)
#
# def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]):
# log.info("Stopping StreamingUpdateProcessor")
# self._running = False
# if self._sse:
# self._sse.close()
#
# if self._data_source_update_sink is None:
# return
#
# self._data_source_update_sink.update_status(DataSourceState.OFF, error)
self._sse.close()

def stop(self):
"""
Stops the streaming synchronizer, closing any open connections.
"""
log.info("Stopping StreamingUpdateProcessor")
self._running = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume no threading concerns in this implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe there should be. The data system starts its own thread, which creates and destroys synchronizers synchronously.

if self._sse:
self._sse.close()

# pylint: disable=too-many-return-statements
def _process_message(
Expand Down Expand Up @@ -317,8 +286,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
If an update is provided, it should be forward upstream, regardless of
whether or not we are going to retry this failure.
"""
# if not self._running:
# return (False, None) # don't retry if we've been deliberately stopped
if not self._running:
return (None, False) # don't retry if we've been deliberately stopped

update: Optional[Update] = None

Expand Down Expand Up @@ -362,10 +331,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:

if not is_recoverable:
log.error(http_error_message_result)
# TODO(sdk-1409)
# self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited
# self.__stop_with_error_info(error_info)
# self.stop()
self.stop()
return (update, False)

log.warning(http_error_message_result)
Expand All @@ -391,8 +357,7 @@ def __enter__(self):
return self

def __exit__(self, type, value, traceback):
# self.stop()
pass
self.stop()


class StreamingDataSourceBuilder: # disable: pylint: disable=too-few-public-methods
Expand Down
8 changes: 8 additions & 0 deletions ldclient/impl/datasystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,11 @@ def sync(self) -> Generator[Update, None, None]:
occurs.
"""
raise NotImplementedError

@abstractmethod
def stop(self):
    """
    stop should halt the synchronization process, causing the sync method
    to exit as soon as possible.

    Implementations should make a concurrent or subsequent call to sync()
    return promptly; stop() itself is expected to return without waiting
    for the sync generator to finish.
    """
    raise NotImplementedError
25 changes: 14 additions & 11 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods
Builder for the data system configuration.
"""

_initializers: Optional[List[Builder[Initializer]]] = None
_primary_synchronizer: Optional[Builder[Synchronizer]] = None
_secondary_synchronizer: Optional[Builder[Synchronizer]] = None
_store_mode: DataStoreMode = DataStoreMode.READ_ONLY
_data_store: Optional[FeatureStore] = None
def __init__(self) -> None:
    """Initialize builder state to its unconfigured defaults.

    Attributes are assigned per-instance here (rather than declared as
    class attributes) so that each ConfigBuilder carries its own state
    instead of sharing values across instances.
    """
    # Builders for initializers; passed through to DataSystemConfig by build().
    self._initializers: Optional[List[Builder[Initializer]]] = None
    # Primary ongoing synchronizer builder, if configured.
    self._primary_synchronizer: Optional[Builder[Synchronizer]] = None
    # Optional secondary synchronizer builder -- presumably a fallback
    # for the primary; verify against the data system's usage.
    self._secondary_synchronizer: Optional[Builder[Synchronizer]] = None
    # Optional synchronizer builder for falling back to the FDv1 protocol.
    self._fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
    # How a configured data store is used; defaults to read-only.
    self._store_mode: DataStoreMode = DataStoreMode.READ_ONLY
    # Optional persistent feature store instance.
    self._data_store: Optional[FeatureStore] = None

def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder":
"""
Expand Down Expand Up @@ -72,12 +74,13 @@ def build(self) -> DataSystemConfig:
initializers=self._initializers,
primary_synchronizer=self._primary_synchronizer,
secondary_synchronizer=self._secondary_synchronizer,
fdv1_fallback_synchronizer=self._fdv1_fallback_synchronizer,
data_store_mode=self._store_mode,
data_store=self._data_store,
)


def __polling_ds_builder() -> Builder[PollingDataSource]:
def polling_ds_builder() -> Builder[PollingDataSource]:
def builder(config: LDConfig) -> PollingDataSource:
requester = Urllib3PollingRequester(config)
polling_ds = PollingDataSourceBuilder(config)
Expand All @@ -88,7 +91,7 @@ def builder(config: LDConfig) -> PollingDataSource:
return builder


def __streaming_ds_builder() -> Builder[StreamingDataSource]:
def streaming_ds_builder() -> Builder[StreamingDataSource]:
def builder(config: LDConfig) -> StreamingDataSource:
return StreamingDataSourceBuilder(config).build()

Expand All @@ -109,8 +112,8 @@ def default() -> ConfigBuilder:
for updates.
"""

polling_builder = __polling_ds_builder()
streaming_builder = __streaming_ds_builder()
polling_builder = polling_ds_builder()
streaming_builder = streaming_ds_builder()

builder = ConfigBuilder()
builder.initializers([polling_builder])
Expand All @@ -126,7 +129,7 @@ def streaming() -> ConfigBuilder:
with no additional latency.
"""

streaming_builder = __streaming_ds_builder()
streaming_builder = streaming_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(streaming_builder)
Expand All @@ -141,7 +144,7 @@ def polling() -> ConfigBuilder:
streaming, but may be necessary in some network environments.
"""

polling_builder: Builder[Synchronizer] = __polling_ds_builder()
polling_builder: Builder[Synchronizer] = polling_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(polling_builder)
Expand Down
Loading