From c16e84d7336b40b8ae459d0d24c8d8fccd7b6d8f Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 28 May 2026 13:43:31 +0100 Subject: [PATCH 01/14] Tidy up pub/sub handling This refactors handling of property and action observations. In particular, it: * Introduces a MessageBroker class to handle pub/sub messaging centrally. This eliminates duplicated code from descriptors and should be much clearer. * Adds a `publish` method to the thing server interface for easy publication of events. * No longer errors if events are published before the event loop is active: they are silently ignored. * Removes the option to set properties without emitting an event: this is no longer needed - it was only ever done to suppress errors. * Separates handling of pub/sub messages from websocket protocol considerations. This does not change the websocket protocol. I've updated tests, but have not yet added tests for `MessageBroker`. --- src/labthings_fastapi/actions.py | 86 +++-------- src/labthings_fastapi/exceptions.py | 2 +- src/labthings_fastapi/message_broker.py | 105 +++++++++++++ src/labthings_fastapi/properties.py | 139 ++++-------------- src/labthings_fastapi/server/__init__.py | 2 + src/labthings_fastapi/thing.py | 40 +---- .../thing_server_interface.py | 17 +++ src/labthings_fastapi/websockets.py | 65 ++++++-- tests/test_websocket.py | 39 +---- 9 files changed, 236 insertions(+), 259 deletions(-) create mode 100644 src/labthings_fastapi/message_broker.py diff --git a/src/labthings_fastapi/actions.py b/src/labthings_fastapi/actions.py index 1ee5d35e..06c1e8eb 100644 --- a/src/labthings_fastapi/actions.py +++ b/src/labthings_fastapi/actions.py @@ -37,11 +37,13 @@ TypeVar, overload, ) -from weakref import WeakSet import weakref from fastapi import APIRouter, FastAPI, HTTPException, Response, Body, BackgroundTasks from pydantic import BaseModel, create_model +from labthings_fastapi.message_broker import Message + + from .middleware.url_for import URLFor from .base_descriptor import ( BaseDescriptor, @@ -74,7 +76,6 @@ ) from .thing_description import type_to_dataschema from .thing_description._model import ActionAffordance, ActionOp, Form, LinkElement -from .utilities import labthings_data if TYPE_CHECKING: @@ -270,6 +271,20 @@ def response(self) -> InvocationModel: log=self.log, ) + def _publish_status(self) -> None: + """Publish a status change event to any observers. + + This should be called after each change to ``self._status`` + """ + self.thing._thing_server_interface.publish( + Message( + thing=self.thing.name, + affordance=self.action.name, # type: ignore[attr-defined] + message_type="action", + payload=self._status.value, + ) + ) + def run(self) -> None: """Run the action and track progress. @@ -305,7 +320,7 @@ def run(self) -> None: add_thing_log_destination(self.id, self._log) with invocation_contexts.set_invocation_id(self.id): try: - action.emit_changed_event(self.thing, self._status.value) + self._publish_status() thing = self.thing kwargs = model_to_dict(self.input) @@ -321,7 +336,7 @@ def run(self) -> None: with self._status_lock: self._status = InvocationStatus.RUNNING self._start_time = datetime.datetime.now() - action.emit_changed_event(self.thing, self._status.value) + self._publish_status() # Actually run the action ret = action.func(thing, **kwargs, **self.dependencies) @@ -336,12 +351,12 @@ def run(self) -> None: with self._status_lock: self._output_model_instance = output_model_instance self._status = InvocationStatus.COMPLETED - action.emit_changed_event(self.thing, self._status.value) + self._publish_status() except InvocationCancelledError: logger.info(f"Invocation {self.id} was cancelled.") with self._status_lock: self._status = InvocationStatus.CANCELLED - action.emit_changed_event(self.thing, self._status.value) + self._publish_status() except Exception as e: # skipcq: PYL-W0703 # First log if isinstance(e, (InvocationError, InvalidReturnValueError)): @@ -361,7 +376,7 @@ def run(self) -> None: with self._status_lock: self._status = InvocationStatus.ERROR self._exception = e - action.emit_changed_event(self.thing, self._status.value) + self._publish_status() finally: with self._status_lock: self._end_time = datetime.datetime.now() @@ -868,63 +883,6 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: # noqa: DOC101, DOC103, DOC201 return partial(wrapped, obj) - def _observers_set(self, obj: Thing) -> WeakSet: - """Return a set used to notify changes. - - Note that we need to supply the `~lt.Thing` we are looking at, as in - general there may be more than one object of the same type, and - descriptor instances are shared between all instances of their class. - - :param obj: The `~lt.Thing` on which the action is being observed. - - :return: a weak set of callables to notify on changes to the action. - This is used by websocket endpoints. - """ - ld = labthings_data(obj) - if self.name not in ld.action_observers: - ld.action_observers[self.name] = WeakSet() - return ld.action_observers[self.name] - - def emit_changed_event(self, obj: Thing, status: str) -> None: - """Notify subscribers that the action status has changed. - - This function is run from within the `.Invocation` thread that - is created when an action is called. It must be run from a thread - as it is communicating with the event loop via an `asyncio` blocking - portal. Async code must not use the blocking portal as it can deadlock - the event loop. - - :param obj: The `~lt.Thing` on which the action is being observed. - :param status: The status of the action, to be sent to observers. - """ - obj._thing_server_interface.start_async_task_soon( - self.emit_changed_event_async, - obj, - status, - ) - - async def emit_changed_event_async(self, obj: Thing, value: Any) -> None: - """Notify subscribers that the action status has changed. - - This is an async function that must be run in the `anyio` event loop. - It will send messages to each observer to notify them that something - has changed. - - :param obj: The `~lt.Thing` on which the action is defined. - `.ActionDescriptor` objects are unique to the class, but there may - be more than one `~lt.Thing` attached to a server with the same class. - We use ``obj`` to look up the observers of the current `~lt.Thing`. - :param value: The action status to communicate to the observers. - """ - action_name = self.name - for observer in self._observers_set(obj): - await observer.send( - { - "messageType": "actionStatus", - "data": {"action name": action_name, "status": value}, - } - ) - def add_to_fastapi(self, app: FastAPI, thing: Thing) -> None: """Add this action to a FastAPI app, bound to a particular Thing. diff --git a/src/labthings_fastapi/exceptions.py b/src/labthings_fastapi/exceptions.py index e9e9e33c..7acc0e2b 100644 --- a/src/labthings_fastapi/exceptions.py +++ b/src/labthings_fastapi/exceptions.py @@ -43,7 +43,7 @@ class ReadOnlyPropertyError(AttributeError): class PropertyNotObservableError(RuntimeError): """The property is not observable. - This exception is raised when `~lt.Thing.observe_property` is called with a + This exception is raised when trying to observe property that is not observable. Currently, only data properties are observable: functional properties (using a getter/setter) may not be observed. diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py new file mode 100644 index 00000000..0ac36365 --- /dev/null +++ b/src/labthings_fastapi/message_broker.py @@ -0,0 +1,105 @@ +"""Handle pub-sub style events. + +Both properties and actions can emit events that may be observed. This module handles +all the pub-sub messaging in LabThings. +""" + +from dataclasses import dataclass +from typing import Any, Literal +from weakref import WeakSet + +from anyio.abc import ObjectSendStream + + +@dataclass +class Message: + """A pub-sub event message. + + This is the message that is sent when a property or action generates + an event. + + :param thing: The name of the Thing generating the event. + :param affordance: The name of the affordance generating the event. + :param message: The message to send. + """ + + thing: str + affordance: str + message_type: Literal["property", "action", "event"] + payload: Any + + +class MessageBroker: + r"""A class that relays pub/sub messages. + + This class takes care of relaying messages to streams that have subscribed to them. + It does not format messages or handle any details of e.g. websocket protocol. + + Subscriptions require an `ObjectSendStream[Message]` and each time a `Message` + matching the subscription parameters (``thing`` and ``affordance``) is published, + it will be sent on that stream. + + The broker does not validate thing or affordance names: that's up to the code + calling `MessageBroker.subscribe`\ . + """ + + def __init__(self) -> None: + """Initialise the message broker.""" + # Note that we use a weak set below, so that when a websocket disconnects, + # its stream is removed automatically. + self._subscriptions: dict[ + str, dict[str, WeakSet[ObjectSendStream[Message]]] + ] = {} + + def subscribe( + self, thing: str, affordance: str, stream: ObjectSendStream[Message] + ) -> None: + """Subscribe to messages from a particular affordance. + + Note that this method is not async - it just registers the stream and so + can be run from any thread. + + :param thing: The name of the `.Thing` being subscribed to. + :param affordance: The name of the affordance being subscribed to. + :param stream: A stream to send the messages to. + :raises TypeError: if the `thing` argument is not a string. + """ + if not isinstance(thing, str): + raise TypeError(f"The `thing` argument should be a string, not {thing}.") + if thing not in self._subscriptions: + self._subscriptions[thing] = {} + if affordance not in self._subscriptions[thing]: + self._subscriptions[thing][affordance] = WeakSet() + self._subscriptions[thing][affordance].add(stream) + + def unsubscribe( + self, thing: str, affordance: str, stream: ObjectSendStream[Message] + ) -> None: + """Unsubscribe a stream from messages from a particular affordance. + + :param thing: The name of the `.Thing` being unsubscribed from. + :param affordance: The name of the affordance being unsubscribed from. + :param stream: The stream to unsubscribe. + :raises KeyError: if there is no such subscription. + :raises TypeError: if the `thing` argument is not a string. + """ + if not isinstance(thing, str): + raise TypeError(f"The `thing` argument should be a string, not {thing}.") + try: + self._subscriptions[thing][affordance].discard(stream) + except KeyError as e: + raise e + + async def publish(self, message: Message) -> None: + """Publish a message. + + This async method will relay the message to any subscriber streams. + + :param message: the message to send. + """ + try: + subscriptions = self._subscriptions[message.thing][message.affordance] + except KeyError: + return # No subscribers for this thing. + for stream in subscriptions: + await stream.send(message) diff --git a/src/labthings_fastapi/properties.py b/src/labthings_fastapi/properties.py index f75e59d7..54f81854 100644 --- a/src/labthings_fastapi/properties.py +++ b/src/labthings_fastapi/properties.py @@ -60,7 +60,6 @@ class attribute. Documentation is in strings immediately following the TYPE_CHECKING, ) from typing_extensions import Self, TypedDict -from weakref import WeakSet from fastapi import Body, FastAPI, Response, HTTPException from pydantic import ( @@ -73,6 +72,8 @@ class attribute. Documentation is in strings immediately following the with_config, ) +from labthings_fastapi.message_broker import Message + from .thing_description import type_to_dataschema from .thing_description._model import ( DataSchema, @@ -82,7 +83,6 @@ class attribute. Documentation is in strings immediately following the ) from .utilities import ( RootModelWrapper, - labthings_data, serialise_from_user_code, validate_from_user_code, ) @@ -405,6 +405,14 @@ def __init__( except UnsupportedConstraintError: raise + observable: bool = False + """Whether or not the property may be observed. + + If `observable` is `True` then a websocket connection can register to be notified + when the property changes. By default this is `True` for data properties and + `False` for functional properties. + """ + @staticmethod def _validate_constraints(constraints: Mapping[str, Any]) -> FieldConstraints: """Validate an untyped dictionary of constraints. @@ -781,9 +789,7 @@ def instance_get(self, obj: Owner) -> Value: obj.__dict__[self.name] = self._default_factory() return obj.__dict__[self.name] - def __set__( - self, obj: Owner, value: Value, emit_changed_event: bool = True - ) -> None: + def __set__(self, obj: Owner, value: Value) -> None: """Set the property's value. This sets the property's value, and notifies any observers. @@ -794,7 +800,6 @@ def __set__( :param obj: the `~lt.Thing` to which we are attached. :param value: the new value for the property. - :param emit_changed_event: whether to emit a changed event. """ with obj._thing_server_interface._optionally_hold_global_lock( self.use_global_lock @@ -804,8 +809,16 @@ def __set__( obj.__dict__[self.name] = property_info.validate(value) else: obj.__dict__[self.name] = value - if emit_changed_event: - self.emit_changed_event(obj, value) + obj._thing_server_interface.publish( + Message(obj.name, self.name, "property", value) + ) + + observable: bool = True + """Whether or not the property may be observed. + + If `observable` is `True` then a websocket connection can register to be notified + when the property changes. By default this is `True` for data properties. + """ def get_default(self, obj: Owner | None) -> Value: """Return the default value of this property. @@ -828,56 +841,6 @@ def reset(self, obj: Owner) -> None: """ self.__set__(obj, self.get_default(obj)) - def _observers_set(self, obj: Thing) -> WeakSet: - """Return the observers of this property. - - Each observer in this set will be notified when the property is changed. - See ``.DataProperty.emit_changed_event`` - - :param obj: the `~lt.Thing` to which we are attached. - - :return: the set of observers corresponding to ``obj``. - """ - ld = labthings_data(obj) - if self.name not in ld.property_observers: - ld.property_observers[self.name] = WeakSet() - return ld.property_observers[self.name] - - def emit_changed_event(self, obj: Thing, value: Value) -> None: - """Notify subscribers that the property has changed. - - This function is run when properties are updated. It must be run from - within a thread. This could be the `Invocation` thread of a running action, or - the property should be updated over via a client/http. It must be run from a - thread as it is communicating with the event loop via an `asyncio` blocking - portal and can cause deadlock if run in the event loop. - - This method will raise a `.ServerNotRunningError` if the event loop is not - running, and should only be called after the server has started. - - :param obj: the `~lt.Thing` to which we are attached. - :param value: the new property value, to be sent to observers. - """ - obj._thing_server_interface.start_async_task_soon( - self.emit_changed_event_async, - obj, - value, - ) - - async def emit_changed_event_async(self, obj: Thing, value: Value) -> None: - """Notify subscribers that the property has changed. - - This function may only be run in the `anyio` event loop. See - `.DataProperty.emit_changed_event`. - - :param obj: the `~lt.Thing` to which we are attached. - :param value: the new property value, to be sent to observers. - """ - for observer in self._observers_set(obj): - await observer.send( - {"messageType": "propertyStatus", "data": {self._name: value}} - ) - class FunctionalProperty(BaseProperty[Owner, Value], Generic[Owner, Value]): """A property that uses a getter and a setter. @@ -1233,6 +1196,11 @@ def default(self) -> Value: # noqa: DOC201 """ return self.get_descriptor().get_default(self.owning_object) + @builtins.property + def is_observable(self) -> bool: # noqa: DOC201 + """Whether the property may be observed.""" + return self.get_descriptor().observable + @builtins.property def is_resettable(self) -> bool: # noqa: DOC201 """Whether the property may be reset using the ``reset()`` method.""" @@ -1431,20 +1399,6 @@ class BaseSetting(BaseProperty[Owner, Value], Generic[Owner, Value]): two concrete implementations: `.DataSetting` and `.FunctionalSetting`\ . """ - def set_without_emit(self, obj: Owner, value: Value) -> None: - """Set the setting's value without emitting an event. - - This is used to set the setting's value without notifying observers. - It is used during initialisation to set the value from disk before - the server is fully started. - - :param obj: the `~lt.Thing` to which we are attached. - :param value: the new value of the setting. - - :raises NotImplementedError: this method should be implemented in subclasses. - """ - raise NotImplementedError("This method should be implemented in subclasses.") - def descriptor_info(self, owner: Owner | None = None) -> SettingInfo[Owner, Value]: r"""Return an object that allows access to this descriptor's metadata. @@ -1474,32 +1428,17 @@ class DataSetting( The setting otherwise acts just like a normal variable. """ - def __set__( - self, obj: Owner, value: Value, emit_changed_event: bool = True - ) -> None: + def __set__(self, obj: Owner, value: Value) -> None: """Set the setting's value. This will cause the settings to be saved to disk. :param obj: the `~lt.Thing` to which we are attached. :param value: the new value of the setting. - :param emit_changed_event: whether to emit a changed event. """ - super().__set__(obj, value, emit_changed_event) + super().__set__(obj, value) obj.save_settings() - def set_without_emit(self, obj: Owner, value: Value) -> None: - """Set the property's value, but do not emit event to notify the server. - - This function is not expected to be used externally. It is called during - initial setup so that the setting can be set from disk before the server - is fully started. - - :param obj: the `~lt.Thing` to which we are attached. - :param value: the new value of the setting. - """ - super().__set__(obj, value, emit_changed_event=False) - class FunctionalSetting( FunctionalProperty[Owner, Value], BaseSetting[Owner, Value], Generic[Owner, Value] @@ -1532,34 +1471,12 @@ def __set__(self, obj: Owner, value: Value) -> None: super().__set__(obj, value) obj.save_settings() - def set_without_emit(self, obj: Owner, value: Value) -> None: - """Set the property's value, but do not emit event to notify the server. - - This function is not expected to be used externally. It is called during - initial setup so that the setting can be set from disk before the server - is fully started. - - :param obj: the `~lt.Thing` to which we are attached. - :param value: the new value of the setting. - """ - # FunctionalProperty does not emit changed events, so no special - # behaviour is needed. - super().__set__(obj, value) - class SettingInfo( PropertyInfo[BaseSetting[Owner, Value], Owner, Value], Generic[Owner, Value] ): """Access to the metadata of a setting.""" - def set_without_emit(self, value: Value) -> None: - """Set the value of the setting, but don't emit a notification. - - :param value: the new value for the setting. - """ - obj = self.owning_object_or_error() - self.get_descriptor().set_without_emit(obj, value) - class SettingCollection(DescriptorInfoCollection[Owner, SettingInfo], Generic[Owner]): """Access to metadata on all the properties of a `~lt.Thing` instance or subclass. diff --git a/src/labthings_fastapi/server/__init__.py b/src/labthings_fastapi/server/__init__.py index 8ea2a44a..d45e3fe2 100644 --- a/src/labthings_fastapi/server/__init__.py +++ b/src/labthings_fastapi/server/__init__.py @@ -25,6 +25,7 @@ import uvicorn from labthings_fastapi.exceptions import GlobalLockBusyError +from labthings_fastapi.message_broker import MessageBroker from ..middleware.url_for import url_for_middleware from ..thing_slots import ThingSlot @@ -150,6 +151,7 @@ def __init__( self._set_url_for_middleware() self._add_exception_handlers() self.action_manager = ActionManager() + self.message_broker = MessageBroker() self.app.include_router(self.action_manager.router(), prefix=self.api_prefix) self.app.include_router(blob.router, prefix=self.api_prefix) self.app.include_router(self._things_view_router(), prefix=self.api_prefix) diff --git a/src/labthings_fastapi/thing.py b/src/labthings_fastapi/thing.py index a15c3831..fb945505 100644 --- a/src/labthings_fastapi/thing.py +++ b/src/labthings_fastapi/thing.py @@ -16,24 +16,20 @@ from json.decoder import JSONDecodeError from fastapi.encoders import jsonable_encoder from fastapi import Request, WebSocket -from anyio.abc import ObjectSendStream from anyio.to_thread import run_sync from .logs import THING_LOGGER from .properties import ( - BaseProperty, - DataProperty, PropertyCollection, SettingCollection, ) -from .actions import ActionCollection, ActionDescriptor +from .actions import ActionCollection from .base_descriptor import OptionallyBoundDescriptor from .thing_description._model import ThingDescription, NoSecurityScheme from .utilities import class_attributes from .thing_description import validation from .utilities.introspection import get_summary, get_docstring from .websockets import websocket_endpoint -from .exceptions import PropertyNotObservableError from .thing_server_interface import ThingServerInterface from .invocation_contexts import get_invocation_id from .thing_class_settings import ThingClassSettings, validate_thing_class_settings @@ -210,7 +206,7 @@ def thing_description(request: Request) -> ThingDescription: @server.app.websocket(self.path + "ws") async def websocket(ws: WebSocket) -> None: - await websocket_endpoint(self, ws) + await websocket_endpoint(self, ws, server.message_broker) def _read_settings_file(self) -> Mapping[str, Any] | None: """Read the settings file and return a mapping of saved settings or None. @@ -281,7 +277,7 @@ def load_settings(self) -> None: try: setting = self.settings[name] # Load the key from the JSON file using the setting's model - setting.set_without_emit(setting.validate(value)) + setting.set(setting.validate(value)) except ValidationError: self.logger.warning( f"Could not load setting {name} from settings file " @@ -430,36 +426,6 @@ def thing_description_dict( td_dict: dict = td.model_dump(exclude_none=True, by_alias=True) return jsonable_encoder(td_dict) - def observe_property(self, property_name: str, stream: ObjectSendStream) -> None: - """Register a stream to receive property change notifications. - - :param property_name: the property to register for. - :param stream: the stream used to send events. - - :raise KeyError: if the requested name is not defined on this Thing. - :raise PropertyNotObservableError: if the property is not observable. - """ - prop = getattr(self.__class__, property_name, None) - if not isinstance(prop, BaseProperty): - raise KeyError(f"{property_name} is not a LabThings Property") - if not isinstance(prop, DataProperty): - raise PropertyNotObservableError(f"{property_name} is not observable.") - prop._observers_set(self).add(stream) - - def observe_action(self, action_name: str, stream: ObjectSendStream) -> None: - """Register a stream to receive action status change notifications. - - :param action_name: the action to register for. - :param stream: the stream used to send events. - - :raise KeyError: if the requested name is not defined on this Thing. - """ - action = getattr(self.__class__, action_name, None) - if not isinstance(action, ActionDescriptor): - raise KeyError(f"{action_name} is not an LabThings Action") - observers = action._observers_set(self) - observers.add(stream) - def get_current_invocation_logs(self) -> list[logging.LogRecord]: """Get the log records for an on going action. diff --git a/src/labthings_fastapi/thing_server_interface.py b/src/labthings_fastapi/thing_server_interface.py index c55e9394..1d36a340 100644 --- a/src/labthings_fastapi/thing_server_interface.py +++ b/src/labthings_fastapi/thing_server_interface.py @@ -18,6 +18,7 @@ from weakref import ref, ReferenceType from labthings_fastapi.global_lock import GlobalLock +from labthings_fastapi.message_broker import Message from .exceptions import FeatureNotEnabledError, ServerNotRunningError @@ -139,6 +140,22 @@ def call_async_task( raise ServerNotRunningError("Can't run async code without an event loop.") return portal.call(async_function, *args) + def publish(self, message: Message) -> None: + """Publish an event. + + Use the async event loop to notify subscribers that something has + happened. The message should contain the name of the `~lt.Thing` and affordance. + + Note that this function will do nothing if the event loop is not yet running. + + :param message: the message being published. + """ + try: + broker = self._get_server().message_broker + self.start_async_task_soon(broker.publish, message) + except ServerNotRunningError: + pass # If the server isn't running yet, we can't publish events. + @property def settings_folder(self) -> str: """The path to a folder where persistent files may be saved.""" diff --git a/src/labthings_fastapi/websockets.py b/src/labthings_fastapi/websockets.py index 2d3136f2..9d5c00df 100644 --- a/src/labthings_fastapi/websockets.py +++ b/src/labthings_fastapi/websockets.py @@ -25,11 +25,14 @@ from fastapi import WebSocket, WebSocketDisconnect from fastapi.encoders import jsonable_encoder from typing import TYPE_CHECKING, Literal + +from labthings_fastapi.message_broker import Message, MessageBroker from .exceptions import PropertyNotObservableError if TYPE_CHECKING: from .thing import Thing +LOGGER = logging.getLogger(__file__) WEBTHING_ERROR_URL = "https://w3c.github.io/web-thing-protocol/errors" @@ -76,7 +79,7 @@ def observation_error_response( async def relay_notifications_to_websocket( - websocket: WebSocket, receive_stream: ObjectReceiveStream + websocket: WebSocket, receive_stream: ObjectReceiveStream[Message] ) -> None: """Relay objects from a stream to a websocket as JSON. @@ -90,11 +93,45 @@ async def relay_notifications_to_websocket( """ async with receive_stream: async for item in receive_stream: - await websocket.send_json(jsonable_encoder(item)) + if item.message_type == "action": + msg = { + "messageType": "actionStatus", + "data": {"action name": item.affordance, "status": item.payload}, + } + elif item.message_type == "property": + msg = { + "messageType": "propertyStatus", + "data": {item.affordance: jsonable_encoder(item.payload)}, + } + else: + LOGGER.error(f"Could not relay '{item}' to websocket - bad type.") + await websocket.send_json(msg) + + +def assert_property_is_observable(thing: Thing, property: str) -> bool: + """Check that a Thing has a particular property and it is observable. + + :param thing: the `~lt.Thing` instance being observed. + :param property: the name of the property. + :raises KeyError: if the property does not exist. + :raises PropertyNotObservableError: if the property isn't observable. + :returns: `True` if an exception wasn't raised. + """ + try: + prop = thing.properties[property] # raises KeyError if it doesn't exist + except KeyError: + raise + if not prop.is_observable: + msg = f"'{thing.name}.{property}' is not observable." + raise PropertyNotObservableError(msg) + return True async def process_messages_from_websocket( - websocket: WebSocket, send_stream: ObjectSendStream, thing: Thing + websocket: WebSocket, + send_stream: ObjectSendStream[Message], + broker: MessageBroker, + thing: Thing, ) -> None: r"""Process messages received from a websocket. @@ -105,6 +142,7 @@ async def process_messages_from_websocket( :param send_stream: an `anyio.abc.ObjectSendStream` that we use to register for events, i.e. data sent to that stream will be sent through this websocket, by `.relay_notifications_to_websocket`\ . + :param broker: the message broker to use for subscriptions. :param thing: the `~lt.Thing` we are attached to. The websocket is specific to one `~lt.Thing`, and this is it. """ @@ -117,20 +155,24 @@ async def process_messages_from_websocket( if data["messageType"] == "addPropertyObservation": try: for k in data["data"].keys(): - thing.observe_property(k, send_stream) + assert_property_is_observable(thing, k) + broker.subscribe(thing.name, k, send_stream) except (KeyError, PropertyNotObservableError) as e: logging.error(f"Got a bad websocket message: {data}, caused {e!r}.") - await send_stream.send(observation_error_response(k, "property", e)) + await websocket.send_json(observation_error_response(k, "property", e)) if data["messageType"] == "addActionObservation": try: for k in data["data"].keys(): - thing.observe_action(k, send_stream) + _ = thing.actions[k] # raise a KeyError if the action doesn't exist + broker.subscribe(thing.name, k, send_stream) except KeyError as e: logging.error(f"Got a bad websocket message: {data}, caused {e!r}.") - await send_stream.send(observation_error_response(k, "action", e)) + await websocket.send_json(observation_error_response(k, "action", e)) -async def websocket_endpoint(thing: Thing, websocket: WebSocket) -> None: +async def websocket_endpoint( + thing: Thing, websocket: WebSocket, broker: MessageBroker +) -> None: r"""Handle communication to a client via websocket. This function handles a websocket connection to a `~lt.Thing`\ 's websocket @@ -139,9 +181,12 @@ async def websocket_endpoint(thing: Thing, websocket: WebSocket) -> None: :param thing: the `~lt.Thing` the websocket is attached to. :param websocket: the web socket that has been created. + :param broker: the message broker to use for subscriptions. """ await websocket.accept() - send_stream, receive_stream = create_memory_object_stream[dict]() + send_stream, receive_stream = create_memory_object_stream[Message]() async with create_task_group() as tg: tg.start_soon(relay_notifications_to_websocket, websocket, receive_stream) - tg.start_soon(process_messages_from_websocket, websocket, send_stream, thing) + tg.start_soon( + process_messages_from_websocket, websocket, send_stream, broker, thing + ) diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 7446a079..9eff2054 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -5,6 +5,7 @@ InvocationCancelledError, ) from labthings_fastapi.testing import create_thing_without_server +from labthings_fastapi.websockets import assert_property_is_observable class ThingWithProperties(lt.Thing): @@ -85,19 +86,6 @@ def thing(): return create_thing_without_server(ThingWithProperties) -def test_observing_dataprop(thing, mocker): - """Check `observe_property` is OK on a data property. - - This checks that something is added to the set of observers. - We don't check for events, as there's no event loop: this is - tested in `test_observing_dataprop_with_ws` below. - """ - observers_set = ThingWithProperties.dataprop._observers_set(thing) - fake_observer = mocker.Mock() - thing.observe_property("dataprop", fake_observer) - assert fake_observer in observers_set - - @pytest.mark.parametrize( argnames=["name", "exception"], argvalues=[ @@ -109,10 +97,10 @@ def test_observing_dataprop(thing, mocker): ("missing", KeyError), ], ) -def test_observing_errors(thing, mocker, name, exception): +def test_observing_errors(thing, name, exception): """Check errors are raised if we observe an unsuitable property.""" with pytest.raises(exception): - thing.observe_property(name, mocker.Mock()) + assert assert_property_is_observable(thing, name) def test_observing_dataprop_with_ws(client, ws): @@ -171,27 +159,6 @@ def test_observing_dataprop_error_with_ws(ws, name, title, status): assert message["error"]["status"] == status -def test_observing_action(thing, mocker): - """Check observing an action is successful. - - This verifies we've added an observer to the set, but doesn't test for - notifications: that would require an event loop. - """ - observers_set = ThingWithProperties.increment_dataprop._observers_set(thing) - fake_observer = mocker.Mock() - thing.observe_action("increment_dataprop", fake_observer) - assert fake_observer in observers_set - - -@pytest.mark.parametrize( - "name", ["non_property", "python_property", "undecorated", "dataprop"] -) -def test_observing_action_error(thing, mocker, name): - """Check observing an attribute that's not an action raises an error.""" - with pytest.raises(KeyError): - thing.observe_action(name, mocker.Mock()) - - @pytest.mark.parametrize( argnames=["name", "final_status"], argvalues=[ From dfe32c581877a9c887b00ae17c14e18c18c5327e Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 28 May 2026 23:38:46 +0100 Subject: [PATCH 02/14] Add tests and get tests passing This adds tests for `Message` and `MessageBroker`. As a result of testing, I've tightened up a few things: * `MessageBroker` now has a method to close all its send streams. This should help achieve a clean shutdown, and stops tests hanging. * `Message` is now a `pydantic` dataclass so it validates types. We may drop this back to a regular dataclass in the future for performance reasons. * mock_thing_instance now mocks a sensible `name` property so properties can work properly. --- src/labthings_fastapi/message_broker.py | 19 ++- src/labthings_fastapi/testing.py | 10 +- tests/test_message_broker.py | 151 ++++++++++++++++++++++++ tests/test_properties.py | 13 -- 4 files changed, 178 insertions(+), 15 deletions(-) create mode 100644 tests/test_message_broker.py diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 0ac36365..46f427ae 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -4,7 +4,8 @@ all the pub-sub messaging in LabThings. """ -from dataclasses import dataclass +import anyio +from pydantic.dataclasses import dataclass from typing import Any, Literal from weakref import WeakSet @@ -18,6 +19,9 @@ class Message: This is the message that is sent when a property or action generates an event. + This is a pydantic dataclass, so we validate the message. This might + change in the future for performance reasons. + :param thing: The name of the Thing generating the event. :param affordance: The name of the affordance generating the event. :param message: The message to send. @@ -103,3 +107,16 @@ async def publish(self, message: Message) -> None: return # No subscribers for this thing. for stream in subscriptions: await stream.send(message) + + async def close_streams(self) -> None: + """Close all streams that are subscribed to receive messages. + + This should be called when the server shuts down. + """ + # We use a task group so we shut down all streams concurrently, rather + # than waiting for each one to close. + async with anyio.create_task_group() as tg: + for thing_subs in self._subscriptions.values(): + for subs in thing_subs.values(): + for stream in subs: + tg.start_soon(stream.aclose) diff --git a/src/labthings_fastapi/testing.py b/src/labthings_fastapi/testing.py index 6194e06e..83c3de23 100644 --- a/src/labthings_fastapi/testing.py +++ b/src/labthings_fastapi/testing.py @@ -18,6 +18,7 @@ from unittest.mock import Mock from labthings_fastapi.global_lock import GlobalLock +from labthings_fastapi.message_broker import Message from .utilities import class_attributes from .thing_slots import ThingSlot @@ -105,6 +106,12 @@ def start_async_task_soon( f.cancel() return f + def publish(self, message: Message) -> None: + """Silently ignore published events. + + :param message: a message to publish. + """ + @property def settings_folder(self) -> str: """The path to a folder where persistent files may be saved. @@ -230,9 +237,10 @@ def mock_thing_instance(spec: type[ThingSubclass]) -> ThingSubclass: """ mock = Mock(spec=spec) mock.__name__ = "Mock{spec.__name__}" + mock.name = mock.__name__.lower() mock.__module__ = "mock_module" mock._thing_server_interface = MockThingServerInterface( - mock.__name__.lower(), mock.__name__ + mock.name, mock.__name__ ) return mock diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py new file mode 100644 index 00000000..61495e1d --- /dev/null +++ b/tests/test_message_broker.py @@ -0,0 +1,151 @@ +"""Test the message broker.""" + +import anyio +from anyio.abc import ObjectReceiveStream +import pytest + +from pydantic import ValidationError + +from labthings_fastapi.message_broker import Message, MessageBroker + + +class Unjsonable: + """A class that won't serialise.""" + + +@pytest.mark.parametrize( + "message", + [ + ("test_thing", "prop", "property", 42), + ("test_thing", "prop", "property", Unjsonable()), + ("test_thing", "do_it", "action", None), + ("test_thing", "notify", "event", {"key": "value"}), + ], +) +def test_message_valid(message): + """Check that Messages can be constructed.""" + amodel = Message(*message) + ARGS = ["thing", "affordance", "message_type", "payload"] + kwargs = dict(zip(ARGS, message, strict=True)) + kmodel = Message(**kwargs) + assert amodel == kmodel + assert amodel.__dict__ == kwargs + + +@pytest.mark.parametrize( + "message", + [ + ("test_thing", "prop", "custom", None), + (Unjsonable(), "prop", "property", None), + ("thing", Unjsonable(), "property", None), + ], +) +def test_message_invalid(message): + """Check that invalid Things or message types fail validation.""" + with pytest.raises(ValidationError): + _ = Message(*message) + + +def test_subscribe_unsubscribe(): + """Test that we can subscribe to affordances, and unsubscribe.""" + broker = MessageBroker() + assert broker._subscriptions == {} + + send_stream, receive_stream = anyio.create_memory_object_stream[Message]() + broker.subscribe("thing_name", "prop", send_stream) + + assert send_stream in broker._subscriptions["thing_name"]["prop"] + + broker.unsubscribe("thing_name", "prop", send_stream) + assert send_stream not in broker._subscriptions["thing_name"]["prop"] + + # There's deliberately no validation when subscribing - that must come + # from elsewhere. We do raise key errors for unsubscriptions though, if + # there's no subscription to cancel. + with pytest.raises(KeyError): + broker.unsubscribe("other_thing", "prop", send_stream) + with pytest.raises(KeyError): + broker.unsubscribe("thing_name", "other_prop", send_stream) + # There is currently no check that a subscription is current, so we don't + # yet test if the stream is currently subscribed before deleting it from the + # list of subscriptions. That means the following should work, even though + # we're not currently subscribed: + assert len(broker._subscriptions["thing_name"]["prop"]) == 0 + broker.unsubscribe("thing_name", "prop", send_stream) + assert len(broker._subscriptions["thing_name"]["prop"]) == 0 + + # We do check that the "thing" key is a string, not a `Thing` instance + # (because that's a likely mistake). + with pytest.raises(TypeError): + broker.subscribe(Unjsonable(), "whatever", send_stream) # type: ignore + with pytest.raises(TypeError): + broker.unsubscribe(Unjsonable(), "whatever", send_stream) # type: ignore + + +async def append_messages( + stream: ObjectReceiveStream[Message], + dest: list[Message], +): + """Append messages from a stream to a list.""" + async with stream: + async for item in stream: + dest.append(item) + + +def test_message_passing(): + """Check messages propagate in an event loop. + + We test messages with 0, 1, and 2 subscribers. + """ + message_a = Message("thing_a", "prop", "property", "a") + message_b = Message("thing_b", "prop", "property", "b") + message_a2 = Message("thing_a", "prop2", "property", "a2") + + broker = MessageBroker() + + async def publish_messages_and_shutdown(): + """Publish several messages.""" + await broker.publish(message_a) + await broker.publish(message_b) # not received - but no error either + await broker.publish(message_a2) + await broker.publish(message_a2) + await broker.publish(message_a2) + # It's important to close streams or the test hangs. + await broker.close_streams() + + # We make four subscriptions, defined below. + # Each has a thing name and property name. Any messages received will be + # appended to the list. + message_lists = { + "a_prop": ("thing_a", "prop", []), # message_a + "c_prop": ("thing_c", "prop", []), # no message + "a_prop2": ("thing_a", "prop2", []), # message_a3 x3 + "a_prop2_dup": ("thing_a", "prop2", []), # as above + } + + # Define the async code that runs in an event loop + async def main(): + async with anyio.create_task_group() as tg: + retain_send_streams = [] + for thing, prop, dest in message_lists.values(): + # Subscribe to messages, and handle them by + # appending to a list. + send, recv = anyio.create_memory_object_stream[Message]() + broker.subscribe(thing, prop, send) + tg.start_soon(append_messages, recv, dest) + # The line below stops the send stream getting garbage collected. + retain_send_streams.append(send) + tg.start_soon(publish_messages_and_shutdown) + + # Run the function in an event loop + anyio.run(main) + + # Check that the messages were received by the expected streams + assert message_lists["a_prop"][2] == [message_a] + assert message_lists["c_prop"][2] == [] + assert message_lists["a_prop2"][2] == [message_a2] * 3 + assert message_lists["a_prop2_dup"][2] == [message_a2] * 3 + + +if __name__ == "__main__": + test_message_passing() diff --git a/tests/test_properties.py b/tests/test_properties.py index 9f0da758..e7e31edb 100644 --- a/tests/test_properties.py +++ b/tests/test_properties.py @@ -11,7 +11,6 @@ from labthings_fastapi.exceptions import ( ClientPropertyError, NotBoundToInstanceError, - ServerNotRunningError, UnserialisableTypeError, UnsupportedConstraintError, ) @@ -392,18 +391,6 @@ def test_setting_from_thread(server): assert r.json() is True -def test_setting_without_event_loop(): - """Test DataProperty raises an error if set without an event loop.""" - # This test may need to change, if we change the intended behaviour - # Currently it should never be necessary to change properties from the - # main thread, so we raise an error if you try to do so - server = lt.ThingServer.from_things({"thing": PropertyTestThing}) - thing = server.things["thing"] - assert isinstance(thing, PropertyTestThing) - with pytest.raises(ServerNotRunningError): - thing.boolprop = False # Can't call it until the event loop's running - - @pytest.mark.parametrize("prop_info", CONSTRAINED_PROPS) def test_constrained_properties(prop_info, mocker): """Test that constraints on property values generate correct models. From ac86b2a0fe9a10807c2ff10d0d3e0f5b0be4da07 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Fri, 29 May 2026 00:03:43 +0100 Subject: [PATCH 03/14] Eliminate LabThings Data The sets of observers for properties and actions used to be stored in a `_labthings_data` attribute of the `Thing`. They now live in the `MessageBroker` so I have eliminated `LabThingsData`. Good riddance :) --- src/labthings_fastapi/utilities/__init__.py | 51 +-------------------- 1 file changed, 1 insertion(+), 50 deletions(-) diff --git a/src/labthings_fastapi/utilities/__init__.py b/src/labthings_fastapi/utilities/__init__.py index c04130a9..89859bcf 100644 --- a/src/labthings_fastapi/utilities/__init__.py +++ b/src/labthings_fastapi/utilities/__init__.py @@ -3,18 +3,15 @@ from __future__ import annotations from collections.abc import Callable, Mapping from types import FunctionType -from typing import Any, Dict, Generic, Iterable, TYPE_CHECKING, Optional, TypeVar -from weakref import WeakSet +from typing import Any, Dict, Generic, Iterable, Optional, TypeVar from pydantic import ( BaseModel, - ConfigDict, Field, RootModel, create_model, PydanticSchemaGenerationError, ValidationError, ) -from pydantic.dataclasses import dataclass from pydantic_core import PydanticSerializationError from fastapi import Response @@ -25,16 +22,10 @@ ) from .introspection import EmptyObject -if TYPE_CHECKING: - from ..thing import Thing - - __all__ = [ "class_attributes", "attributes", "RootModelWrapper", - "LabThingsObjectData", - "labthings_data", "model_to_dict", ] @@ -69,46 +60,6 @@ def attributes(cls: Any) -> Iterable[tuple[str, Any]]: yield name, getattr(cls, name) -LABTHINGS_DICT_KEY = "__labthings" - - -@dataclass(config=ConfigDict(arbitrary_types_allowed=True)) -class LabThingsObjectData: - r"""Data used by LabThings, stored on each `~lt.Thing`. - - This `pydantic.dataclass` groups together some properties used - by LabThings descriptors, to avoid cluttering the namespace of the - `~lt.Thing` subclass on which they are defined. - """ - - property_observers: Dict[str, WeakSet] = Field(default_factory=dict) - r"""The observers added to each property. - - Keys are property names, values are weak sets used by `~lt.DataProperty`\ . - """ - action_observers: Dict[str, WeakSet] = Field(default_factory=dict) - r"""The observers added to each action. - - Keys are action names, values are weak sets used by - `.ActionDescriptor`\ . - """ - - -def labthings_data(obj: Thing) -> LabThingsObjectData: - """Get (or create) a dictionary for LabThings properties. - - Ensure there is a `.LabThingsObjectData` dataclass attached to - a particular `~lt.Thing`, and return it. - - :param obj: The `~lt.Thing` we are looking for the dataclass on. - - :return: a `.LabThingsObjectData` instance attached to ``obj``. - """ - if LABTHINGS_DICT_KEY not in obj.__dict__: - obj.__dict__[LABTHINGS_DICT_KEY] = LabThingsObjectData() - return obj.__dict__[LABTHINGS_DICT_KEY] - - WrappedT = TypeVar("WrappedT") From 92017de8a7034aa6cf38df25eb483d0caed8a87a Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Mon, 1 Jun 2026 16:22:17 +0100 Subject: [PATCH 04/14] Apply suggestions from code review Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com> --- src/labthings_fastapi/exceptions.py | 2 +- src/labthings_fastapi/message_broker.py | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/labthings_fastapi/exceptions.py b/src/labthings_fastapi/exceptions.py index 7acc0e2b..d03932c3 100644 --- a/src/labthings_fastapi/exceptions.py +++ b/src/labthings_fastapi/exceptions.py @@ -43,7 +43,7 @@ class ReadOnlyPropertyError(AttributeError): class PropertyNotObservableError(RuntimeError): """The property is not observable. - This exception is raised when trying to observe + This exception is raised when trying to observe a property that is not observable. Currently, only data properties are observable: functional properties (using a getter/setter) may not be observed. diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 46f427ae..1d102317 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -70,11 +70,9 @@ def subscribe( """ if not isinstance(thing, str): raise TypeError(f"The `thing` argument should be a string, not {thing}.") - if thing not in self._subscriptions: - self._subscriptions[thing] = {} - if affordance not in self._subscriptions[thing]: - self._subscriptions[thing][affordance] = WeakSet() - self._subscriptions[thing][affordance].add(stream) + affordances = self._subscriptions.setdefault(thing, {}) + streams = affordances.setdefault(affordance, WeakSet()) + streams.add(stream) def unsubscribe( self, thing: str, affordance: str, stream: ObjectSendStream[Message] From 941f3e981384fabed9eafba7d01556f38ae2d9ed Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Mon, 1 Jun 2026 20:50:54 +0100 Subject: [PATCH 05/14] Fix docstring of Message and type of Message.message_type The parameters are now documented properly. I've also removed "event" from the `message_type` literal, because events aren't supported yet. That's for a future PR. --- src/labthings_fastapi/message_broker.py | 5 +++-- tests/test_message_broker.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 1d102317..46f858b0 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -24,12 +24,13 @@ class Message: :param thing: The name of the Thing generating the event. :param affordance: The name of the affordance generating the event. - :param message: The message to send. + :param message_type: The kind of affordance from which the event originates. + :param payload: Data specific to the event (e.g. property value, action status). """ thing: str affordance: str - message_type: Literal["property", "action", "event"] + message_type: Literal["property", "action"] payload: Any diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 61495e1d..0c5fe0a9 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -19,7 +19,7 @@ class Unjsonable: ("test_thing", "prop", "property", 42), ("test_thing", "prop", "property", Unjsonable()), ("test_thing", "do_it", "action", None), - ("test_thing", "notify", "event", {"key": "value"}), + ("test_thing", "notify", "action", {"key": "value"}), ], ) def test_message_valid(message): From 488be1bc16f7b6f5b2b999bd043834ab53347843 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Mon, 1 Jun 2026 20:57:38 +0100 Subject: [PATCH 06/14] Check both keys are strings when subscribing to messages. I think the first check (for Thing) is far more likely to prove useful - but it's a cheap check so we may as well do both as checking only one seems odd. --- src/labthings_fastapi/message_broker.py | 12 ++++++++---- tests/test_message_broker.py | 8 ++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 46f858b0..da8a8781 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -67,10 +67,12 @@ def subscribe( :param thing: The name of the `.Thing` being subscribed to. :param affordance: The name of the affordance being subscribed to. :param stream: A stream to send the messages to. - :raises TypeError: if the `thing` argument is not a string. + :raises TypeError: if the `thing` or `affordance` argument is not a string. """ if not isinstance(thing, str): - raise TypeError(f"The `thing` argument should be a string, not {thing}.") + raise TypeError(f"`thing` must be a string, not '{thing}'.") + if not isinstance(affordance, str): + raise TypeError("`affordance` must be a string, not '{affordance}'.") affordances = self._subscriptions.setdefault(thing, {}) streams = affordances.setdefault(affordance, WeakSet()) streams.add(stream) @@ -84,10 +86,12 @@ def unsubscribe( :param affordance: The name of the affordance being unsubscribed from. :param stream: The stream to unsubscribe. :raises KeyError: if there is no such subscription. - :raises TypeError: if the `thing` argument is not a string. + :raises TypeError: if the `thing` or `affordance` argument is not a string. """ if not isinstance(thing, str): - raise TypeError(f"The `thing` argument should be a string, not {thing}.") + raise TypeError(f"`thing` must be a string, not '{thing}'.") + if not isinstance(affordance, str): + raise TypeError("`affordance` must be a string, not '{affordance}'.") try: self._subscriptions[thing][affordance].discard(stream) except KeyError as e: diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 0c5fe0a9..516d3a4a 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -74,12 +74,16 @@ def test_subscribe_unsubscribe(): broker.unsubscribe("thing_name", "prop", send_stream) assert len(broker._subscriptions["thing_name"]["prop"]) == 0 - # We do check that the "thing" key is a string, not a `Thing` instance - # (because that's a likely mistake). + # We do check that the `thing` and `affordance` are strings, because it would + # be very easy to pass a `Thing` by accident otherwise. with pytest.raises(TypeError): broker.subscribe(Unjsonable(), "whatever", send_stream) # type: ignore with pytest.raises(TypeError): broker.unsubscribe(Unjsonable(), "whatever", send_stream) # type: ignore + with pytest.raises(TypeError): + broker.subscribe("whatever", Unjsonable(), send_stream) # type: ignore + with pytest.raises(TypeError): + broker.unsubscribe("whatever", Unjsonable(), send_stream) # type: ignore async def append_messages( From 4e4ec648f4b87f0ee26baf7acb453bf20f61af6d Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Mon, 1 Jun 2026 21:00:17 +0100 Subject: [PATCH 07/14] Raise an error if we unsubscribe a non-existent subscription. Previously, unsubscribing from a non-existent subscription may or may not have raised a KeyError. Now, we always get the error. Test updated accordingly. --- src/labthings_fastapi/message_broker.py | 2 +- tests/test_message_broker.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index da8a8781..7f101256 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -93,7 +93,7 @@ def unsubscribe( if not isinstance(affordance, str): raise TypeError("`affordance` must be a string, not '{affordance}'.") try: - self._subscriptions[thing][affordance].discard(stream) + self._subscriptions[thing][affordance].remove(stream) except KeyError as e: raise e diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 516d3a4a..4364f4e8 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -71,7 +71,8 @@ def test_subscribe_unsubscribe(): # list of subscriptions. That means the following should work, even though # we're not currently subscribed: assert len(broker._subscriptions["thing_name"]["prop"]) == 0 - broker.unsubscribe("thing_name", "prop", send_stream) + with pytest.raises(KeyError): + broker.unsubscribe("thing_name", "prop", send_stream) assert len(broker._subscriptions["thing_name"]["prop"]) == 0 # We do check that the `thing` and `affordance` are strings, because it would From d55e1241c21a474745699da5e7e8b89f8e2304a0 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Mon, 1 Jun 2026 21:11:29 +0100 Subject: [PATCH 08/14] Add a test for closing streams. --- tests/test_message_broker.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 4364f4e8..f1d71782 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -152,5 +152,17 @@ async def main(): assert message_lists["a_prop2_dup"][2] == [message_a2] * 3 -if __name__ == "__main__": - test_message_passing() +def test_close_streams(): + """Verify that close_streams actually closes the subscribed streams.""" + broker = MessageBroker() + send_stream, receive_stream = anyio.create_memory_object_stream[Message]() + + broker.subscribe("thing_a", "prop", send_stream) + anyio.run(broker.close_streams) + + # Check the send stream was closed + assert send_stream._closed is True + + # Check this propagates to the receive stream + with pytest.raises(anyio.EndOfStream): + anyio.run(receive_stream.receive) From c15cff0e975790aa2724b5ac1c821f759c06c062 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Tue, 2 Jun 2026 20:38:50 +0100 Subject: [PATCH 09/14] Improvements from code review This should significantly improve the robustness of the message broker thanks to comments from @bprobert97. The main changes are: * If a stream is closed, it's automatically unsubscribed from messages. This avoids errors that could derail the message broker. * If a stream's buffer is full, we drop messages rather than blocking. This avoids gumming up the event loop with stuck tasks, which is a potential memory leak. * I've made the subscribe and unsubscribe functions `async` because they must be called in the event loop. They don't actually need to be async, but they do need to be in the event loop, and the async keyword is a good way to ensure that. I considered logging a warning when streams are automatically unsubscribed, but decided against it. My thinking is that a closed stream can't be reopened, so there's no risk that the unsubscription is losing messages unexpectedly. Also, saying that closing or deleting the stream is an official way to unsubscribe removes the need for lots of logic to track and explicitly remove subscriptions, which simplifies the websocket code. I have added an `anyio` line to pytest configuration so we can have `async` tests now. This supports the new tests in `test_message_broker.py` and avoids lots of `anyio.run()` clutter. --- pyproject.toml | 1 + src/labthings_fastapi/exceptions.py | 13 ++ src/labthings_fastapi/message_broker.py | 44 +++++-- src/labthings_fastapi/websockets.py | 18 +-- tests/test_message_broker.py | 159 +++++++++++++++++++----- 5 files changed, 188 insertions(+), 47 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f4aee75c..155a0425 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ filterwarnings = [ # with the line below. "ignore:.get_validate_properties_on_set. will become .True. by default:DeprecationWarning", ] +anyio_mode = "auto" [tool.ruff] target-version = "py310" diff --git a/src/labthings_fastapi/exceptions.py b/src/labthings_fastapi/exceptions.py index d03932c3..d20239ba 100644 --- a/src/labthings_fastapi/exceptions.py +++ b/src/labthings_fastapi/exceptions.py @@ -460,3 +460,16 @@ class GlobalLockBusyError(TimeoutError): it. It indicates that the LabThings server is busy running another action or property setter. """ + + +class MessageDroppedWarning(RuntimeWarning): + """A message was dropped by the message broker. + + This warning is emitted when a message can't be sent to a subscribed stream + because the stream's buffer is full. The message broker won't block, as + doing so could result in a potentially infinite number of stalled tasks. + + If you see this warning, it means that a stream has been subscribed to + messages, but is not being read. Most likely, this means the stream was + not closed or deleted properly. + """ diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 7f101256..0d62da98 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -8,8 +8,15 @@ from pydantic.dataclasses import dataclass from typing import Any, Literal from weakref import WeakSet +import logging +import warnings -from anyio.abc import ObjectSendStream +from anyio.streams.memory import MemoryObjectSendStream + +from labthings_fastapi.exceptions import MessageDroppedWarning + + +LOGGER = logging.getLogger(__name__) @dataclass @@ -53,16 +60,16 @@ def __init__(self) -> None: # Note that we use a weak set below, so that when a websocket disconnects, # its stream is removed automatically. self._subscriptions: dict[ - str, dict[str, WeakSet[ObjectSendStream[Message]]] + str, dict[str, WeakSet[MemoryObjectSendStream[Message]]] ] = {} - def subscribe( - self, thing: str, affordance: str, stream: ObjectSendStream[Message] + async def subscribe( + self, thing: str, affordance: str, stream: MemoryObjectSendStream[Message] ) -> None: """Subscribe to messages from a particular affordance. - Note that this method is not async - it just registers the stream and so - can be run from any thread. + Note that this method must be called from the event loop, as the message + broker is deliberately not thread safe. :param thing: The name of the `.Thing` being subscribed to. :param affordance: The name of the affordance being subscribed to. @@ -77,11 +84,16 @@ def subscribe( streams = affordances.setdefault(affordance, WeakSet()) streams.add(stream) - def unsubscribe( - self, thing: str, affordance: str, stream: ObjectSendStream[Message] + async def unsubscribe( + self, thing: str, affordance: str, stream: MemoryObjectSendStream[Message] ) -> None: """Unsubscribe a stream from messages from a particular affordance. + This function is often not necessary: streams will be unsubscribed automatically + if they are closed or finalised. As the message broker only keeps a weak + reference to the stream, that means it will be finalized and unsubscribed + when the code that created it goes out of scope. + :param thing: The name of the `.Thing` being unsubscribed from. :param affordance: The name of the affordance being unsubscribed from. :param stream: The stream to unsubscribe. @@ -108,8 +120,22 @@ async def publish(self, message: Message) -> None: subscriptions = self._subscriptions[message.thing][message.affordance] except KeyError: return # No subscribers for this thing. + subscriptions_to_remove = set() for stream in subscriptions: - await stream.send(message) + try: + stream.send_nowait(message) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + # Streams that have been closed will be automatically unsubscribed. + # They can't be reopened, so they won't be reused. + subscriptions_to_remove.add(stream) + except anyio.WouldBlock: + msg = f"Could not pass notification to {stream} as it was full." + LOGGER.warning(msg) + warnings.warn(MessageDroppedWarning(msg), stacklevel=1) + for stream in subscriptions_to_remove: + # discard rather than remove, so that if the stream has been finalized + # since it was closed, we don't get an error. + subscriptions.discard(stream) async def close_streams(self) -> None: """Close all streams that are subscribed to receive messages. diff --git a/src/labthings_fastapi/websockets.py b/src/labthings_fastapi/websockets.py index 9d5c00df..4b019f67 100644 --- a/src/labthings_fastapi/websockets.py +++ b/src/labthings_fastapi/websockets.py @@ -20,7 +20,7 @@ from __future__ import annotations from anyio import create_memory_object_stream, create_task_group -from anyio.abc import ObjectReceiveStream, ObjectSendStream +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream import logging from fastapi import WebSocket, WebSocketDisconnect from fastapi.encoders import jsonable_encoder @@ -79,7 +79,7 @@ def observation_error_response( async def relay_notifications_to_websocket( - websocket: WebSocket, receive_stream: ObjectReceiveStream[Message] + websocket: WebSocket, receive_stream: MemoryObjectReceiveStream[Message] ) -> None: """Relay objects from a stream to a websocket as JSON. @@ -88,7 +88,7 @@ async def relay_notifications_to_websocket( queue and passes them to the websocket. :param websocket: the WebSocket we are communicating over. - :param receive_stream: an `anyio.abc.ObjectReceiveStream` that will + :param receive_stream: a stream that will yield objects that we send over the websocket. """ async with receive_stream: @@ -129,7 +129,7 @@ def assert_property_is_observable(thing: Thing, property: str) -> bool: async def process_messages_from_websocket( websocket: WebSocket, - send_stream: ObjectSendStream[Message], + send_stream: MemoryObjectSendStream[Message], broker: MessageBroker, thing: Thing, ) -> None: @@ -156,7 +156,7 @@ async def process_messages_from_websocket( try: for k in data["data"].keys(): assert_property_is_observable(thing, k) - broker.subscribe(thing.name, k, send_stream) + await broker.subscribe(thing.name, k, send_stream) except (KeyError, PropertyNotObservableError) as e: logging.error(f"Got a bad websocket message: {data}, caused {e!r}.") await websocket.send_json(observation_error_response(k, "property", e)) @@ -164,7 +164,7 @@ async def process_messages_from_websocket( try: for k in data["data"].keys(): _ = thing.actions[k] # raise a KeyError if the action doesn't exist - broker.subscribe(thing.name, k, send_stream) + await broker.subscribe(thing.name, k, send_stream) except KeyError as e: logging.error(f"Got a bad websocket message: {data}, caused {e!r}.") await websocket.send_json(observation_error_response(k, "action", e)) @@ -184,7 +184,11 @@ async def websocket_endpoint( :param broker: the message broker to use for subscriptions. """ await websocket.accept() - send_stream, receive_stream = create_memory_object_stream[Message]() + # We use a small, buffer size for the stream: the message broker will drop + # messages if the buffer fills up, and the default is no buffer. While we + # should always clear the buffer quickly, a small buffer should avoid + # dropping any messages. + send_stream, receive_stream = create_memory_object_stream[Message](5) async with create_task_group() as tg: tg.start_soon(relay_notifications_to_websocket, websocket, receive_stream) tg.start_soon( diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index f1d71782..1ad86229 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -1,5 +1,8 @@ """Test the message broker.""" +import logging +from weakref import WeakSet + import anyio from anyio.abc import ObjectReceiveStream import pytest @@ -46,45 +49,45 @@ def test_message_invalid(message): _ = Message(*message) -def test_subscribe_unsubscribe(): +async def test_subscribe_unsubscribe(): """Test that we can subscribe to affordances, and unsubscribe.""" broker = MessageBroker() assert broker._subscriptions == {} send_stream, receive_stream = anyio.create_memory_object_stream[Message]() - broker.subscribe("thing_name", "prop", send_stream) + await broker.subscribe("thing_name", "prop", send_stream) assert send_stream in broker._subscriptions["thing_name"]["prop"] - broker.unsubscribe("thing_name", "prop", send_stream) + await broker.unsubscribe("thing_name", "prop", send_stream) assert send_stream not in broker._subscriptions["thing_name"]["prop"] # There's deliberately no validation when subscribing - that must come # from elsewhere. We do raise key errors for unsubscriptions though, if # there's no subscription to cancel. with pytest.raises(KeyError): - broker.unsubscribe("other_thing", "prop", send_stream) + await broker.unsubscribe("other_thing", "prop", send_stream) with pytest.raises(KeyError): - broker.unsubscribe("thing_name", "other_prop", send_stream) + await broker.unsubscribe("thing_name", "other_prop", send_stream) # There is currently no check that a subscription is current, so we don't # yet test if the stream is currently subscribed before deleting it from the # list of subscriptions. That means the following should work, even though # we're not currently subscribed: assert len(broker._subscriptions["thing_name"]["prop"]) == 0 with pytest.raises(KeyError): - broker.unsubscribe("thing_name", "prop", send_stream) + await broker.unsubscribe("thing_name", "prop", send_stream) assert len(broker._subscriptions["thing_name"]["prop"]) == 0 # We do check that the `thing` and `affordance` are strings, because it would # be very easy to pass a `Thing` by accident otherwise. with pytest.raises(TypeError): - broker.subscribe(Unjsonable(), "whatever", send_stream) # type: ignore + await broker.subscribe(Unjsonable(), "whatever", send_stream) # type: ignore with pytest.raises(TypeError): - broker.unsubscribe(Unjsonable(), "whatever", send_stream) # type: ignore + await broker.unsubscribe(Unjsonable(), "whatever", send_stream) # type: ignore with pytest.raises(TypeError): - broker.subscribe("whatever", Unjsonable(), send_stream) # type: ignore + await broker.subscribe("whatever", Unjsonable(), send_stream) # type: ignore with pytest.raises(TypeError): - broker.unsubscribe("whatever", Unjsonable(), send_stream) # type: ignore + await broker.unsubscribe("whatever", Unjsonable(), send_stream) # type: ignore async def append_messages( @@ -97,7 +100,7 @@ async def append_messages( dest.append(item) -def test_message_passing(): +async def test_message_passing(): """Check messages propagate in an event loop. We test messages with 0, 1, and 2 subscribers. @@ -128,22 +131,21 @@ async def publish_messages_and_shutdown(): "a_prop2_dup": ("thing_a", "prop2", []), # as above } - # Define the async code that runs in an event loop - async def main(): - async with anyio.create_task_group() as tg: - retain_send_streams = [] - for thing, prop, dest in message_lists.values(): - # Subscribe to messages, and handle them by - # appending to a list. - send, recv = anyio.create_memory_object_stream[Message]() - broker.subscribe(thing, prop, send) - tg.start_soon(append_messages, recv, dest) - # The line below stops the send stream getting garbage collected. - retain_send_streams.append(send) - tg.start_soon(publish_messages_and_shutdown) - - # Run the function in an event loop - anyio.run(main) + async with anyio.create_task_group() as tg: + # Set up the subscriptions + retain_send_streams = [] + for thing, prop, dest in message_lists.values(): + # Subscribe to messages, and handle them by + # appending to a list. + # Note buffer size needs to be >0 or we'll drop messages + # if they're sent before we start listening. + send, recv = anyio.create_memory_object_stream[Message](5) + await broker.subscribe(thing, prop, send) + tg.start_soon(append_messages, recv, dest) + # The line below stops the send stream getting garbage collected. + retain_send_streams.append(send) + # Now publish messages to the streams, then close them. + tg.start_soon(publish_messages_and_shutdown) # Check that the messages were received by the expected streams assert message_lists["a_prop"][2] == [message_a] @@ -152,17 +154,112 @@ async def main(): assert message_lists["a_prop2_dup"][2] == [message_a2] * 3 -def test_close_streams(): +async def test_close_streams(): """Verify that close_streams actually closes the subscribed streams.""" broker = MessageBroker() send_stream, receive_stream = anyio.create_memory_object_stream[Message]() - broker.subscribe("thing_a", "prop", send_stream) - anyio.run(broker.close_streams) + # We subscribe to two affordances, to make sure it's not a problem to + # close multiple subscriptions that are the same stream. + await broker.subscribe("thing_a", "prop", send_stream) + await broker.subscribe("thing_b", "prop", send_stream) + await broker.close_streams() # Check the send stream was closed assert send_stream._closed is True # Check this propagates to the receive stream with pytest.raises(anyio.EndOfStream): - anyio.run(receive_stream.receive) + await receive_stream.receive() + + +@pytest.mark.parametrize("action", ["close_send", "close_receive", "delete_send"]) +async def test_sending_to_closed_streams(caplog, action): + """Check that closing a stream causes it to unsubscribe.""" + caplog.set_level(logging.WARNING) + broker = MessageBroker() + send_stream, receive_stream = anyio.create_memory_object_stream[Message](2) + await broker.subscribe("thing", "prop", send_stream) + # Verify there's a subscription + assert len(broker._subscriptions["thing"]["prop"]) == 1 + assert send_stream in broker._subscriptions["thing"]["prop"] + message = Message("thing", "prop", "property", None) + + print("sending first message") + # Check we can send and receive a message + await broker.publish(message) + received = await receive_stream.receive() + assert received is message + + # Close or delete the stream + if action == "close_send": + await send_stream.aclose() + elif action == "close_receive": + await receive_stream.aclose() + else: + del send_stream # streams are unsubscribed when they are finalised + assert len(caplog.records) == 0 + print("sending second message") + await broker.publish(message) + assert len(caplog.records) == 0 # Shouldn't be any warnings in the log + # Check we've been unsubscribed + assert len(broker._subscriptions["thing"]["prop"]) == 0 + + +async def test_sending_to_full_stream(caplog): + """Check that a stream that's full logs a warning and doesn't block.""" + caplog.set_level(logging.WARNING) + broker = MessageBroker() + send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=1) + await broker.subscribe("thing", "prop", send_stream) + message = Message("thing", "prop", "property", None) + + # Check we can send and receive a message + await broker.publish(message) + received = await receive_stream.receive() + assert received is message + + # Send another message, so the stream's buffer fills up + await broker.publish(message) + assert len(caplog.records) == 0 + + # Send a third message, which should fail and log a warning + await broker.publish(message) + assert len(caplog.records) == 1 + msg = caplog.records[0].getMessage() + assert msg.startswith("Could not pass notification to") + assert msg.endswith("as it was full.") + + # Receive the message and clear the buffer + received = await receive_stream.receive() + assert received is message + + # Send and receive again - should be no further problems + await broker.publish(message) + received = await receive_stream.receive() + assert received is message + assert len(caplog.records) == 1 + + +async def test_weakset_garbage_collection(): + """Check we can't cause a problem by garbage-collecting streams mid-send. + + This tests behaviour of Python and the garbage collector - it's not very clear to + me from the Python docs when garbage collection may happen, or whether `WeakSet` is + robust to it. This test checks I've understood that behaviour correctly, and should + fail if Python's behaviour changes in a problematic way. + + The test iterates over a set of four objects, but deletes the strong references + during the first iteration - if this were a regular set, it would cause an error. + """ + items = {Unjsonable() for _ in range(4)} + assert len(items) == 4 + weak = WeakSet(items) + assert len(weak) == 4 + iterated = set() + for item in weak: + iterated.add(item) # We don't know which item this will be + del items # There are now no references except to the one in `iterated` + assert len(iterated) == 1 # We only iterated once, but there wasn't an error. + # If we complete the test, it confirms we don't need to worry about streams being + # finalized during iteration. From e552dd7e81783006a19df3ac3205a650061660b7 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 3 Jun 2026 12:47:05 +0100 Subject: [PATCH 10/14] Apply suggestion from @bprobert97 Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com> --- src/labthings_fastapi/message_broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index 0d62da98..f98dece9 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -79,7 +79,7 @@ async def subscribe( if not isinstance(thing, str): raise TypeError(f"`thing` must be a string, not '{thing}'.") if not isinstance(affordance, str): - raise TypeError("`affordance` must be a string, not '{affordance}'.") + raise TypeError(f"`affordance` must be a string, not '{affordance}'.") affordances = self._subscriptions.setdefault(thing, {}) streams = affordances.setdefault(affordance, WeakSet()) streams.add(stream) From 39693830af983c59430bf020fca5815bf41f7c9f Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 3 Jun 2026 12:50:45 +0100 Subject: [PATCH 11/14] Test a warning is emitted if a stream is full. --- tests/test_message_broker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 1ad86229..4af2f728 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -9,6 +9,7 @@ from pydantic import ValidationError +from labthings_fastapi.exceptions import MessageDroppedWarning from labthings_fastapi.message_broker import Message, MessageBroker @@ -224,7 +225,8 @@ async def test_sending_to_full_stream(caplog): assert len(caplog.records) == 0 # Send a third message, which should fail and log a warning - await broker.publish(message) + with pytest.warns(MessageDroppedWarning): + await broker.publish(message) assert len(caplog.records) == 1 msg = caplog.records[0].getMessage() assert msg.startswith("Could not pass notification to") From ae5a39a68f6979907266a961c1fc8bf645377fca Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 3 Jun 2026 12:54:04 +0100 Subject: [PATCH 12/14] Apply suggestions from code review Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com> --- src/labthings_fastapi/message_broker.py | 6 +++--- tests/test_message_broker.py | 7 +------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/labthings_fastapi/message_broker.py b/src/labthings_fastapi/message_broker.py index f98dece9..f6e6bf0a 100644 --- a/src/labthings_fastapi/message_broker.py +++ b/src/labthings_fastapi/message_broker.py @@ -91,7 +91,7 @@ async def unsubscribe( This function is often not necessary: streams will be unsubscribed automatically if they are closed or finalised. As the message broker only keeps a weak - reference to the stream, that means it will be finalized and unsubscribed + reference to the stream, that means it will be finalised and unsubscribed when the code that created it goes out of scope. :param thing: The name of the `.Thing` being unsubscribed from. @@ -103,7 +103,7 @@ async def unsubscribe( if not isinstance(thing, str): raise TypeError(f"`thing` must be a string, not '{thing}'.") if not isinstance(affordance, str): - raise TypeError("`affordance` must be a string, not '{affordance}'.") + raise TypeError(f"`affordance` must be a string, not '{affordance}'.") try: self._subscriptions[thing][affordance].remove(stream) except KeyError as e: @@ -133,7 +133,7 @@ async def publish(self, message: Message) -> None: LOGGER.warning(msg) warnings.warn(MessageDroppedWarning(msg), stacklevel=1) for stream in subscriptions_to_remove: - # discard rather than remove, so that if the stream has been finalized + # discard rather than remove, so that if the stream has been finalised # since it was closed, we don't get an error. subscriptions.discard(stream) diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 4af2f728..7f19466a 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -246,11 +246,6 @@ async def test_sending_to_full_stream(caplog): async def test_weakset_garbage_collection(): """Check we can't cause a problem by garbage-collecting streams mid-send. - This tests behaviour of Python and the garbage collector - it's not very clear to - me from the Python docs when garbage collection may happen, or whether `WeakSet` is - robust to it. This test checks I've understood that behaviour correctly, and should - fail if Python's behaviour changes in a problematic way. - The test iterates over a set of four objects, but deletes the strong references during the first iteration - if this were a regular set, it would cause an error. """ @@ -264,4 +259,4 @@ async def test_weakset_garbage_collection(): del items # There are now no references except to the one in `iterated` assert len(iterated) == 1 # We only iterated once, but there wasn't an error. # If we complete the test, it confirms we don't need to worry about streams being - # finalized during iteration. + # finalised during iteration. From b23d7d9f7bd6c5bb0ed19862dbb7bd7cff6997ce Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 3 Jun 2026 12:58:42 +0100 Subject: [PATCH 13/14] Test the exception message for type errors when subscribing/unsubscribing. --- tests/test_message_broker.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/test_message_broker.py b/tests/test_message_broker.py index 7f19466a..8896e026 100644 --- a/tests/test_message_broker.py +++ b/tests/test_message_broker.py @@ -81,14 +81,15 @@ async def test_subscribe_unsubscribe(): # We do check that the `thing` and `affordance` are strings, because it would # be very easy to pass a `Thing` by accident otherwise. - with pytest.raises(TypeError): - await broker.subscribe(Unjsonable(), "whatever", send_stream) # type: ignore - with pytest.raises(TypeError): - await broker.unsubscribe(Unjsonable(), "whatever", send_stream) # type: ignore - with pytest.raises(TypeError): - await broker.subscribe("whatever", Unjsonable(), send_stream) # type: ignore - with pytest.raises(TypeError): - await broker.unsubscribe("whatever", Unjsonable(), send_stream) # type: ignore + msg_suffix = " must be a string, not '42'" + with pytest.raises(TypeError, match="`thing`" + msg_suffix): + await broker.subscribe(42, "whatever", send_stream) # type: ignore + with pytest.raises(TypeError, match="`thing`" + msg_suffix): + await broker.unsubscribe(42, "whatever", send_stream) # type: ignore + with pytest.raises(TypeError, match="`affordance`" + msg_suffix): + await broker.subscribe("whatever", 42, send_stream) # type: ignore + with pytest.raises(TypeError, match="`affordance`" + msg_suffix): + await broker.unsubscribe("whatever", 42, send_stream) # type: ignore async def append_messages( From efb6b9373d27582e07516711b602cb323df5a83b Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 3 Jun 2026 22:54:23 +0100 Subject: [PATCH 14/14] Format fix after rebasing --- src/labthings_fastapi/testing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/labthings_fastapi/testing.py b/src/labthings_fastapi/testing.py index 83c3de23..da890973 100644 --- a/src/labthings_fastapi/testing.py +++ b/src/labthings_fastapi/testing.py @@ -239,9 +239,7 @@ def mock_thing_instance(spec: type[ThingSubclass]) -> ThingSubclass: mock.__name__ = "Mock{spec.__name__}" mock.name = mock.__name__.lower() mock.__module__ = "mock_module" - mock._thing_server_interface = MockThingServerInterface( - mock.name, mock.__name__ - ) + mock._thing_server_interface = MockThingServerInterface(mock.name, mock.__name__) return mock