Skip to content

Commit

Permalink
Merge pull request #220 from lsst-ts/tickets/DM-33207
Browse files Browse the repository at this point in the history
DM-33207: improve reading historical data for an indexed SAL component read with index=0 (meaning "all instances")
  • Loading branch information
r-owen committed Jan 12, 2022
2 parents 5c91517 + 2b74c55 commit 2b9189b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 11 deletions.
5 changes: 5 additions & 0 deletions doc/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ v6.9.0
* Use the new `parse_idl_file` and `make_dds_topic_class` functions in ADLink's ``ddsutil.py``, instead of our versions.
This change requires ts-dds version 6.9 (community) or 6.10 (licensed) build 18.
* Remove deprecated support for environment variable ``LSST_DDS_DOMAIN``.
* `Remote` and `SalInfo`: improve retrieval of historical data in one special case:
reading an indexed SAL component using index=0 in the `Remote` (meaning "read data from all indices").
Formerly there would be only 1 sample of historical data: the most recent sample output with any index.
Now retrieve the most recent sample _for each index_, in the order received.


Requirements:

Expand Down
21 changes: 17 additions & 4 deletions python/lsst/ts/salobj/sal_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,21 +682,34 @@ async def start(self) -> None:
"giving up"
) from e
self.log.debug(f"Read {len(data_list)} history items for {reader}")
sd_list = [
# All historical data for the specified index
full_sd_list = [
self._sample_to_data(sd, si)
for sd, si in data_list
if si.valid_data
]
if len(sd_list) < len(data_list):
ninvalid = len(data_list) - len(sd_list)
if len(full_sd_list) < len(data_list):
ninvalid = len(data_list) - len(full_sd_list)
self.log.warning(
f"Read {ninvalid} invalid late-joiner items from {reader}. "
"The invalid items were safely skipped, but please examine "
"the code in SalInfo.start to see if it needs an update "
"for changes to OpenSplice dds."
)
if reader.max_history > 0:
sd_list = sd_list[-reader.max_history :]
if self.index == 0 and self.indexed:
# Get the most recent sample for each index
index_field = f"{self.name}ID"
data_dict = {
getattr(data, index_field): data
for data in full_sd_list
}
sd_list: typing.Collection[
type_hints.BaseDdsDataType
] = data_dict.values()
else:
# Get the max_history most recent samples
sd_list = full_sd_list[-reader.max_history :]
if sd_list:
reader._queue_data(sd_list, loop=None)
self._read_loop_task = asyncio.create_task(self._read_loop(loop=loop))
Expand Down
24 changes: 20 additions & 4 deletions python/lsst/ts/salobj/topics/read_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,15 @@ class ReadTopic(BaseTopic):
max_history : `int`
Maximum number of historical items to read:
* 0 is required for commands, events, and the ackcmd topic
* 1 is recommended for telemetry
* 0 is required for commands, events, and the ackcmd topic.
* 1 is recommended for telemetry. For an indexed component
it is possible for data from one index to push data for another
index off the DDS queue, so historical data is not guaranteed.
* For the special case of reading an indexed SAL component
with index=0 (read all indices) the only allowed values are 0 or 1.
If 1 then retrieve the most recent sample for each index
that is still in the read queue, in the order received.
max_history > 1 is forbidden, because it is difficult to implement.
queue_len : `int`, optional
The maximum number of messages that can be read and not dealt with
by a callback function or `next` before older messages will be dropped.
Expand All @@ -212,6 +219,10 @@ class ReadTopic(BaseTopic):
If queue_len < MIN_QUEUE_LEN.
ValueError
If max_history > queue_len.
ValueError
If for an indexed component if index=0 and max_history > 1.
Reading more than one historical sample per index is more trouble
than it is worth.
UserWarning
If max_history > DDS history queue depth or DDS durability service
history depth for this topic.
Expand Down Expand Up @@ -283,6 +294,11 @@ def __init__(
raise ValueError(f"max_history={max_history} must be >= 0")
if max_history > 0 and self.volatile:
raise ValueError(f"max_history={max_history} must be 0 for volatile topics")
if salinfo.indexed and salinfo.index == 0 and max_history > 1:
raise ValueError(
f"max_history={max_history} must be 0 or 1 "
"for an indexed component read with index=0"
)
if queue_len <= MIN_QUEUE_LEN:
raise ValueError(
f"queue_len={queue_len} must be >= MIN_QUEUE_LEN={MIN_QUEUE_LEN}"
Expand Down Expand Up @@ -711,14 +727,14 @@ async def _run_callback(self, data: type_hints.BaseDdsDataType) -> None:

def _queue_data(
self,
data_list: typing.Sequence[type_hints.BaseDdsDataType],
data_list: typing.Collection[type_hints.BaseDdsDataType],
loop: typing.Optional[asyncio.AbstractEventLoop],
) -> None:
"""Queue multiple one or more messages.
Parameters
----------
data_list : `list` [dds_messages]
data_list : typing.Collection[type_hints.BaseDdsDataType]
DDS messages to be queueued.
loop : `asyncio.AbstractEventLoop` or `None`
Foreground asyncio loop.
Expand Down
18 changes: 15 additions & 3 deletions tests/test_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,18 @@ async def test_read_topic_constructor_errors_and_warnings(self) -> None:
queue_len=warn_max_history + MIN_QUEUE_LEN,
)

# max_history can only be 0 or 1 with index=0
# for an indexed component
salinfo0 = salobj.SalInfo(domain=domain, name="Test", index=0)
for bad_max_history in (-1, 2, 3, 10):
with pytest.raises(ValueError):
salobj.topics.ReadTopic(
salinfo=salinfo0,
name="scalars",
sal_prefix="logevent_",
max_history=bad_max_history,
)

async def test_asynchronous_event_callback(self) -> None:
async with self.make_csc(initial_state=salobj.State.ENABLED):
cmd_scalars_data = self.csc.make_random_cmd_scalars()
Expand Down Expand Up @@ -1381,7 +1393,7 @@ async def test_partitions(self) -> None:
assert read_codes2 == expected_codes2

async def test_sal_index(self) -> None:
"""Test separation of data using SAL index.
"""Test separation of data using SAL index, including historical data.
Readers with index=0 should see data from all writers of that topic,
regardless of index.
Expand All @@ -1397,7 +1409,7 @@ async def test_sal_index(self) -> None:
writer2 = salobj.topics.ControllerEvent(salinfo=salinfo2, name="errorCode")

# write late joiner data (before we have readers);
# only the last value should be seen
# only the last value for each index should be seen
for i in (3, 4, 5):
writer0.set_put(errorCode=i)
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -1430,7 +1442,7 @@ async def test_sal_index(self) -> None:
read_codes0 = []
read_codes1 = []
read_codes2 = []
expected_codes0 = [25, 6, 16, 26, 7, 17, 27, 8, 18, 28]
expected_codes0 = [5, 15, 25, 6, 16, 26, 7, 17, 27, 8, 18, 28]
expected_codes1 = [15, 16, 17, 18]
expected_codes2 = [25, 26, 27, 28]
try:
Expand Down

0 comments on commit 2b9189b

Please sign in to comment.