Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ filterwarnings = [
# with the line below.
"ignore:.get_validate_properties_on_set. will become .True. by default:DeprecationWarning",
]
anyio_mode = "auto"

[tool.ruff]
target-version = "py310"
Expand Down
86 changes: 22 additions & 64 deletions src/labthings_fastapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
TypeVar,
overload,
)
from weakref import WeakSet
import weakref
from fastapi import APIRouter, FastAPI, HTTPException, Response, Body, BackgroundTasks
from pydantic import BaseModel, create_model

from labthings_fastapi.message_broker import Message


from .middleware.url_for import URLFor
from .base_descriptor import (
BaseDescriptor,
Expand Down Expand Up @@ -74,7 +76,6 @@
)
from .thing_description import type_to_dataschema
from .thing_description._model import ActionAffordance, ActionOp, Form, LinkElement
from .utilities import labthings_data


if TYPE_CHECKING:
Expand Down Expand Up @@ -270,6 +271,20 @@
log=self.log,
)

def _publish_status(self) -> None:
"""Publish a status change event to any observers.

This should be called after each change to ``self._status``
"""
self.thing._thing_server_interface.publish(
Message(
thing=self.thing.name,
affordance=self.action.name, # type: ignore[attr-defined]
message_type="action",
payload=self._status.value,
)
)

def run(self) -> None:
"""Run the action and track progress.

Expand Down Expand Up @@ -305,7 +320,7 @@
add_thing_log_destination(self.id, self._log)
with invocation_contexts.set_invocation_id(self.id):
try:
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()

thing = self.thing
kwargs = model_to_dict(self.input)
Expand All @@ -321,7 +336,7 @@
with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()

# Actually run the action
ret = action.func(thing, **kwargs, **self.dependencies)
Expand All @@ -336,12 +351,12 @@
with self._status_lock:
self._output_model_instance = output_model_instance
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()
except InvocationCancelledError:
logger.info(f"Invocation {self.id} was cancelled.")
with self._status_lock:
self._status = InvocationStatus.CANCELLED
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()
except Exception as e: # skipcq: PYL-W0703
# First log
if isinstance(e, (InvocationError, InvalidReturnValueError)):
Expand All @@ -361,7 +376,7 @@
with self._status_lock:
self._status = InvocationStatus.ERROR
self._exception = e
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()
finally:
with self._status_lock:
self._end_time = datetime.datetime.now()
Expand Down Expand Up @@ -553,8 +568,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 572 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

571-572 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand All @@ -567,7 +582,7 @@
invocation.output.response
):
# TODO: honour "accept" header
return invocation.output.response()

Check warning on line 585 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

585 line is not covered with tests
try:
return serialise_from_user_code(
model_instance=invocation.output_model_instance,
Expand Down Expand Up @@ -603,8 +618,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 622 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

621-622 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand Down Expand Up @@ -799,7 +814,7 @@
"""
super().__set_name__(owner, name)
if self.name != self.func.__name__:
raise ValueError(

Check warning on line 817 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

817 line is not covered with tests
f"Action name '{self.name}' does not match function name "
f"'{self.func.__name__}'",
)
Expand Down Expand Up @@ -868,63 +883,6 @@

return partial(wrapped, obj)

def _observers_set(self, obj: Thing) -> WeakSet:
"""Return a set used to notify changes.

Note that we need to supply the `~lt.Thing` we are looking at, as in
general there may be more than one object of the same type, and
descriptor instances are shared between all instances of their class.

:param obj: The `~lt.Thing` on which the action is being observed.

:return: a weak set of callables to notify on changes to the action.
This is used by websocket endpoints.
"""
ld = labthings_data(obj)
if self.name not in ld.action_observers:
ld.action_observers[self.name] = WeakSet()
return ld.action_observers[self.name]

def emit_changed_event(self, obj: Thing, status: str) -> None:
"""Notify subscribers that the action status has changed.

This function is run from within the `.Invocation` thread that
is created when an action is called. It must be run from a thread
as it is communicating with the event loop via an `asyncio` blocking
portal. Async code must not use the blocking portal as it can deadlock
the event loop.

:param obj: The `~lt.Thing` on which the action is being observed.
:param status: The status of the action, to be sent to observers.
"""
obj._thing_server_interface.start_async_task_soon(
self.emit_changed_event_async,
obj,
status,
)

async def emit_changed_event_async(self, obj: Thing, value: Any) -> None:
"""Notify subscribers that the action status has changed.

This is an async function that must be run in the `anyio` event loop.
It will send messages to each observer to notify them that something
has changed.

:param obj: The `~lt.Thing` on which the action is defined.
`.ActionDescriptor` objects are unique to the class, but there may
be more than one `~lt.Thing` attached to a server with the same class.
We use ``obj`` to look up the observers of the current `~lt.Thing`.
:param value: The action status to communicate to the observers.
"""
action_name = self.name
for observer in self._observers_set(obj):
await observer.send(
{
"messageType": "actionStatus",
"data": {"action name": action_name, "status": value},
}
)

def add_to_fastapi(self, app: FastAPI, thing: Thing) -> None:
"""Add this action to a FastAPI app, bound to a particular Thing.

Expand Down Expand Up @@ -967,9 +925,9 @@
status_code=201,
code=self.func,
)
except InvalidReturnValueError as e:
thing.logger.error(e)
raise HTTPException(status_code=500, detail=str(e)) from e

Check warning on line 930 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

928-930 lines are not covered with tests

if issubclass(self.input_model, EmptyInput):
annotation = Body(default_factory=StrictEmptyInput)
Expand Down Expand Up @@ -1000,14 +958,14 @@
try:
responses[200]["model"] = self.output_model
pass
except AttributeError:
print(f"Failed to generate response model for action {self.name}")

Check warning on line 962 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

961-962 lines are not covered with tests
# Add an additional media type if we may return a file
if hasattr(self.output_model, "media_type"):
responses[200]["content"][self.output_model.media_type] = {}

Check warning on line 965 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

965 line is not covered with tests
# Now we can add the endpoint to the app.
if thing.path is None:
raise NotConnectedToServerError(

Check warning on line 968 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

968 line is not covered with tests
"Can't add the endpoint without thing.path!"
)
app.post(
Expand Down Expand Up @@ -1055,7 +1013,7 @@
"""
path = path or thing.path
if path is None:
raise NotConnectedToServerError("Can't generate forms without a path!")

Check warning on line 1016 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

1016 line is not covered with tests
forms = [
Form[ActionOp](href=path + self.name, op=[ActionOp.invokeaction]),
]
Expand Down
15 changes: 14 additions & 1 deletion src/labthings_fastapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ReadOnlyPropertyError(AttributeError):
class PropertyNotObservableError(RuntimeError):
"""The property is not observable.

This exception is raised when `~lt.Thing.observe_property` is called with a
This exception is raised when trying to observe a
property that is not observable. Currently, only data properties are
observable: functional properties (using a getter/setter) may not be
observed.
Expand Down Expand Up @@ -460,3 +460,16 @@ class GlobalLockBusyError(TimeoutError):
it. It indicates that the LabThings server is busy running another action or
property setter.
"""


class MessageDroppedWarning(RuntimeWarning):
Comment thread
rwb27 marked this conversation as resolved.
"""A message was dropped by the message broker.

This warning is emitted when a message can't be sent to a subscribed stream
because the stream's buffer is full. The message broker won't block, as
doing so could result in a potentially infinite number of stalled tasks.

If you see this warning, it means that a stream has been subscribed to
messages, but is not being read. Most likely, this means the stream was
not closed or deleted properly.
"""
151 changes: 151 additions & 0 deletions src/labthings_fastapi/message_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""Handle pub-sub style events.

Both properties and actions can emit events that may be observed. This module handles
all the pub-sub messaging in LabThings.
"""

import anyio
from pydantic.dataclasses import dataclass
from typing import Any, Literal
from weakref import WeakSet
import logging
import warnings

from anyio.streams.memory import MemoryObjectSendStream

from labthings_fastapi.exceptions import MessageDroppedWarning


LOGGER = logging.getLogger(__name__)


@dataclass
class Message:
"""A pub-sub event message.

This is the message that is sent when a property or action generates
an event.

This is a pydantic dataclass, so we validate the message. This might
change in the future for performance reasons.

:param thing: The name of the Thing generating the event.
:param affordance: The name of the affordance generating the event.
:param message_type: The kind of affordance from which the event originates.
:param payload: Data specific to the event (e.g. property value, action status).
"""

thing: str
affordance: str
message_type: Literal["property", "action"]
payload: Any
Comment thread
rwb27 marked this conversation as resolved.


class MessageBroker:
r"""A class that relays pub/sub messages.

This class takes care of relaying messages to streams that have subscribed to them.
It does not format messages or handle any details of e.g. websocket protocol.

Subscriptions require an `ObjectSendStream[Message]` and each time a `Message`
matching the subscription parameters (``thing`` and ``affordance``) is published,
it will be sent on that stream.

The broker does not validate thing or affordance names: that's up to the code
calling `MessageBroker.subscribe`\ .
"""

def __init__(self) -> None:
"""Initialise the message broker."""
# Note that we use a weak set below, so that when a websocket disconnects,
# its stream is removed automatically.
self._subscriptions: dict[
str, dict[str, WeakSet[MemoryObjectSendStream[Message]]]
] = {}

async def subscribe(
self, thing: str, affordance: str, stream: MemoryObjectSendStream[Message]
) -> None:
"""Subscribe to messages from a particular affordance.

Note that this method must be called from the event loop, as the message
broker is deliberately not thread safe.

:param thing: The name of the `.Thing` being subscribed to.
:param affordance: The name of the affordance being subscribed to.
:param stream: A stream to send the messages to.
:raises TypeError: if the `thing` or `affordance` argument is not a string.
"""
if not isinstance(thing, str):
Comment thread
rwb27 marked this conversation as resolved.
raise TypeError(f"`thing` must be a string, not '{thing}'.")
if not isinstance(affordance, str):
raise TypeError(f"`affordance` must be a string, not '{affordance}'.")
affordances = self._subscriptions.setdefault(thing, {})
streams = affordances.setdefault(affordance, WeakSet())
streams.add(stream)

async def unsubscribe(
self, thing: str, affordance: str, stream: MemoryObjectSendStream[Message]
) -> None:
"""Unsubscribe a stream from messages from a particular affordance.

This function is often not necessary: streams will be unsubscribed automatically
if they are closed or finalised. As the message broker only keeps a weak
reference to the stream, that means it will be finalised and unsubscribed
when the code that created it goes out of scope.

:param thing: The name of the `.Thing` being unsubscribed from.
:param affordance: The name of the affordance being unsubscribed from.
:param stream: The stream to unsubscribe.
:raises KeyError: if there is no such subscription.
:raises TypeError: if the `thing` or `affordance` argument is not a string.
"""
if not isinstance(thing, str):
Comment thread
rwb27 marked this conversation as resolved.
raise TypeError(f"`thing` must be a string, not '{thing}'.")
if not isinstance(affordance, str):
raise TypeError(f"`affordance` must be a string, not '{affordance}'.")
try:
self._subscriptions[thing][affordance].remove(stream)
except KeyError as e:
raise e
Comment thread
rwb27 marked this conversation as resolved.

async def publish(self, message: Message) -> None:
"""Publish a message.

This async method will relay the message to any subscriber streams.

:param message: the message to send.
"""
try:
subscriptions = self._subscriptions[message.thing][message.affordance]
except KeyError:
return # No subscribers for this thing.
subscriptions_to_remove = set()
for stream in subscriptions:
try:
stream.send_nowait(message)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Streams that have been closed will be automatically unsubscribed.
# They can't be reopened, so they won't be reused.
subscriptions_to_remove.add(stream)
except anyio.WouldBlock:
msg = f"Could not pass notification to {stream} as it was full."
LOGGER.warning(msg)
warnings.warn(MessageDroppedWarning(msg), stacklevel=1)
for stream in subscriptions_to_remove:
# discard rather than remove, so that if the stream has been finalised
# since it was closed, we don't get an error.
subscriptions.discard(stream)

async def close_streams(self) -> None:
"""Close all streams that are subscribed to receive messages.

This should be called when the server shuts down.
"""
# We use a task group so we shut down all streams concurrently, rather
# than waiting for each one to close.
async with anyio.create_task_group() as tg:
for thing_subs in self._subscriptions.values():
for subs in thing_subs.values():
for stream in subs:
tg.start_soon(stream.aclose)
Loading
Loading