Skip to content

Commit bf82505

Browse files
committed
chore: Provide selector for use as basis on FDv2 data sources
1 parent 9c3cd49 commit bf82505

File tree

10 files changed

+113
-75
lines changed

10 files changed

+113
-75
lines changed

ldclient/impl/datasourcev2/polling.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import urllib3
1515

1616
from ldclient.config import Config
17-
from ldclient.impl.datasystem import BasisResult, Update
17+
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
1818
from ldclient.impl.datasystem.protocolv2 import (
1919
Basis,
2020
ChangeSet,
@@ -96,13 +96,13 @@ def name(self) -> str:
9696
"""Returns the name of the initializer."""
9797
return "PollingDataSourceV2"
9898

99-
def fetch(self) -> BasisResult:
99+
def fetch(self, ss: SelectorStore) -> BasisResult:
100100
"""
101101
Fetch returns a Basis, or an error if the Basis could not be retrieved.
102102
"""
103-
return self._poll()
103+
return self._poll(ss)
104104

105-
def sync(self) -> Generator[Update, None, None]:
105+
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
106106
"""
107107
sync begins the synchronization process for the data source, yielding
108108
Update objects until the connection is closed or an unrecoverable error
@@ -111,7 +111,7 @@ def sync(self) -> Generator[Update, None, None]:
111111
log.info("Starting PollingDataSourceV2 synchronizer")
112112
self._stop.clear()
113113
while self._stop.is_set() is False:
114-
result = self._requester.fetch(None)
114+
result = self._requester.fetch(ss.selector())
115115
if isinstance(result, _Fail):
116116
if isinstance(result.exception, UnsuccessfulResponseException):
117117
error_info = DataSourceErrorInfo(
@@ -170,10 +170,9 @@ def stop(self):
170170
self._task.stop()
171171
self._stop.set()
172172

173-
def _poll(self) -> BasisResult:
173+
def _poll(self, ss: SelectorStore) -> BasisResult:
174174
try:
175-
# TODO(fdv2): Need to pass the selector through
176-
result = self._requester.fetch(None)
175+
result = self._requester.fetch(ss.selector())
177176

178177
if isinstance(result, _Fail):
179178
if isinstance(result.exception, UnsuccessfulResponseException):

ldclient/impl/datasourcev2/streaming.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from ld_eventsource.errors import HTTPStatusError
2020

2121
from ldclient.config import Config
22-
from ldclient.impl.datasystem import Synchronizer, Update
22+
from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update
2323
from ldclient.impl.datasystem.protocolv2 import (
2424
ChangeSetBuilder,
2525
DeleteObject,
@@ -54,12 +54,10 @@
5454
STREAMING_ENDPOINT = "/sdk/stream"
5555

5656

57-
SseClientBuilder = Callable[[Config], SSEClient]
57+
SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]
5858

5959

60-
# TODO(sdk-1391): Pass a selector-retrieving function through so it can
61-
# re-connect with the last known status.
62-
def create_sse_client(config: Config) -> SSEClient:
60+
def create_sse_client(config: Config, ss: SelectorStore) -> SSEClient:
6361
""" "
6462
create_sse_client creates an SSEClient instance configured to connect
6563
to the LaunchDarkly streaming endpoint.
@@ -76,12 +74,17 @@ def create_sse_client(config: Config) -> SSEClient:
7674
override_read_timeout=STREAM_READ_TIMEOUT,
7775
)
7876

77+
def query_params() -> dict[str, str]:
78+
selector = ss.selector()
79+
return {"basis": selector.state} if selector.is_defined() else {}
80+
7981
return SSEClient(
8082
connect=ConnectStrategy.http(
8183
url=uri,
8284
headers=http_factory.base_headers,
8385
pool=stream_http_factory.create_pool_manager(1, uri),
8486
urllib3_request_options={"timeout": stream_http_factory.timeout},
87+
query_params=query_params
8588
),
8689
# we'll make error-handling decisions when we see a Fault
8790
error_strategy=ErrorStrategy.always_continue(),
@@ -118,13 +121,13 @@ def name(self) -> str:
118121
"""
119122
return "streaming"
120123

121-
def sync(self) -> Generator[Update, None, None]:
124+
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
122125
"""
123126
sync should begin the synchronization process for the data source, yielding
124127
Update objects until the connection is closed or an unrecoverable error
125128
occurs.
126129
"""
127-
self._sse = self._sse_client_builder(self._config)
130+
self._sse = self._sse_client_builder(self._config, ss)
128131
if self._sse is None:
129132
log.error("Failed to create SSE client for streaming updates.")
130133
return

ldclient/impl/datasystem/__init__.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
from dataclasses import dataclass
88
from enum import Enum
99
from threading import Event
10-
from typing import Generator, Optional, Protocol
10+
from typing import Callable, Generator, Optional, Protocol
1111

12-
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
12+
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet, Selector
1313
from ldclient.impl.util import _Result
1414
from ldclient.interfaces import (
1515
DataSourceErrorInfo,
@@ -142,6 +142,21 @@ def target_availability(self) -> DataAvailability:
142142
raise NotImplementedError
143143

144144

145+
class SelectorStore(Protocol):
146+
"""
147+
SelectorStore represents a component capable of providing Selectors
148+
for data retrieval.
149+
"""
150+
151+
@abstractmethod
152+
def selector(self) -> Selector:
153+
"""
154+
get_selector should return a Selector object that defines the criteria
155+
for data retrieval.
156+
"""
157+
raise NotImplementedError
158+
159+
145160
BasisResult = _Result[Basis, str]
146161

147162

@@ -165,10 +180,12 @@ def name(self) -> str:
165180
raise NotImplementedError
166181

167182
@abstractmethod
168-
def fetch(self) -> BasisResult:
183+
def fetch(self, ss: SelectorStore) -> BasisResult:
169184
"""
170185
fetch should retrieve the initial data set for the data source, returning
171186
a Basis object on success, or an error message on failure.
187+
188+
:param ss: A SelectorStore that provides the Selector to use as a basis for data retrieval.
172189
"""
173190
raise NotImplementedError
174191

@@ -205,11 +222,13 @@ def name(self) -> str:
205222
raise NotImplementedError
206223

207224
@abstractmethod
208-
def sync(self) -> Generator[Update, None, None]:
225+
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
209226
"""
210227
sync should begin the synchronization process for the data source, yielding
211228
Update objects until the connection is closed or an unrecoverable error
212229
occurs.
230+
231+
:param ss: A SelectorStore that provides the Selector to use as a basis for data retrieval.
213232
"""
214233
raise NotImplementedError
215234

ldclient/impl/datasystem/fdv2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def _run_initializers(self, set_on_ready: Event):
299299
initializer = initializer_builder(self._config)
300300
log.info("Attempting to initialize via %s", initializer.name)
301301

302-
basis_result = initializer.fetch()
302+
basis_result = initializer.fetch(self._store)
303303

304304
if isinstance(basis_result, _Fail):
305305
log.warning("Initializer %s failed: %s", initializer.name, basis_result.error)
@@ -426,7 +426,7 @@ def _consume_synchronizer_results(
426426
:return: Tuple of (should_remove_sync, fallback_to_fdv1)
427427
"""
428428
try:
429-
for update in synchronizer.sync():
429+
for update in synchronizer.sync(self._store):
430430
log.info("Synchronizer %s update: %s", synchronizer.name, update.state)
431431
if self._stop_event.is_set():
432432
return False, False

ldclient/impl/integrations/test_datav2/test_data_sourcev2.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from queue import Empty, Queue
33
from typing import Generator
44

5-
from ldclient.impl.datasystem import BasisResult, Update
5+
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
66
from ldclient.impl.datasystem.protocolv2 import (
77
Basis,
88
ChangeSetBuilder,
@@ -16,6 +16,7 @@
1616
DataSourceErrorKind,
1717
DataSourceState
1818
)
19+
from ldclient.testing.mock_components import MockSelectorStore
1920

2021

2122
class _TestDataSourceV2:
@@ -47,7 +48,7 @@ def name(self) -> str:
4748
"""Return the name of this data source."""
4849
return "TestDataV2"
4950

50-
def fetch(self) -> BasisResult:
51+
def fetch(self, ss: SelectorStore) -> BasisResult:
5152
"""
5253
Implementation of the Initializer.fetch method.
5354
@@ -90,15 +91,15 @@ def fetch(self) -> BasisResult:
9091
except Exception as e:
9192
return _Fail(f"Error fetching test data: {str(e)}")
9293

93-
def sync(self) -> Generator[Update, None, None]:
94+
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
9495
"""
9596
Implementation of the Synchronizer.sync method.
9697
9798
Yields updates as test data changes occur.
9899
"""
99100

100101
# First yield initial data
101-
initial_result = self.fetch()
102+
initial_result = self.fetch(ss)
102103
if isinstance(initial_result, _Fail):
103104
yield Update(
104105
state=DataSourceState.OFF,
@@ -143,8 +144,8 @@ def sync(self) -> Generator[Update, None, None]:
143144
)
144145
break
145146

146-
def close(self):
147-
"""Close the data source and clean up resources."""
147+
def stop(self):
148+
"""Stop the data source and clean up resources"""
148149
with self._lock:
149150
if self._closed:
150151
return

ldclient/testing/impl/datasourcev2/test_polling_initializer.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from ldclient.impl.datasystem.protocolv2 import ChangeSetBuilder, IntentCode
1313
from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success
14+
from ldclient.testing.mock_components import MockSelectorStore
1415

1516

1617
class MockExceptionThrowingPollingRequester: # pylint: disable=too-few-public-methods
@@ -37,7 +38,7 @@ def test_error_is_returned_on_failure():
3738
mock_requester = MockPollingRequester(_Fail(error="failure message"))
3839
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
3940

40-
result = ds.fetch()
41+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
4142

4243
assert isinstance(result, _Fail)
4344
assert result.error == "failure message"
@@ -50,7 +51,7 @@ def test_error_is_recoverable():
5051
)
5152
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
5253

53-
result = ds.fetch()
54+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
5455

5556
assert isinstance(result, _Fail)
5657
assert result.error is not None
@@ -64,7 +65,7 @@ def test_error_is_unrecoverable():
6465
)
6566
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
6667

67-
result = ds.fetch()
68+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
6869

6970
assert isinstance(result, _Fail)
7071
assert result.error is not None
@@ -78,7 +79,7 @@ def test_handles_transfer_none():
7879
)
7980
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
8081

81-
result = ds.fetch()
82+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
8283

8384
assert isinstance(result, _Success)
8485
assert result.value is not None
@@ -92,7 +93,7 @@ def test_handles_uncaught_exception():
9293
mock_requester = MockExceptionThrowingPollingRequester()
9394
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
9495

95-
result = ds.fetch()
96+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
9697

9798
assert isinstance(result, _Fail)
9899
assert result.error is not None
@@ -111,7 +112,7 @@ def test_handles_transfer_full():
111112
mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {})))
112113
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
113114

114-
result = ds.fetch()
115+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
115116

116117
assert isinstance(result, _Success)
117118
assert result.value is not None
@@ -129,7 +130,7 @@ def test_handles_transfer_changes():
129130
mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {})))
130131
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
131132

132-
result = ds.fetch()
133+
result = ds.fetch(MockSelectorStore(Selector.no_selector()))
133134

134135
assert isinstance(result, _Success)
135136
assert result.value is not None

ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
)
2323
from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success
2424
from ldclient.interfaces import DataSourceErrorKind, DataSourceState
25+
from ldclient.testing.mock_components import MockSelectorStore
2526

2627

2728
class ListBasedRequester:
@@ -103,7 +104,7 @@ def test_handles_no_changes():
103104
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
104105
)
105106

106-
valid = next(synchronizer.sync())
107+
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
107108

108109
assert valid.state == DataSourceState.VALID
109110
assert valid.error is None
@@ -124,7 +125,7 @@ def test_handles_empty_changeset():
124125
synchronizer = PollingDataSource(
125126
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
126127
)
127-
valid = next(synchronizer.sync())
128+
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
128129

129130
assert valid.state == DataSourceState.VALID
130131
assert valid.error is None
@@ -152,7 +153,7 @@ def test_handles_put_objects():
152153
synchronizer = PollingDataSource(
153154
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
154155
)
155-
valid = next(synchronizer.sync())
156+
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
156157

157158
assert valid.state == DataSourceState.VALID
158159
assert valid.error is None
@@ -183,7 +184,7 @@ def test_handles_delete_objects():
183184
synchronizer = PollingDataSource(
184185
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
185186
)
186-
valid = next(synchronizer.sync())
187+
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
187188

188189
assert valid.state == DataSourceState.VALID
189190
assert valid.error is None
@@ -216,7 +217,7 @@ def test_generic_error_interrupts_and_recovers():
216217
results=iter([_Fail(error="error for test"), polling_result])
217218
),
218219
)
219-
sync = synchronizer.sync()
220+
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
220221
interrupted = next(sync)
221222
valid = next(sync)
222223

@@ -250,7 +251,7 @@ def test_recoverable_error_continues():
250251
poll_interval=0.01,
251252
requester=ListBasedRequester(results=iter([_failure, polling_result])),
252253
)
253-
sync = synchronizer.sync()
254+
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
254255
interrupted = next(sync)
255256
valid = next(sync)
256257

@@ -288,7 +289,7 @@ def test_unrecoverable_error_shuts_down():
288289
poll_interval=0.01,
289290
requester=ListBasedRequester(results=iter([_failure, polling_result])),
290291
)
291-
sync = synchronizer.sync()
292+
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
292293
off = next(sync)
293294
assert off.state == DataSourceState.OFF
294295
assert off.error is not None

0 commit comments

Comments
 (0)