Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-33207: improve reading historical data for an indexed SAL component read with index=0 (meaning "all instances") #220

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/version_history.rst
Expand Up @@ -11,6 +11,11 @@ v6.9.0
* Use the new `parse_idl_file` and `make_dds_topic_class` functions in ADLink's ``ddsutil.py``, instead of our versions.
This change requires ts-dds version 6.9 (community) or 6.10 (licensed) build 18.
* Remove deprecated support for environment variable ``LSST_DDS_DOMAIN``.
* `Remote` and `SalInfo`: improve retrieval of historical data in one special case:
reading an indexed SAL component using index=0 in the `Remote` (meaning "read data from all indices").
Formerly there would be only 1 sample of historical data: the most recent sample output with any index.
Now retrieve the most recent sample _for each index_, in the order received.


Requirements:

Expand Down
21 changes: 17 additions & 4 deletions python/lsst/ts/salobj/sal_info.py
Expand Up @@ -682,21 +682,34 @@ async def start(self) -> None:
"giving up"
) from e
self.log.debug(f"Read {len(data_list)} history items for {reader}")
sd_list = [
# All historical data for the specified index
full_sd_list = [
self._sample_to_data(sd, si)
for sd, si in data_list
if si.valid_data
]
if len(sd_list) < len(data_list):
ninvalid = len(data_list) - len(sd_list)
if len(full_sd_list) < len(data_list):
ninvalid = len(data_list) - len(full_sd_list)
self.log.warning(
f"Read {ninvalid} invalid late-joiner items from {reader}. "
"The invalid items were safely skipped, but please examine "
"the code in SalInfo.start to see if it needs an update "
"for changes to OpenSplice dds."
)
if reader.max_history > 0:
sd_list = sd_list[-reader.max_history :]
if self.index == 0 and self.indexed:
# Get the most recent sample for each index
index_field = f"{self.name}ID"
data_dict = {
getattr(data, index_field): data
for data in full_sd_list
}
sd_list: typing.Collection[
type_hints.BaseDdsDataType
] = data_dict.values()
else:
# Get the max_history most recent samples
sd_list = full_sd_list[-reader.max_history :]
if sd_list:
reader._queue_data(sd_list, loop=None)
self._read_loop_task = asyncio.create_task(self._read_loop(loop=loop))
Expand Down
24 changes: 20 additions & 4 deletions python/lsst/ts/salobj/topics/read_topic.py
Expand Up @@ -191,8 +191,15 @@ class ReadTopic(BaseTopic):
max_history : `int`
Maximum number of historical items to read:

* 0 is required for commands, events, and the ackcmd topic
* 1 is recommended for telemetry
* 0 is required for commands, events, and the ackcmd topic.
* 1 is recommended for telemetry. For an indexed component
it is possible for data from one index to push data for another
index off the DDS queue, so historical data is not guaranteed.
* For the special case of reading an indexed SAL component
with index=0 (read all indices) the only allowed values are 0 or 1.
If 1 then retrieve the most recent sample for each index
that is still in the read queue, in the order received.
max_history > 1 is forbidden, because it is difficult to implement.
queue_len : `int`, optional
The maximum number of messages that can be read and not dealt with
by a callback function or `next` before older messages will be dropped.
Expand All @@ -212,6 +219,10 @@ class ReadTopic(BaseTopic):
If queue_len < MIN_QUEUE_LEN.
ValueError
If max_history > queue_len.
ValueError
If for an indexed component if index=0 and max_history > 1.
Reading more than one historical sample per index is more trouble
than it is worth.
UserWarning
If max_history > DDS history queue depth or DDS durability service
history depth for this topic.
Expand Down Expand Up @@ -283,6 +294,11 @@ def __init__(
raise ValueError(f"max_history={max_history} must be >= 0")
if max_history > 0 and self.volatile:
raise ValueError(f"max_history={max_history} must be 0 for volatile topics")
if salinfo.indexed and salinfo.index == 0 and max_history > 1:
raise ValueError(
f"max_history={max_history} must be 0 or 1 "
"for an indexed component read with index=0"
)
if queue_len <= MIN_QUEUE_LEN:
raise ValueError(
f"queue_len={queue_len} must be >= MIN_QUEUE_LEN={MIN_QUEUE_LEN}"
Expand Down Expand Up @@ -711,14 +727,14 @@ async def _run_callback(self, data: type_hints.BaseDdsDataType) -> None:

def _queue_data(
self,
data_list: typing.Sequence[type_hints.BaseDdsDataType],
data_list: typing.Collection[type_hints.BaseDdsDataType],
loop: typing.Optional[asyncio.AbstractEventLoop],
) -> None:
"""Queue multiple one or more messages.

Parameters
----------
data_list : `list` [dds_messages]
data_list : typing.Collection[type_hints.BaseDdsDataType]
DDS messages to be queueued.
loop : `asyncio.AbstractEventLoop` or `None`
Foreground asyncio loop.
Expand Down
18 changes: 15 additions & 3 deletions tests/test_topics.py
Expand Up @@ -1248,6 +1248,18 @@ async def test_read_topic_constructor_errors_and_warnings(self) -> None:
queue_len=warn_max_history + MIN_QUEUE_LEN,
)

# max_history can only be 0 or 1 with index=0
# for an indexed component
salinfo0 = salobj.SalInfo(domain=domain, name="Test", index=0)
for bad_max_history in (-1, 2, 3, 10):
with pytest.raises(ValueError):
salobj.topics.ReadTopic(
salinfo=salinfo0,
name="scalars",
sal_prefix="logevent_",
max_history=bad_max_history,
)

async def test_asynchronous_event_callback(self) -> None:
async with self.make_csc(initial_state=salobj.State.ENABLED):
cmd_scalars_data = self.csc.make_random_cmd_scalars()
Expand Down Expand Up @@ -1381,7 +1393,7 @@ async def test_partitions(self) -> None:
assert read_codes2 == expected_codes2

async def test_sal_index(self) -> None:
"""Test separation of data using SAL index.
"""Test separation of data using SAL index, including historical data.

Readers with index=0 should see data from all writers of that topic,
regardless of index.
Expand All @@ -1397,7 +1409,7 @@ async def test_sal_index(self) -> None:
writer2 = salobj.topics.ControllerEvent(salinfo=salinfo2, name="errorCode")

# write late joiner data (before we have readers);
# only the last value should be seen
# only the last value for each index should be seen
for i in (3, 4, 5):
writer0.set_put(errorCode=i)
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -1430,7 +1442,7 @@ async def test_sal_index(self) -> None:
read_codes0 = []
read_codes1 = []
read_codes2 = []
expected_codes0 = [25, 6, 16, 26, 7, 17, 27, 8, 18, 28]
expected_codes0 = [5, 15, 25, 6, 16, 26, 7, 17, 27, 8, 18, 28]
expected_codes1 = [15, 16, 17, 18]
expected_codes2 = [25, 26, 27, 28]
try:
Expand Down