From 421ac906109cc0e3a3e0f8d3d8182a4285b3db53 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 9 Oct 2025 23:15:48 +0100 Subject: [PATCH 01/11] Add invocation context module This new module uses context variables to provide cancellation and logging to action invocations. It will replace the various dependencies `InvocationID`, `InvocationLogger`, and `CancelHook`. --- src/labthings_fastapi/exceptions.py | 49 ++- src/labthings_fastapi/invocation_contexts.py | 345 +++++++++++++++++++ 2 files changed, 388 insertions(+), 6 deletions(-) create mode 100644 src/labthings_fastapi/invocation_contexts.py diff --git a/src/labthings_fastapi/exceptions.py b/src/labthings_fastapi/exceptions.py index 509b5fe..0c8e4c9 100644 --- a/src/labthings_fastapi/exceptions.py +++ b/src/labthings_fastapi/exceptions.py @@ -1,13 +1,8 @@ """A submodule for custom LabThings-FastAPI Exceptions.""" -# The "import x as x" syntax means symbols are interpreted as being re-exported, -# so they won't be flagged as unused by the linter. + # An __all__ for this module is less than helpful, unless we have an # automated check that everything's included. -from .dependencies.invocation import ( - InvocationCancelledError as InvocationCancelledError, -) -from .dependencies.invocation import InvocationError as InvocationError class NotConnectedToServerError(RuntimeError): @@ -91,3 +86,45 @@ class ThingConnectionError(RuntimeError): ThingConnection, for example because the named Thing does not exist, or is of the wrong type, or is not specified and there is no default. """ + + +class InvocationCancelledError(BaseException): + """An invocation was cancelled by the user. + + Note that this inherits from BaseException so won't be caught by + `except Exception`, it must be handled specifically. + + Action code may want to handle cancellation gracefully. This + exception should be propagated if the action's status should be + reported as ``cancelled``, or it may be handled so that the + action finishes, returns a value, and is marked as ``completed``. + + If this exception is handled and not re-raised, or if it arises in + a manually-created thread, the action will continue as normal. It + is a good idea to make sure your action terminates soon after this + exception is raised. + """ + + +class InvocationError(RuntimeError): + """The invocation ended in an anticipated error state. + + When this error is raised, action execution stops as expected. The exception will be + logged at error level without a traceback, and the invocation will return with + error status. + + Subclass this error for errors that do not need further traceback information + to be provided with the error message in logs. + """ + + +class NoInvocationContextError(RuntimeError): + """An invocation-specific resource has been requested from outside an invocation. + + This error is raised when the current invocation ID is requested, and there is no + current invocation ID. Invocation ID is determined from context (using a + `.ContextVar` ) and is available from within action functions. + + To avoid this error in test code or manually created threads, you should supply + an invocation context. + """ diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py new file mode 100644 index 0000000..c8a8b56 --- /dev/null +++ b/src/labthings_fastapi/invocation_contexts.py @@ -0,0 +1,345 @@ +r"""Invocation-specific resources provided via context. + +This module provides key resources to code that runs as part of an action, +specifically a mechanism to allow cancellation, and a way to manage logging. +These replace the old dependencies ``CancelHook`` and ``InvocationLogger``\ . + +If you are writing action code and want to use logging or allow cancellation, +most of the time you should just use `.get_invocation_logger` or +`.cancellable_sleep` which are exposed as part of the top-level module. + +This module includes lower-level functions that are useful for testing or +managing concurrency. Many of these accept an ``id`` argument, which is +optional. If it is not supplied, we will use the context variables to find +the current invocation ID. +""" + +from collections.abc import Iterator, Mapping, Sequence +from contextvars import ContextVar +from contextlib import contextmanager +import logging +from threading import Event, Thread +from typing import Any, Callable +from typing_extensions import Self +from uuid import UUID, uuid4 +from weakref import WeakValueDictionary + +from .exceptions import InvocationCancelledError, NoInvocationContextError + + +invocation_id_ctx = ContextVar[UUID]("invocation_id_ctx") +"""Context variable storing the current invocation ID. + +Note that it is best not to access this directly. Using `.set_invocation_id` +is safer, as it ensures proper clean-up and continuity of the cancel event +associated with the invocation. +""" + + +def get_invocation_id() -> UUID: + """Return the current InvocationID. + + This function returns the ID of the current invocation. This is determined + from execution context: it will only succeed if it is called from an action + thread. + + If this function is called outside of an action thread, it will raise + an error. + + :return: the invocation ID of the current invocation. + :raises NoInvocationContextError: if called outside of an action thread. + """ + try: + return invocation_id_ctx.get() + except LookupError as e: + msg = "There is no invocation ID to return: this code was called from " + msg += "outside of an action thread." + raise NoInvocationContextError(msg) from e + + +@contextmanager +def set_invocation_id(id: UUID) -> Iterator[None]: + """Set the invocation ID associated with the current context. + + This is the preferred way to create a new invocation context. As well + as setting and cleaning up the invocation ID context variable, this + context manager ensures that the cancellation event persists and is + not accidentally reset because it's gone out of scope. + + :param id: The invocation ID to save in the context variable. + """ + token = invocation_id_ctx.set(id) + event = get_cancel_event(id) + try: + yield + finally: + invocation_id_ctx.reset(token) + del event + + +@contextmanager +def fake_invocation_context() -> Iterator[UUID]: + """Set a dummy invocation ID for a block of code. + + This function should be used in a ``with:`` block. + """ + id = uuid4() + with set_invocation_id(id): + yield id + + +class CancelEvent(Event): + """An Event subclass that enables cancellation of actions. + + This `threading.Event` subclass adds methods to raise + `.InvocationCancelledError` exceptions if the invocation is cancelled, + usually by a ``DELETE`` request to the invocation's URL. + """ + + _cancel_events: WeakValueDictionary[UUID, Self] = WeakValueDictionary() + "This class-level dictionary ensures only one event exists per invocation ID" + + def __init__(self, id: UUID) -> None: + """Initialise a cancellation event. + + Only one CancelEvent should exist per invocation. Trying to create a + second will raise an error. To avoid this, please use + `.CancelEvent.get_for_id` instead of the constructor. + + :param id: The invocation ID. + :raises RuntimeError: if a `.CancelEvent` has already been created for + the specified invocation ID. + """ + super().__init__() + self.invocation_id = id + if id in self.__class__._cancel_events: + msg = f"Tried to create a second CancelEvent for invocation {id}. " + msg += "Use `CancelEvent.get_for_id` to avoid this error." + raise RuntimeError(msg) + self.__class__._cancel_events[id] = self + + @classmethod + def get_for_id(cls, id: UUID) -> Self: + """Obtain the `.CancelEvent` for a particular Invocation ID. + + This is a safe way to obtain an instance of this class, though + the top-level function `.get_cancel_event` is recommended. + + Only one `.CancelEvent` should exist per Invocation. This method + will either create one, or return the existing one. + + :param id: The invocation ID. + """ + try: + return cls._cancel_events[id] + except KeyError: + return cls(id) + + def raise_if_set(self) -> None: + """Raise an exception if the event is set. + + An exception will be raised if the event has been set. + Before raising the exception, we clear the event. This means that setting + the event should raise exactly one exception, and that handling the exception + should result in the action continuing to run. + + This is intended as a compact alternative to: + + .. code-block:: + + if cancel_event.is_set(): + cancel_event.clear() + raise InvocationCancelledError() + + :raise InvocationCancelledError: if the event has been cancelled. + """ + if self.is_set(): + self.clear() + raise InvocationCancelledError("The action was cancelled.") + + def sleep(self, timeout: float) -> None: + r"""Sleep for a given time in seconds, but raise an exception if cancelled. + + This function can be used in place of `time.sleep`. It will usually behave + the same as `time.sleep`\ , but if the cancel event is set during the time + when we are sleeping, an exception is raised to interrupt the sleep and + cancel the action. The event is cleared before raising the exception. This + means that handling the exception is sufficient to allow the action to + continue. + + :param timeout: The time to sleep for, in seconds. + + :raise InvocationCancelledError: if the event has been cancelled. + """ + if self.wait(timeout): + self.clear() + raise InvocationCancelledError("The action was cancelled.") + + +def get_cancel_event(id: UUID | None = None) -> CancelEvent: + """Obtain an event that permits actions to be cancelled. + + :param id: The invocation ID. This will be determined from + context if not supplied. + :return: an event that allows the current invocation to be cancelled. + """ + if id is None: + id = get_invocation_id() + return CancelEvent.get_for_id(id) + + +def cancellable_sleep(interval: float | None) -> None: + """Sleep for a specified time, allowing cancellation. + + This function should be called from action functions instead of + `time.sleep` to allow them to be cancelled. Usually, this + function is equivalent to `time.sleep` (it waits the specified + number of seconds). If the action is cancelled during the sleep, + it will raise an `.InvocationCancelledError` to signal that the + action should finish. + + .. warning:: + + This function uses `.Event.wait` internally, which suffers + from timing errors on some platforms: it may have error of + around 10-20ms. If that's a problem, consider using + `time.sleep` instead. ``lt.cancellable_sleep(None)`` may then + be used to allow cancellation. + + This function may only be called from an action thread, as it + depends on the invocation ID being available from a context variable. + Use `.set_invocation_id` to make it available outside of an action + thread. + + If ``interval`` is set to None, we do not call `.Event.wait` but + instead we simply check whether the event is set. + + :param interval: The length of time to sleep for, in seconds. If it + is `None` we won't wait, but we will still check for a cancel + event, and raise the exception if it is set. + """ + event = get_cancel_event() + if interval is None: + event.raise_if_set() + else: + event.sleep(interval) + + +def get_invocation_logger(id: UUID | None = None) -> logging.Logger: + """Obtain a logger for the current invocation. + + Use this function to get a logger to use in action code. This + will associate the log messages with the invocation, so that + they may be used as status updates or related to a particular run + of the action. + + :param id: the invocation ID. This will be determined from context + so need not be specified in action code. + :return: a logger that is specific to a particular invocation of + an action. + """ + if id is None: + id = get_invocation_id() + logger = logging.getLogger(f"labthings_fastapi.actions.{id}") + return logger + + +class ThreadWithInvocationID(Thread): + """A thread that sets a new invocation ID. + + This is a subclass of `threading.Thread` and works very much the + same way. It implements its functionality by overriding the ``run`` + method, so this should not be overridden again - you should instead + specify the code to run using the ``target`` argument. + """ + + def __init__( + self, + target: Callable, + args: Sequence[Any] | None = None, + kwargs: Mapping[str, Any] | None = None, + *super_args: Any, + **super_kwargs: Any, + ) -> None: + r"""Initialise a thread with invocation ID. + + :param target: the function to call in the thread. + :param args: positional arguments to ``target``\ . + :param kwargs: keyword arguments to ``target``\ . + :param \*super_args: arguments passed to `threading.Thread`\ . + :param \*\*super_kwargs: keyword arguments passed to `threading.Thread`\ . + """ + super().__init__(*super_args, **super_kwargs) + self._target = target + self._args = args or [] + self._kwargs = kwargs or {} + self._invocation_id: UUID = uuid4() + self._result: Any = None + self._exception: BaseException | None = None + + @property + def invocation_id(self) -> UUID: + """The InvocationID of this thread.""" + return self._invocation_id + + @property + def result(self) -> Any: + """The return value of the target function.""" + return self._result + + @property + def exception(self) -> BaseException | None: + """The exception raised by the target function, or None.""" + return self._exception + + def cancel(self) -> None: + """Set the cancel event to tell the code to terminate.""" + get_cancel_event(self.invocation_id).set() + + def join_and_propagate_cancel(self, poll_interval: float = 0.2) -> None: + """Wait for the thread to finish, and propagate cancellation. + + This function wraps `threading.Thread.join` but periodically checks if + the calling thread has been cancelled. If it has, it will cancel the + thread, before attempting to ``join`` it again. + + :param poll_interval: How often to check for cancellation of the + calling thread, in seconds. + """ + cancellation: InvocationCancelledError | None = None + self._polls = 0 + self._attempted_cancels = 0 + print(f"Checking for cancellation of invocation {get_invocation_id()}") + print(f"so we can cancel {self.invocation_id}") + while self.is_alive(): + try: + cancellable_sleep(None) + self._polls += 1 + except InvocationCancelledError as e: + # Propagate the cancellation signal to the thread + cancellation = e + self.cancel() + self._attempted_cancels += 1 + self.join(timeout=poll_interval) + if cancellation is not None: + # If the action was cancelled, propagate the cancellation + # after the thread has been joined. + # Note that, regardless of how many times the thread was + # cancelled, we will only raise one exception after the + # calling thread was joined. + raise InvocationCancelledError() from cancellation + + def run(self) -> None: + """Run the target function, with invocation ID set in the context variable.""" + try: + with set_invocation_id(self.invocation_id): + if self._target is not None: + self._result = self._target(*self._args, **self._kwargs) + except BaseException as e: # noqa: BLE001 + # This catch-all Except allows us to access exceptions + # in the parent thread + self._exception = e + finally: + # Avoid a refcycle if the thread is running a function with + # an argument that has a member that points to the thread. + del self._target, self._args, self._kwargs From 4f9efd1b2bbcaeedeb5e087af716d6bc3c1df9c3 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 9 Oct 2025 23:18:08 +0100 Subject: [PATCH 02/11] Migrate actions and add to __init__ The actions module now uses the new cancellation/logging code. Said code is available via top-level imports. I'm currently still generating invocation IDs using the dependency. This will need to stay until we remove the dependency, as without it the other dependencies (`CancelHook` and `InvocationLogger`) will break. --- src/labthings_fastapi/__init__.py | 8 ++ src/labthings_fastapi/actions/__init__.py | 35 ++++---- .../dependencies/invocation.py | 83 +------------------ src/labthings_fastapi/descriptors/action.py | 1 - 4 files changed, 29 insertions(+), 98 deletions(-) diff --git a/src/labthings_fastapi/__init__.py b/src/labthings_fastapi/__init__.py index e68e850..affd56f 100644 --- a/src/labthings_fastapi/__init__.py +++ b/src/labthings_fastapi/__init__.py @@ -32,6 +32,11 @@ from .outputs import blob from .server import ThingServer, cli from .client import ThingClient +from .invocation_contexts import ( + get_invocation_logger, + cancellable_sleep, + ThreadWithInvocationID, +) # The symbols in __all__ are part of our public API. # They are imported when using `import labthings_fastapi as lt`. @@ -55,4 +60,7 @@ "ThingServer", "cli", "ThingClient", + "get_invocation_logger", + "cancellable_sleep", + "ThreadWithInvocationID", ] diff --git a/src/labthings_fastapi/actions/__init__.py b/src/labthings_fastapi/actions/__init__.py index 098a92f..fbd9a55 100644 --- a/src/labthings_fastapi/actions/__init__.py +++ b/src/labthings_fastapi/actions/__init__.py @@ -30,13 +30,17 @@ from ..utilities.introspection import EmptyInput from ..thing_description._model import LinkElement from .invocation_model import InvocationModel, InvocationStatus, LogRecordModel -from ..dependencies.invocation import ( - CancelHook, +from ..exceptions import ( InvocationCancelledError, InvocationError, - invocation_logger, ) from ..outputs.blob import BlobIOContextDep, blobdata_to_url_ctx +from ..invocation_contexts import ( + CancelEvent, + get_cancel_event, + set_invocation_id, + get_invocation_logger, +) if TYPE_CHECKING: # We only need these imports for type hints, so this avoids circular imports. @@ -77,7 +81,6 @@ def __init__( input: Optional[BaseModel] = None, dependencies: Optional[dict[str, Any]] = None, log_len: int = 1000, - cancel_hook: Optional[CancelHook] = None, ) -> None: """Create a thread to run an action and track its outputs. @@ -96,8 +99,6 @@ def __init__( FastAPI by its dependency injection mechanism. :param log_len: sets the number of log entries that will be held in memory by the invocation's logger. - :param cancel_hook: is a `threading.Event` subclass that tells the - invocation it's time to stop. See `.CancelHook`. """ Thread.__init__(self, daemon=True) @@ -106,7 +107,6 @@ def __init__( self.thing_ref = weakref.ref(thing) self.input = input if input is not None else EmptyInput() self.dependencies = dependencies if dependencies is not None else {} - self.cancel_hook = cancel_hook # A UUID for the Invocation (not the same as the threading.Thread ident) self._ID = id # Task ID @@ -195,14 +195,18 @@ def thing(self) -> Thing: raise RuntimeError("The `Thing` on which an action was run is missing!") return thing + @property + def cancel_hook(self) -> CancelEvent: + """The cancel event associated with this Invocation.""" + return get_cancel_event(self.id) + def cancel(self) -> None: """Cancel the task by requesting the code to stop. This is an opt-in feature: the action must use a `.CancelHook` dependency and periodically check it. """ - if self.cancel_hook is not None: - self.cancel_hook.set() + self.cancel_hook.set() def response(self, request: Optional[Request] = None) -> InvocationModel: """Generate a representation of the invocation suitable for HTTP. @@ -273,7 +277,8 @@ def run(self) -> None: # Create a logger just for this invocation, keyed to the invocation id # Logs that go to this logger will be copied into `self._log` handler = DequeLogHandler(dest=self._log) - logger = invocation_logger(self.id) + logger = get_invocation_logger(self.id) + logger.setLevel(logging.INFO) logger.addHandler(handler) try: action.emit_changed_event(self.thing, self._status.value) @@ -291,8 +296,10 @@ def run(self) -> None: self._start_time = datetime.datetime.now() action.emit_changed_event(self.thing, self._status.value) - # The next line actually runs the action. - ret = action.__get__(thing)(**kwargs, **self.dependencies) + bound_method = action.__get__(thing) + # Actually run the action + with set_invocation_id(self.id): + ret = bound_method(**kwargs, **self.dependencies) with self._status_lock: self._return_value = ret @@ -390,7 +397,6 @@ def invoke_action( id: uuid.UUID, input: Any, dependencies: dict[str, Any], - cancel_hook: CancelHook, ) -> Invocation: """Invoke an action, returning the thread where it's running. @@ -409,8 +415,6 @@ def invoke_action( keyword arguments. :param dependencies: is a dictionary of keyword arguments, supplied by FastAPI by its dependency injection mechanism. - :param cancel_hook: is a `threading.Event` subclass that tells the - invocation it's time to stop. See `.CancelHook`. :return: an `.Invocation` object that has been started. """ @@ -420,7 +424,6 @@ def invoke_action( input=input, dependencies=dependencies, id=id, - cancel_hook=cancel_hook, ) self.append_invocation(thread) thread.start() diff --git a/src/labthings_fastapi/dependencies/invocation.py b/src/labthings_fastapi/dependencies/invocation.py index 8ed80f7..51f95dd 100644 --- a/src/labthings_fastapi/dependencies/invocation.py +++ b/src/labthings_fastapi/dependencies/invocation.py @@ -36,7 +36,7 @@ from typing import Annotated from fastapi import Depends import logging -import threading +from ..invocation_contexts import CancelEvent, get_invocation_logger def invocation_id() -> uuid.UUID: @@ -87,9 +87,7 @@ def invocation_logger(id: InvocationID) -> logging.Logger: :return: A `logging.Logger` object specific to this invocation. """ - logger = logging.getLogger(f"labthings_fastapi.actions.{id}") - logger.setLevel(logging.INFO) - return logger + return get_invocation_logger(id) InvocationLogger = Annotated[logging.Logger, Depends(invocation_logger)] @@ -100,83 +98,6 @@ def invocation_logger(id: InvocationID) -> logging.Logger: """ -class InvocationCancelledError(BaseException): - """An invocation was cancelled by the user. - - Note that this inherits from BaseException so won't be caught by - `except Exception`, it must be handled specifically. - - Action code may want to handle cancellation gracefully. This - exception should be propagated if the action's status should be - reported as ``cancelled``, or it may be handled so that the - action finishes, returns a value, and is marked as ``completed``. - - If this exception is handled, the `.CancelEvent` should be reset - to allow another `.InvocationCancelledError` to be raised if the - invocation receives a second cancellation signal. - """ - - -class InvocationError(RuntimeError): - """The invocation ended in an anticipated error state. - - When this error is raised, action execution stops as expected. The exception will be - logged at error level without a traceback, and the invocation will return with - error status. - - Subclass this error for errors that do not need further traceback information - to be provided with the error message in logs. - """ - - -class CancelEvent(threading.Event): - """An Event subclass that enables cancellation of actions. - - This `threading.Event` subclass adds methods to raise - `.InvocationCancelledError` exceptions if the invocation is cancelled, - usually by a ``DELETE`` request to the invocation's URL. - """ - - def __init__(self, id: InvocationID) -> None: - """Initialise the cancellation event. - - :param id: The invocation ID, annotated as a dependency so it is - supplied automatically by FastAPI. - """ - threading.Event.__init__(self) - self.invocation_id = id - - def raise_if_set(self) -> None: - """Raise an exception if the event is set. - - This is intended as a compact alternative to: - - .. code-block:: - - if cancel_event.is_set(): - raise InvocationCancelledError() - - :raise InvocationCancelledError: if the event has been cancelled. - """ - if self.is_set(): - raise InvocationCancelledError("The action was cancelled.") - - def sleep(self, timeout: float) -> None: - r"""Sleep for a given time in seconds, but raise an exception if cancelled. - - This function can be used in place of `time.sleep`. It will usually behave - the same as `time.sleep`\ , but if the cancel event is set during the time - when we are sleeping, an exception is raised to interrupt the sleep and - cancel the action. - - :param timeout: The time to sleep for, in seconds. - - :raise InvocationCancelledError: if the event has been cancelled. - """ - if self.wait(timeout): - raise InvocationCancelledError("The action was cancelled.") - - def invocation_cancel_hook(id: InvocationID) -> CancelHook: """Make a cancel hook for a particular invocation. diff --git a/src/labthings_fastapi/descriptors/action.py b/src/labthings_fastapi/descriptors/action.py index 1c3fa33..cd2ef6d 100644 --- a/src/labthings_fastapi/descriptors/action.py +++ b/src/labthings_fastapi/descriptors/action.py @@ -267,7 +267,6 @@ def start_action( input=body, dependencies=dependencies, id=id, - cancel_hook=cancel_hook, ) background_tasks.add_task(action_manager.expire_invocations) return action.response(request=request) From d4899107ec599a003409ea116778b1e9b8327c9e Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 9 Oct 2025 23:19:19 +0100 Subject: [PATCH 03/11] Tests and documentation. I've not made a dedicated page (yet) but have added this to the conceptual docs on actions and concurrency. Test code achieves 100% coverage of the new module from `test_invocation_contexts`. --- docs/source/actions.rst | 44 ++++++ docs/source/concurrency.rst | 8 + tests/test_invocation_contexts.py | 246 ++++++++++++++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 tests/test_invocation_contexts.py diff --git a/docs/source/actions.rst b/docs/source/actions.rst index ffad7b5..1aa201f 100644 --- a/docs/source/actions.rst +++ b/docs/source/actions.rst @@ -58,6 +58,50 @@ The first is ``self`` (the first positional argument), which is always the supply resources needed by the action. Most often, this is a way of accessing other `.Things` on the same server. +.. action_logging: +Logging from actions +-------------------- +It's helpful to be able to keep track of the log messages from action code. +This may be used to display status updates to the user when an action takes +a long time to run, or it may simply be a helpful debugging aid. To log +messages that are associated with a particular invocation of an action, +use ``lt.get_invocation_logger()`` to obtain a `logging.Logger` that will +associate its messages with the current invocation. + +Logs are available over HTTP. If an action is run from another action, +it will use the same logger as its parent action. + +.. action_cancellation: +Cancelling actions +------------------ +If an action could run for a long time, it is useful to be able to cancel it +cleanly. LabThings makes provision for this by allowing actions to be cancelled +using a ``DELETE`` HTTP request. In order to allow an action to be cancelled, +you must give LabThings opportunities to interrupt it. This is most often done +by replacing a `time.sleep()` statement with `.cancellable_sleep()` which +is equivalent, but will raise an exception if the action is cancelled. + +For more advanced options, see `.invocation_contexts` for detail. + +.. invocation_context: +Invocation contexts +------------------- +Cancelling actions and capturing their logs requires action code to use a +specific logger and check for cancel events. This is done using `contextvars` +such that the action code can use module-level symbols rather than needing +to explicitly pass the logger and cancel hook as arguments to the action +method. + +Usually, you don't need to consider this mechanism: simply use the invocation +logger or cancel hook as explained above. However, if you want to run actions +outside of the server (for example, for testing purposes) or if you want to +call one action from another action, but not share the cancellation signal +or log, functions are provided in `.invocation_contexts` to manage this. + +If you start a new thread from an action, code running in that thread will +not have the invocation ID set in a context variable. A subclass of +`threading.Thread` is provided to do this, `.ThreadWithInvocationID`\ . + Raising exceptions ------------------ If an action raises an unhandled exception, the action will terminate with an Error diff --git a/docs/source/concurrency.rst b/docs/source/concurrency.rst index 6a5becf..2a25b64 100644 --- a/docs/source/concurrency.rst +++ b/docs/source/concurrency.rst @@ -22,3 +22,11 @@ Calling Things from other Things When one `Thing` calls the actions or properties of another `.Thing`, either directly or via a `.DirectThingClient`, no new threads are spawned: the action or property is run in the same thread as the caller. This mirrors the behaviour of the `.ThingClient`, which blocks until the action or property is complete. See :doc:`using_things` for more details on how to call actions and properties of other Things. +Invocations and concurrency +--------------------------- + +Each time an action is run ("invoked" in :ref:`wot_cc`), we create a new thread to run it. This thread has a context variable set, such that ``lt.cancellable_sleep`` and ``lt.get_invocation_logger`` are aware of which invocation is currently running. If an action spawns a new thread (e.g. using `threading.Thread`\ ), this new thread will not have an invocation ID, and consequently the two invocation-specific functions mentioned will not work. + +Usually, the best solution to this problem is to generate a new invocation ID for the thread. This means only the original action thread will receive cancellation events, and only the original action thread will log to the invocation logger. If the action is cancelled, you must cancel the background thread. This is the behaviour of `.ThreadWithInvocationID`\ . + +It is also possible to copy the current invocation ID to a new thread. This is often a bad idea, as it's ill-defined whether the exception will arise in the original thread or the new one if the invocation is cancelled. Logs from the two threads will also be interleaved. If it's desirable to log from the background thread, the invocation logger may safely be passed as an argument, rather than accessed via ``lt.get_invocation_logger``\ . diff --git a/tests/test_invocation_contexts.py b/tests/test_invocation_contexts.py new file mode 100644 index 0000000..65595c8 --- /dev/null +++ b/tests/test_invocation_contexts.py @@ -0,0 +1,246 @@ +"""Test logging and cancellation, implemented via contextvars. + +These tests cover the code in `invocation_contexts` directly. They are also tested +in the context of a ``ThingServer`` in, for example, ``test_action_logging`` and +``test_action_cancel`` . +""" + +from contextlib import contextmanager +import time +import pytest +import uuid +from threading import Thread +from labthings_fastapi import invocation_contexts as ic +from labthings_fastapi import exceptions as exc + + +def append_invocation_id(ids: list): + """Append the current invocation ID (or the error) to a list.""" + try: + ids.append(ic.get_invocation_id()) + except exc.NoInvocationContextError as e: + ids.append(e) + + +def test_getting_and_setting_id(): + """Check the invocation context variable behaves as expected.""" + + # By default, the invocation id is not set + assert ic.invocation_id_ctx.get(...) is ... + + # This means we get an error if we look for the ID + with pytest.raises(exc.NoInvocationContextError): + ic.get_invocation_id() + + # Once we set an ID, it should be returned + id = uuid.uuid4() + with ic.set_invocation_id(id): + assert ic.get_invocation_id() == id + + # It should be reset afterwards + with pytest.raises(exc.NoInvocationContextError): + ic.get_invocation_id() + + # A context manager lets us fake the ID for testing + with ic.fake_invocation_context(): + assert isinstance(ic.get_invocation_id(), uuid.UUID) + + # This should also be reset afterwards + with pytest.raises(exc.NoInvocationContextError): + ic.get_invocation_id() + + # A new thread will not copy the context by default, so using + # get_invocation_id in a thread will fail: + with ic.fake_invocation_context(): + before = ic.get_invocation_id() + ids = [] + t = Thread(target=append_invocation_id, args=[ids]) + t.start() + t.join() + after = ic.get_invocation_id() + + assert before == after + assert len(ids) == 1 + assert isinstance(ids[0], exc.NoInvocationContextError) + + +@contextmanager +def assert_takes_time(min_t: float | None, max_t: float | None): + """Assert that a code block takes a certain amount of time.""" + before = time.time() + yield + after = time.time() + duration = after - before + if min_t is not None: + assert duration >= min_t + if max_t is not None: + assert duration <= max_t + + +def test_cancel_event(): + """Check the cancel event works as intended.""" + id = uuid.uuid4() + event = ic.CancelEvent.get_for_id(id) + + # We should get back the same event if we call this twice + assert event is ic.CancelEvent.get_for_id(id) + # The function below is equivaent to the class method above. + assert event is ic.get_cancel_event(id) + + # We should not be able to make a second one with the constructor + with pytest.raises(RuntimeError): + ic.CancelEvent(id) + + # We make a second event with a different ID. We'll use the constructor + # directly, as this should work the first time it's called (as there is + # no existing event). + id2 = uuid.uuid4() + event2 = ic.CancelEvent(id2) + assert event2 is ic.CancelEvent.get_for_id(id2) + assert event2 is not event + assert ic.get_cancel_event(id2) is event2 + + # The module-level function falls back on the context variable for ID, + # so it should raise an exception if the ID isn't present: + with pytest.raises(exc.NoInvocationContextError): + ic.get_cancel_event() + + # If we have an invocation ID in the context, this should succeed even + # if we've not made an event yet. + with ic.fake_invocation_context(): + assert isinstance(ic.get_cancel_event(), ic.CancelEvent) + + # The two custom functions should raise `InvocationCancelledError` if + # the event is set, so we'll run them both with it set and not set. + # raise_if_set should do nothing if the event is not set. + assert not event.is_set() + event.raise_if_set() + # it should raise an exception if the event is set. + event.set() + with pytest.raises(exc.InvocationCancelledError): + event.raise_if_set() + # When the event raises an exception, it resets - one `set()` == one error. + assert not event.is_set() + + # sleep behaves the same way, but waits a finite time. + with assert_takes_time(0.02, 0.04): + event.sleep(0.02) + # check an exception gets raised and reset if appropriate + event.set() + with pytest.raises(exc.InvocationCancelledError): + event.sleep(1) + assert not event.is_set() + + +def test_cancellable_sleep(): + """Check the module-level cancellable sleep.""" + with pytest.raises(exc.NoInvocationContextError): + ic.cancellable_sleep(1) + with pytest.raises(exc.NoInvocationContextError): + ic.cancellable_sleep(None) + + with ic.fake_invocation_context(): + event = ic.get_cancel_event() + + # the function should wait a finite time + with assert_takes_time(0.02, 0.04): + ic.cancellable_sleep(0.02) + + # passing `None` should return immediately. + with assert_takes_time(None, 0.002): + ic.cancellable_sleep(None) + + # check an exception gets raised and reset if appropriate + event.set() + with pytest.raises(exc.InvocationCancelledError): + ic.cancellable_sleep(1) + assert not event.is_set() + + # check an exception gets raised and reset if appropriate + event.set() + with pytest.raises(exc.InvocationCancelledError): + ic.cancellable_sleep(None) + assert not event.is_set() + + +def test_invocation_logger(): + """Check `get_invocation_logger` behaves correctly.""" + # The function simply returns a logger with the ID in the name. + fake_id = uuid.uuid4() + logger = ic.get_invocation_logger(fake_id) + assert logger.name.endswith(str(fake_id)) + + # The ID is taken from context if not supplied. + with pytest.raises(exc.NoInvocationContextError): + ic.get_invocation_logger() + with ic.fake_invocation_context(): + logger = ic.get_invocation_logger() + id = ic.get_invocation_id() + assert logger.name.endswith(str(id)) + + +def run_function_in_thread_and_propagate_cancellation(func, *args): + """Run a function in a ThreadWithInvocationID.""" + t = ic.ThreadWithInvocationID(target=func, args=args) + t.start() + try: + t.join_and_propagate_cancel(0.005) + except exc.InvocationCancelledError: + # We still want to return the finished thread if it's + # cancelled. + pass + return t + + +def test_thread_with_invocation_id(): + """Test our custom thread subclass makes a new ID and can be cancelled.""" + ids = [] + t = ic.ThreadWithInvocationID(target=append_invocation_id, args=[ids]) + assert isinstance(t.invocation_id, uuid.UUID) + t.start() + t.join() + assert len(ids) == 1 + assert ids[0] == t.invocation_id + assert t.exception is None + assert t.result is None + + # Check cancellable sleep works in the thread + t = ic.ThreadWithInvocationID(target=ic.cancellable_sleep, args=[1]) + assert isinstance(t.invocation_id, uuid.UUID) + t.start() + t.cancel() + with assert_takes_time(None, 0.1): + t.join() + assert isinstance(t.exception, exc.InvocationCancelledError) + + # Check we capture the return value + t = ic.ThreadWithInvocationID(target=lambda: True) + t.start() + t.join() + assert t.exception is None + assert t.result is True + + # Check we can propagate cancellation. + # First, we run `cancellable_sleep` and check it doesn't cancel + with ic.fake_invocation_context(): + # First test our function - there is only one thread here, and we + # check it finishes and doesn't error. + t = run_function_in_thread_and_propagate_cancellation( + ic.cancellable_sleep, 0.001 + ) + assert isinstance(t, ic.ThreadWithInvocationID) + assert not t.is_alive() + assert t.exception is None + + # Next, we run it in a thread, and cancel that thread. + # The error should propagate to the inner thread. + t = ic.ThreadWithInvocationID( + target=run_function_in_thread_and_propagate_cancellation, + args=[ic.cancellable_sleep, 10], + ) + t.start() + t.cancel() + with assert_takes_time(None, 0.05): + t.join() + assert isinstance(t.result, ic.ThreadWithInvocationID) + assert isinstance(t.result.exception, exc.InvocationCancelledError) From f2ef64424c0c104bf74554aaf8c8520684480147 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 9 Oct 2025 23:24:26 +0100 Subject: [PATCH 04/11] Docstring fixes --- src/labthings_fastapi/invocation_contexts.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py index c8a8b56..a1019d9 100644 --- a/src/labthings_fastapi/invocation_contexts.py +++ b/src/labthings_fastapi/invocation_contexts.py @@ -82,6 +82,8 @@ def fake_invocation_context() -> Iterator[UUID]: """Set a dummy invocation ID for a block of code. This function should be used in a ``with:`` block. + + :yields: the created invocation ID. """ id = uuid4() with set_invocation_id(id): @@ -129,6 +131,7 @@ def get_for_id(cls, id: UUID) -> Self: will either create one, or return the existing one. :param id: The invocation ID. + :return: the cancel event for the given ``id`` . """ try: return cls._cancel_events[id] @@ -303,8 +306,15 @@ def join_and_propagate_cancel(self, poll_interval: float = 0.2) -> None: the calling thread has been cancelled. If it has, it will cancel the thread, before attempting to ``join`` it again. + Note that, if the invocation that calls this function is cancelled + while the function is running, the exception will propagate, i.e. + you should handle `.InvocationCancelledError` unless you wish + your invocation to terminate if it is cancelled. + :param poll_interval: How often to check for cancellation of the calling thread, in seconds. + :raises InvocationCancelledError: if this invocation is cancelled + while waiting for the thread to join. """ cancellation: InvocationCancelledError | None = None self._polls = 0 From fd4b610bc515c706afbee0af120cb9b20a4a06ad Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 9 Oct 2025 23:36:41 +0100 Subject: [PATCH 05/11] Migrate test code Code testing dependencies is now moved into a submodule, pending their removal. The tests for cancellation and logging are duplicated: the old copies are in the submodule, but I have also migrated them to the new API in the main tests folder. --- tests/old_dependency_tests/__init__.py | 0 .../module_with_deps.py | 0 .../test_action_cancel.py | 204 ++++++++++++++++++ .../test_action_logging.py | 127 +++++++++++ .../test_dependencies.py | 0 .../test_dependencies_2.py | 0 .../test_dependency_metadata.py | 2 +- .../test_directthingclient.py | 2 +- .../test_thing_dependencies.py | 2 +- tests/test_action_cancel.py | 16 +- tests/test_action_logging.py | 7 +- 11 files changed, 344 insertions(+), 16 deletions(-) create mode 100644 tests/old_dependency_tests/__init__.py rename tests/{ => old_dependency_tests}/module_with_deps.py (100%) create mode 100644 tests/old_dependency_tests/test_action_cancel.py create mode 100644 tests/old_dependency_tests/test_action_logging.py rename tests/{ => old_dependency_tests}/test_dependencies.py (100%) rename tests/{ => old_dependency_tests}/test_dependencies_2.py (100%) rename tests/{ => old_dependency_tests}/test_dependency_metadata.py (98%) rename tests/{ => old_dependency_tests}/test_directthingclient.py (99%) rename tests/{ => old_dependency_tests}/test_thing_dependencies.py (99%) diff --git a/tests/old_dependency_tests/__init__.py b/tests/old_dependency_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/module_with_deps.py b/tests/old_dependency_tests/module_with_deps.py similarity index 100% rename from tests/module_with_deps.py rename to tests/old_dependency_tests/module_with_deps.py diff --git a/tests/old_dependency_tests/test_action_cancel.py b/tests/old_dependency_tests/test_action_cancel.py new file mode 100644 index 0000000..cf79c41 --- /dev/null +++ b/tests/old_dependency_tests/test_action_cancel.py @@ -0,0 +1,204 @@ +""" +This tests the log that is returned in an action invocation +""" + +import uuid +import pytest +from fastapi.testclient import TestClient +from ..temp_client import poll_task, task_href +import labthings_fastapi as lt +import time + + +class CancellableCountingThing(lt.Thing): + counter: int = lt.property(default=0) + check: bool = lt.property(default=False) + """Whether the count has been cancelled. + + This variable is used to check that the action can detect a cancel event + and react by performing another task, in this case, setting this variable. + """ + + @lt.thing_action + def count_slowly(self, cancel: lt.deps.CancelHook, n: int = 10): + for _i in range(n): + try: + cancel.sleep(0.1) + except lt.exceptions.InvocationCancelledError as e: + # Set check to true to show that cancel was called. + self.check = True + raise (e) + self.counter += 1 + + @lt.thing_action + def count_slowly_but_ignore_cancel(self, cancel: lt.deps.CancelHook, n: int = 10): + """ + Used to check that cancellation alter task behaviour + """ + counting_increment = 1 + for _i in range(n): + try: + cancel.sleep(0.1) + except lt.exceptions.InvocationCancelledError: + # Rather than cancel, this disobedient task just counts faster + counting_increment = 3 + self.counter += counting_increment + + @lt.thing_action + def count_and_only_cancel_if_asked_twice( + self, cancel: lt.deps.CancelHook, n: int = 10 + ): + """ + A task that changes behaviour on cancel, but if asked a second time will cancel + """ + cancelled_once = False + counting_increment = 1 + for _i in range(n): + try: + cancel.sleep(0.1) + except lt.exceptions.InvocationCancelledError as e: + # If this is the second time, this is called actually cancel. + if cancelled_once: + raise (e) + # If not, remember that this cancel event happened. + cancelled_once = True + # Reset the CancelHook + cancel.clear() + # Count backwards instead! + counting_increment = -1 + self.counter += counting_increment + + +@pytest.fixture +def server(): + """Create a server with a CancellableCountingThing added.""" + server = lt.ThingServer() + server.add_thing("counting_thing", CancellableCountingThing) + return server + + +@pytest.fixture +def counting_thing(server): + """Retrieve the CancellableCountingThing from the server.""" + return server.things["counting_thing"] + + +@pytest.fixture +def client(server): + with TestClient(server.app) as client: + yield client + + +def test_invocation_cancel(counting_thing, client): + """ + Test that an invocation can be cancelled and the associated + exception handled correctly. + """ + assert counting_thing.counter == 0 + assert not counting_thing.check + response = client.post("/counting_thing/count_slowly", json={}) + response.raise_for_status() + # Use `client.delete` to cancel the task! + cancel_response = client.delete(task_href(response.json())) + # Raise an exception is this isn't a 2xx response + cancel_response.raise_for_status() + invocation = poll_task(client, response.json()) + assert invocation["status"] == "cancelled" + assert counting_thing.counter < 9 + # Check that error handling worked + assert counting_thing.check + + +def test_invocation_that_refuses_to_cancel(counting_thing, client): + """ + Test that an invocation can detect a cancel request but choose + to modify behaviour. + """ + assert counting_thing.counter == 0 + response = client.post( + "/counting_thing/count_slowly_but_ignore_cancel", json={"n": 5} + ) + response.raise_for_status() + # Use `client.delete` to try to cancel the task! + cancel_response = client.delete(task_href(response.json())) + # Raise an exception is this isn't a 2xx response + cancel_response.raise_for_status() + invocation = poll_task(client, response.json()) + # As the task ignored the cancel. It should return completed + assert invocation["status"] == "completed" + # Counter should be greater than 5 as it counts faster if cancelled! + assert counting_thing.counter > 5 + + +def test_invocation_that_needs_cancel_twice(counting_thing, client): + """ + Test that an invocation can interpret cancel to change behaviour, but + can really cancel if requested a second time + """ + # First cancel only once: + assert counting_thing.counter == 0 + response = client.post( + "/counting_thing/count_and_only_cancel_if_asked_twice", json={"n": 5} + ) + response.raise_for_status() + # Use `client.delete` to try to cancel the task! + cancel_response = client.delete(task_href(response.json())) + # Raise an exception is this isn't a 2xx response + cancel_response.raise_for_status() + invocation = poll_task(client, response.json()) + # As the task ignored the cancel. It should return completed + assert invocation["status"] == "completed" + # Counter should be less than 0 as it should started counting backwards + # almost immediately. + assert counting_thing.counter < 0 + + # Next cancel twice. + counting_thing.counter = 0 + assert counting_thing.counter == 0 + response = client.post( + "/counting_thing/count_and_only_cancel_if_asked_twice", json={"n": 5} + ) + response.raise_for_status() + # Use `client.delete` to try to cancel the task! + cancel_response = client.delete(task_href(response.json())) + # Raise an exception is this isn't a 2xx response + cancel_response.raise_for_status() + # Cancel again + cancel_response2 = client.delete(task_href(response.json())) + # Raise an exception is this isn't a 2xx response + cancel_response2.raise_for_status() + invocation = poll_task(client, response.json()) + # As the task ignored the cancel. It should return completed + assert invocation["status"] == "cancelled" + # Counter should be less than 0 as it should started counting backwards + # almost immediately. + assert counting_thing.counter < 0 + + +def test_late_invocation_cancel_responds_503(counting_thing, client): + """ + Test that cancelling an invocation after it completes returns a 503 response. + """ + assert counting_thing.counter == 0 + assert not counting_thing.check + response = client.post("/counting_thing/count_slowly", json={"n": 1}) + response.raise_for_status() + # Sleep long enough that task completes. + time.sleep(0.3) + poll_task(client, response.json()) + # Use `client.delete` to cancel the task! + cancel_response = client.delete(task_href(response.json())) + # Check a 503 code is returned + assert cancel_response.status_code == 503 + # Check counter reached it's target + assert counting_thing.counter == 1 + # Check that error handling wasn't called + assert not counting_thing.check + + +def test_cancel_unknown_task(counting_thing, client): + """ + Test that cancelling an unknown invocation returns a 404 response + """ + cancel_response = client.delete(f"/invocations/{uuid.uuid4()}") + assert cancel_response.status_code == 404 diff --git a/tests/old_dependency_tests/test_action_logging.py b/tests/old_dependency_tests/test_action_logging.py new file mode 100644 index 0000000..0c4e678 --- /dev/null +++ b/tests/old_dependency_tests/test_action_logging.py @@ -0,0 +1,127 @@ +""" +This tests the log that is returned in an action invocation +""" + +import logging +from fastapi.testclient import TestClient +import pytest +from ..temp_client import poll_task +import labthings_fastapi as lt +from labthings_fastapi.actions.invocation_model import LogRecordModel + + +class ThingThatLogsAndErrors(lt.Thing): + LOG_MESSAGES = [ + "message 1", + "message 2", + ] + + @lt.thing_action + def action_that_logs(self, logger: lt.deps.InvocationLogger): + for m in self.LOG_MESSAGES: + logger.info(m) + + @lt.thing_action + def action_with_unhandled_error(self, logger: lt.deps.InvocationLogger): + raise RuntimeError("I was asked to raise this error.") + + @lt.thing_action + def action_with_invocation_error(self, logger: lt.deps.InvocationLogger): + raise lt.exceptions.InvocationError("This is an error, but I handled it!") + + +@pytest.fixture +def client(): + """Set up a Thing Server and yield a client to it.""" + server = lt.ThingServer() + server.add_thing("log_and_error_thing", ThingThatLogsAndErrors) + with TestClient(server.app) as client: + yield client + + +def test_invocation_logging(caplog, client): + """Check the expected items appear in the log when an action is invoked.""" + with caplog.at_level(logging.INFO, logger="labthings.action"): + r = client.post("/log_and_error_thing/action_that_logs") + r.raise_for_status() + invocation = poll_task(client, r.json()) + assert invocation["status"] == "completed" + assert len(invocation["log"]) == len(ThingThatLogsAndErrors.LOG_MESSAGES) + assert len(invocation["log"]) == len(caplog.records) + for expected, entry in zip( + ThingThatLogsAndErrors.LOG_MESSAGES, invocation["log"], strict=True + ): + assert entry["message"] == expected + + +def test_unhandled_error_logs(caplog, client): + """Check that a log with a traceback is raised if there is an unhandled error.""" + with caplog.at_level(logging.INFO, logger="labthings.action"): + r = client.post("/log_and_error_thing/action_with_unhandled_error") + r.raise_for_status() + invocation = poll_task(client, r.json()) + assert invocation["status"] == "error" + assert len(invocation["log"]) == len(caplog.records) == 1 + assert caplog.records[0].levelname == "ERROR" + # There is a traceback + assert caplog.records[0].exc_info is not None + + +def test_invocation_error_logs(caplog, client): + """Check that a log with a traceback is raised if there is an unhandled error.""" + with caplog.at_level(logging.INFO, logger="labthings.action"): + r = client.post("/log_and_error_thing/action_with_invocation_error") + r.raise_for_status() + invocation = poll_task(client, r.json()) + assert invocation["status"] == "error" + assert len(invocation["log"]) == len(caplog.records) == 1 + assert caplog.records[0].levelname == "ERROR" + # There is not a traceback + assert caplog.records[0].exc_info is None + + +def test_logrecordmodel(): + record = logging.LogRecord( + name="recordName", + level=logging.INFO, + pathname="dummy/path", + lineno=0, + msg="a string message", + args=None, + exc_info=None, + ) + m = LogRecordModel.model_validate(record, from_attributes=True) + assert m.levelname == record.levelname + + +def test_logrecord_args(): + record = logging.LogRecord( + name="recordName", + level=logging.INFO, + pathname="dummy/path", + lineno=0, + msg="mambo number %d", + args=(5,), + exc_info=None, + ) + m = LogRecordModel.model_validate(record, from_attributes=True) + assert m.message == record.getMessage() + + +def test_logrecord_too_many_args(): + """Calling getMessage() will raise an error - but it should still validate + + If it doesn't validate, it will cause every subsequent call to the action's + invocation record to return a 500 error. + """ + record = logging.LogRecord( + name="recordName", + level=logging.INFO, + pathname="dummy/path", + lineno=0, + msg="mambo number %d", + args=(5, 6), + exc_info=None, + ) + m = LogRecordModel.model_validate(record, from_attributes=True) + assert m.message.startswith("Error") diff --git a/tests/test_dependencies.py b/tests/old_dependency_tests/test_dependencies.py similarity index 100% rename from tests/test_dependencies.py rename to tests/old_dependency_tests/test_dependencies.py diff --git a/tests/test_dependencies_2.py b/tests/old_dependency_tests/test_dependencies_2.py similarity index 100% rename from tests/test_dependencies_2.py rename to tests/old_dependency_tests/test_dependencies_2.py diff --git a/tests/test_dependency_metadata.py b/tests/old_dependency_tests/test_dependency_metadata.py similarity index 98% rename from tests/test_dependency_metadata.py rename to tests/old_dependency_tests/test_dependency_metadata.py index efec5cb..47d264e 100644 --- a/tests/test_dependency_metadata.py +++ b/tests/old_dependency_tests/test_dependency_metadata.py @@ -5,7 +5,7 @@ from typing import Any, Mapping from fastapi.testclient import TestClient import pytest -from .temp_client import poll_task +from ..temp_client import poll_task import labthings_fastapi as lt diff --git a/tests/test_directthingclient.py b/tests/old_dependency_tests/test_directthingclient.py similarity index 99% rename from tests/test_directthingclient.py rename to tests/old_dependency_tests/test_directthingclient.py index 8bf8c8f..381ee39 100644 --- a/tests/test_directthingclient.py +++ b/tests/old_dependency_tests/test_directthingclient.py @@ -9,7 +9,7 @@ import labthings_fastapi as lt from labthings_fastapi.deps import DirectThingClient, direct_thing_client_class from labthings_fastapi.thing_server_interface import create_thing_without_server -from .temp_client import poll_task +from ..temp_client import poll_task class Counter(lt.Thing): diff --git a/tests/test_thing_dependencies.py b/tests/old_dependency_tests/test_thing_dependencies.py similarity index 99% rename from tests/test_thing_dependencies.py rename to tests/old_dependency_tests/test_thing_dependencies.py index ef7b81b..e66c7cb 100644 --- a/tests/test_thing_dependencies.py +++ b/tests/old_dependency_tests/test_thing_dependencies.py @@ -7,7 +7,7 @@ from fastapi import Request import pytest import labthings_fastapi as lt -from .temp_client import poll_task +from ..temp_client import poll_task from labthings_fastapi.client.in_server import direct_thing_client_class from labthings_fastapi.utilities.introspection import fastapi_dependency_params diff --git a/tests/test_action_cancel.py b/tests/test_action_cancel.py index 881e6bf..d9be8a7 100644 --- a/tests/test_action_cancel.py +++ b/tests/test_action_cancel.py @@ -20,10 +20,10 @@ class CancellableCountingThing(lt.Thing): """ @lt.thing_action - def count_slowly(self, cancel: lt.deps.CancelHook, n: int = 10): + def count_slowly(self, n: int = 10): for _i in range(n): try: - cancel.sleep(0.1) + lt.cancellable_sleep(0.1) except lt.exceptions.InvocationCancelledError as e: # Set check to true to show that cancel was called. self.check = True @@ -31,23 +31,21 @@ def count_slowly(self, cancel: lt.deps.CancelHook, n: int = 10): self.counter += 1 @lt.thing_action - def count_slowly_but_ignore_cancel(self, cancel: lt.deps.CancelHook, n: int = 10): + def count_slowly_but_ignore_cancel(self, n: int = 10): """ Used to check that cancellation alter task behaviour """ counting_increment = 1 for _i in range(n): try: - cancel.sleep(0.1) + lt.cancellable_sleep(0.1) except lt.exceptions.InvocationCancelledError: # Rather than cancel, this disobedient task just counts faster counting_increment = 3 self.counter += counting_increment @lt.thing_action - def count_and_only_cancel_if_asked_twice( - self, cancel: lt.deps.CancelHook, n: int = 10 - ): + def count_and_only_cancel_if_asked_twice(self, n: int = 10): """ A task that changes behaviour on cancel, but if asked a second time will cancel """ @@ -55,15 +53,13 @@ def count_and_only_cancel_if_asked_twice( counting_increment = 1 for _i in range(n): try: - cancel.sleep(0.1) + lt.cancellable_sleep(0.1) except lt.exceptions.InvocationCancelledError as e: # If this is the second time, this is called actually cancel. if cancelled_once: raise (e) # If not, remember that this cancel event happened. cancelled_once = True - # Reset the CancelHook - cancel.clear() # Count backwards instead! counting_increment = -1 self.counter += counting_increment diff --git a/tests/test_action_logging.py b/tests/test_action_logging.py index 03b9213..b48823d 100644 --- a/tests/test_action_logging.py +++ b/tests/test_action_logging.py @@ -17,16 +17,17 @@ class ThingThatLogsAndErrors(lt.Thing): ] @lt.thing_action - def action_that_logs(self, logger: lt.deps.InvocationLogger): + def action_that_logs(self): + logger = lt.get_invocation_logger() for m in self.LOG_MESSAGES: logger.info(m) @lt.thing_action - def action_with_unhandled_error(self, logger: lt.deps.InvocationLogger): + def action_with_unhandled_error(self): raise RuntimeError("I was asked to raise this error.") @lt.thing_action - def action_with_invocation_error(self, logger: lt.deps.InvocationLogger): + def action_with_invocation_error(self): raise lt.exceptions.InvocationError("This is an error, but I handled it!") From a5fa72110ff8da8f3a207303b90788ad5a71753f Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Fri, 10 Oct 2025 00:03:53 +0100 Subject: [PATCH 06/11] More docstring for ThreadWithInvocationID --- src/labthings_fastapi/invocation_contexts.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py index a1019d9..ec8c527 100644 --- a/src/labthings_fastapi/invocation_contexts.py +++ b/src/labthings_fastapi/invocation_contexts.py @@ -248,12 +248,28 @@ def get_invocation_logger(id: UUID | None = None) -> logging.Logger: class ThreadWithInvocationID(Thread): - """A thread that sets a new invocation ID. + r"""A thread that sets a new invocation ID. This is a subclass of `threading.Thread` and works very much the same way. It implements its functionality by overriding the ``run`` method, so this should not be overridden again - you should instead specify the code to run using the ``target`` argument. + + This function enables an action to be run in a thread, which gets its + own invocation ID and cancel hook. This means logs will not be interleaved + with the calling action, and the thread may be cancelled just like an + action started over HTTP, by calling its ``cancel`` method. + + The thread also remembers the return value of the target function + in the property ``result`` and stores any exception raised in the + ``exception`` property. + + A final LabThings-specific feature is cancellation propagation. If + the thread is started from an action that may be cancelled, it may + be joined with ``join_and_propagate_cancel``\ . This is intended + to be equivalent to calling ``join`` but with the added feature that, + if the parent thread is cancelled while waiting for the child thread + to join, the child thread will also be cancelled. """ def __init__( From b791072f2bfa55dbb94dffbe9b45254725e5b73d Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Wed, 22 Oct 2025 22:26:08 +0100 Subject: [PATCH 07/11] Split cancellable_sleep In response to feedback, I've added `raise_if_cancelled()` to replace `cancellable_sleep(None)`. In response to the same feedback, I now handle the error if no invocation ID is available, so we simply perform a regular time.sleep. I've also deleted a couple of defunct print statements. --- src/labthings_fastapi/__init__.py | 2 + src/labthings_fastapi/invocation_contexts.py | 49 ++++++++------ tests/test_invocation_contexts.py | 71 +++++++++++++------- 3 files changed, 79 insertions(+), 43 deletions(-) diff --git a/src/labthings_fastapi/__init__.py b/src/labthings_fastapi/__init__.py index affd56f..a718431 100644 --- a/src/labthings_fastapi/__init__.py +++ b/src/labthings_fastapi/__init__.py @@ -35,6 +35,7 @@ from .invocation_contexts import ( get_invocation_logger, cancellable_sleep, + raise_if_cancelled, ThreadWithInvocationID, ) @@ -62,5 +63,6 @@ "ThingClient", "get_invocation_logger", "cancellable_sleep", + "raise_if_cancelled", "ThreadWithInvocationID", ] diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py index ec8c527..5a581b0 100644 --- a/src/labthings_fastapi/invocation_contexts.py +++ b/src/labthings_fastapi/invocation_contexts.py @@ -19,6 +19,7 @@ from contextlib import contextmanager import logging from threading import Event, Thread +import time from typing import Any, Callable from typing_extensions import Self from uuid import UUID, uuid4 @@ -191,8 +192,8 @@ def get_cancel_event(id: UUID | None = None) -> CancelEvent: return CancelEvent.get_for_id(id) -def cancellable_sleep(interval: float | None) -> None: - """Sleep for a specified time, allowing cancellation. +def cancellable_sleep(interval: float) -> None: + r"""Sleep for a specified time, allowing cancellation. This function should be called from action functions instead of `time.sleep` to allow them to be cancelled. Usually, this @@ -206,26 +207,38 @@ def cancellable_sleep(interval: float | None) -> None: This function uses `.Event.wait` internally, which suffers from timing errors on some platforms: it may have error of around 10-20ms. If that's a problem, consider using - `time.sleep` instead. ``lt.cancellable_sleep(None)`` may then + `time.sleep` instead. ``lt.raise_if_cancelled()`` may then be used to allow cancellation. - This function may only be called from an action thread, as it - depends on the invocation ID being available from a context variable. - Use `.set_invocation_id` to make it available outside of an action - thread. + If this function is called from outside of an action thread, it + will revert to `time.sleep`\ . + + :param interval: The length of time to wait for, in seconds. + """ + try: + event = get_cancel_event() + event.sleep(interval) + except NoInvocationContextError: + time.sleep(interval) - If ``interval`` is set to None, we do not call `.Event.wait` but - instead we simply check whether the event is set. - :param interval: The length of time to sleep for, in seconds. If it - is `None` we won't wait, but we will still check for a cancel - event, and raise the exception if it is set. +def raise_if_cancelled() -> None: + """Raise an exception if the current invocation has been cancelled. + + This function checks for cancellation events and, if the current + action invocation has been cancelled, it will raise an + `.InvocationCancelledError` to signal the thread to terminate. + It is equivalent to `.cancellable_sleep` but without waiting any + time. + + If called outside of an invocation context, this function does + nothing, and will not raise an error. """ - event = get_cancel_event() - if interval is None: + try: + event = get_cancel_event() event.raise_if_set() - else: - event.sleep(interval) + except NoInvocationContextError: + pass def get_invocation_logger(id: UUID | None = None) -> logging.Logger: @@ -335,11 +348,9 @@ def join_and_propagate_cancel(self, poll_interval: float = 0.2) -> None: cancellation: InvocationCancelledError | None = None self._polls = 0 self._attempted_cancels = 0 - print(f"Checking for cancellation of invocation {get_invocation_id()}") - print(f"so we can cancel {self.invocation_id}") while self.is_alive(): try: - cancellable_sleep(None) + raise_if_cancelled() self._polls += 1 except InvocationCancelledError as e: # Propagate the cancellation signal to the thread diff --git a/tests/test_invocation_contexts.py b/tests/test_invocation_contexts.py index 65595c8..df10da5 100644 --- a/tests/test_invocation_contexts.py +++ b/tests/test_invocation_contexts.py @@ -134,10 +134,10 @@ def test_cancel_event(): def test_cancellable_sleep(): """Check the module-level cancellable sleep.""" - with pytest.raises(exc.NoInvocationContextError): - ic.cancellable_sleep(1) - with pytest.raises(exc.NoInvocationContextError): - ic.cancellable_sleep(None) + # with no invocation context, the function should wait + # and there should be no error. + with assert_takes_time(0.02, 0.04): + ic.cancellable_sleep(0.02) with ic.fake_invocation_context(): event = ic.get_cancel_event() @@ -146,20 +146,30 @@ def test_cancellable_sleep(): with assert_takes_time(0.02, 0.04): ic.cancellable_sleep(0.02) - # passing `None` should return immediately. - with assert_takes_time(None, 0.002): - ic.cancellable_sleep(None) - # check an exception gets raised and reset if appropriate event.set() with pytest.raises(exc.InvocationCancelledError): ic.cancellable_sleep(1) assert not event.is_set() + +def test_raise_if_cancelled(): + """Check the module-level cancellable sleep.""" + # the function should return immediately. + with assert_takes_time(None, 0.002): + ic.raise_if_cancelled() + + with ic.fake_invocation_context(): + event = ic.get_cancel_event() + + # the function should return immediately. + with assert_takes_time(None, 0.002): + ic.raise_if_cancelled() + # check an exception gets raised and reset if appropriate event.set() with pytest.raises(exc.InvocationCancelledError): - ic.cancellable_sleep(None) + ic.raise_if_cancelled() assert not event.is_set() @@ -179,19 +189,6 @@ def test_invocation_logger(): assert logger.name.endswith(str(id)) -def run_function_in_thread_and_propagate_cancellation(func, *args): - """Run a function in a ThreadWithInvocationID.""" - t = ic.ThreadWithInvocationID(target=func, args=args) - t.start() - try: - t.join_and_propagate_cancel(0.005) - except exc.InvocationCancelledError: - # We still want to return the finished thread if it's - # cancelled. - pass - return t - - def test_thread_with_invocation_id(): """Test our custom thread subclass makes a new ID and can be cancelled.""" ids = [] @@ -204,6 +201,9 @@ def test_thread_with_invocation_id(): assert t.exception is None assert t.result is None + +def test_thread_with_invocation_id_cancel(): + """Test the custom thread subclass responds to cancellation.""" # Check cancellable sleep works in the thread t = ic.ThreadWithInvocationID(target=ic.cancellable_sleep, args=[1]) assert isinstance(t.invocation_id, uuid.UUID) @@ -213,20 +213,43 @@ def test_thread_with_invocation_id(): t.join() assert isinstance(t.exception, exc.InvocationCancelledError) - # Check we capture the return value + +def test_thread_with_invocation_id_return_value(): + """Check we capture the return value when running in a ThreadWithInvocationID.""" t = ic.ThreadWithInvocationID(target=lambda: True) t.start() t.join() assert t.exception is None assert t.result is True + +def run_function_in_thread_and_propagate_cancellation(func, *args): + """Run a function in a ThreadWithInvocationID.""" + t = ic.ThreadWithInvocationID(target=func, args=args) + t.start() + try: + t.join_and_propagate_cancel(1) + except exc.InvocationCancelledError: + # We still want to return the finished thread if it's + # cancelled. + pass + return t + + +def test_thread_with_invocation_id_cancellation_propagates(): + """Check that a cancel event can propagate to our thread. + + ``join_and_propagate_cancellation`` should cancel the spawned thread if + the parent thread is cancelled while it's waiting for the spawned thread + to join. + """ # Check we can propagate cancellation. # First, we run `cancellable_sleep` and check it doesn't cancel with ic.fake_invocation_context(): # First test our function - there is only one thread here, and we # check it finishes and doesn't error. t = run_function_in_thread_and_propagate_cancellation( - ic.cancellable_sleep, 0.001 + ic.cancellable_sleep, 0.02 ) assert isinstance(t, ic.ThreadWithInvocationID) assert not t.is_alive() From bea820df795f55de2827a1a1ad6b078d14bbd65b Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 23 Oct 2025 00:09:52 +0100 Subject: [PATCH 08/11] Don't make loggers per-invocation This commit adds a logger for every Thing instance, and a custom log handler and filter that inject the invocation ID into LogRecord objects. This means we can still filter out invocation logs as we did before, but we no longer need to make a new logger for each invocation. This more or less follows the example given in the Logging Cookbook for adding context. --- docs/source/actions.rst | 13 +- src/labthings_fastapi/actions/__init__.py | 140 +++++++------------ src/labthings_fastapi/exceptions.py | 9 ++ src/labthings_fastapi/invocation_contexts.py | 20 --- src/labthings_fastapi/logs.py | 130 +++++++++++++++++ src/labthings_fastapi/server/__init__.py | 2 + src/labthings_fastapi/thing.py | 6 + tests/test_action_logging.py | 11 +- tests/test_invocation_contexts.py | 16 --- 9 files changed, 209 insertions(+), 138 deletions(-) create mode 100644 src/labthings_fastapi/logs.py diff --git a/docs/source/actions.rst b/docs/source/actions.rst index 1aa201f..1294a7c 100644 --- a/docs/source/actions.rst +++ b/docs/source/actions.rst @@ -61,15 +61,14 @@ other `.Things` on the same server. .. action_logging: Logging from actions -------------------- -It's helpful to be able to keep track of the log messages from action code. +Action code should use `.Thing.logger` to log messages. This will be configured +to handle messages on a per-invocation basis and make them available when the action +is queried over HTTP. + This may be used to display status updates to the user when an action takes -a long time to run, or it may simply be a helpful debugging aid. To log -messages that are associated with a particular invocation of an action, -use ``lt.get_invocation_logger()`` to obtain a `logging.Logger` that will -associate its messages with the current invocation. +a long time to run, or it may simply be a helpful debugging aid. -Logs are available over HTTP. If an action is run from another action, -it will use the same logger as its parent action. +See :mod:`.logs` for details of how this is implemented. .. action_cancellation: Cancelling actions diff --git a/src/labthings_fastapi/actions/__init__.py b/src/labthings_fastapi/actions/__init__.py index fbd9a55..cdca589 100644 --- a/src/labthings_fastapi/actions/__init__.py +++ b/src/labthings_fastapi/actions/__init__.py @@ -19,13 +19,15 @@ import logging from collections import deque from threading import Thread, Lock -from typing import MutableSequence, Optional, Any +from typing import Optional, Any import uuid from typing import TYPE_CHECKING import weakref from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel +from labthings_fastapi.logs import add_thing_log_destination + from ..utilities import model_to_dict from ..utilities.introspection import EmptyInput from ..thing_description._model import LinkElement @@ -39,7 +41,6 @@ CancelEvent, get_cancel_event, set_invocation_id, - get_invocation_logger, ) if TYPE_CHECKING: @@ -274,98 +275,57 @@ def run(self) -> None: # self.action evaluates to an ActionDescriptor. This confuses mypy, # which thinks we are calling ActionDescriptor.__get__. action: ActionDescriptor = self.action # type: ignore[call-overload] - # Create a logger just for this invocation, keyed to the invocation id - # Logs that go to this logger will be copied into `self._log` - handler = DequeLogHandler(dest=self._log) - logger = get_invocation_logger(self.id) - logger.setLevel(logging.INFO) - logger.addHandler(handler) - try: - action.emit_changed_event(self.thing, self._status.value) - - thing = self.thing - kwargs = model_to_dict(self.input) - if thing is None: # pragma: no cover - # The Thing is stored as a weakref, but it will always exist - # while the server is running - this error should never - # occur. - raise RuntimeError("Cannot start an invocation without a Thing.") - - with self._status_lock: - self._status = InvocationStatus.RUNNING - self._start_time = datetime.datetime.now() + logger = self.thing.logger + # The line below saves records matching our ID to ``self._log`` + add_thing_log_destination(self.id, self._log) + with set_invocation_id(self.id): + try: action.emit_changed_event(self.thing, self._status.value) - bound_method = action.__get__(thing) - # Actually run the action - with set_invocation_id(self.id): + thing = self.thing + kwargs = model_to_dict(self.input) + if thing is None: # pragma: no cover + # The Thing is stored as a weakref, but it will always exist + # while the server is running - this error should never + # occur. + raise RuntimeError("Cannot start an invocation without a Thing.") + + with self._status_lock: + self._status = InvocationStatus.RUNNING + self._start_time = datetime.datetime.now() + action.emit_changed_event(self.thing, self._status.value) + + bound_method = action.__get__(thing) + # Actually run the action ret = bound_method(**kwargs, **self.dependencies) - with self._status_lock: - self._return_value = ret - self._status = InvocationStatus.COMPLETED - action.emit_changed_event(self.thing, self._status.value) - except InvocationCancelledError: - logger.info(f"Invocation {self.id} was cancelled.") - with self._status_lock: - self._status = InvocationStatus.CANCELLED - action.emit_changed_event(self.thing, self._status.value) - except Exception as e: # skipcq: PYL-W0703 - # First log - if isinstance(e, InvocationError): - # Log without traceback - logger.error(e) - else: - logger.exception(e) - # Then set status - with self._status_lock: - self._status = InvocationStatus.ERROR - self._exception = e - action.emit_changed_event(self.thing, self._status.value) - finally: - with self._status_lock: - self._end_time = datetime.datetime.now() - self.expiry_time = self._end_time + datetime.timedelta( - seconds=self.retention_time, - ) - logger.removeHandler(handler) # Stop saving logs - # If we don't remove the log handler, it's a circular ref/memory leak. - - -class DequeLogHandler(logging.Handler): - """A log handler that stores entries in memory.""" - - def __init__( - self, - dest: MutableSequence, - level: int = logging.INFO, - ) -> None: - """Set up a log handler that appends messages to a deque. - - .. warning:: - This log handler does not currently rotate or truncate - the list - so if you use it on a thread that produces a - lot of log messages, you may run into memory problems. - - Using a `.deque` with a finite capacity helps to mitigate - this. - - :param dest: should specify a deque, to which we will append - each log entry as it comes in. This is assumed to be thread - safe. - :param level: sets the level of the logger. For most invocations, - a log level of `logging.INFO` is appropriate. - """ - logging.Handler.__init__(self) - self.setLevel(level) - self.dest = dest - - def emit(self, record: logging.LogRecord) -> None: - """Save a log record to the destination deque. - - :param record: the `logging.LogRecord` object to add. - """ - self.dest.append(record) + with self._status_lock: + self._return_value = ret + self._status = InvocationStatus.COMPLETED + action.emit_changed_event(self.thing, self._status.value) + except InvocationCancelledError: + logger.info(f"Invocation {self.id} was cancelled.") + with self._status_lock: + self._status = InvocationStatus.CANCELLED + action.emit_changed_event(self.thing, self._status.value) + except Exception as e: # skipcq: PYL-W0703 + # First log + if isinstance(e, InvocationError): + # Log without traceback + logger.error(e) + else: + logger.exception(e) + # Then set status + with self._status_lock: + self._status = InvocationStatus.ERROR + self._exception = e + action.emit_changed_event(self.thing, self._status.value) + finally: + with self._status_lock: + self._end_time = datetime.datetime.now() + self.expiry_time = self._end_time + datetime.timedelta( + seconds=self.retention_time, + ) class ActionManager: diff --git a/src/labthings_fastapi/exceptions.py b/src/labthings_fastapi/exceptions.py index 0c8e4c9..4895f85 100644 --- a/src/labthings_fastapi/exceptions.py +++ b/src/labthings_fastapi/exceptions.py @@ -128,3 +128,12 @@ class NoInvocationContextError(RuntimeError): To avoid this error in test code or manually created threads, you should supply an invocation context. """ + + +class LogConfigurationError(RuntimeError): + """There is a problem with logging configuration. + + LabThings uses the `logging` module to collect logs from actions. This requires + certain handlers and filters to be set up. This exception is raised if they + cannot be added, or if they are not present when they are needed. + """ diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py index 5a581b0..03b20ad 100644 --- a/src/labthings_fastapi/invocation_contexts.py +++ b/src/labthings_fastapi/invocation_contexts.py @@ -17,7 +17,6 @@ from collections.abc import Iterator, Mapping, Sequence from contextvars import ContextVar from contextlib import contextmanager -import logging from threading import Event, Thread import time from typing import Any, Callable @@ -241,25 +240,6 @@ def raise_if_cancelled() -> None: pass -def get_invocation_logger(id: UUID | None = None) -> logging.Logger: - """Obtain a logger for the current invocation. - - Use this function to get a logger to use in action code. This - will associate the log messages with the invocation, so that - they may be used as status updates or related to a particular run - of the action. - - :param id: the invocation ID. This will be determined from context - so need not be specified in action code. - :return: a logger that is specific to a particular invocation of - an action. - """ - if id is None: - id = get_invocation_id() - logger = logging.getLogger(f"labthings_fastapi.actions.{id}") - return logger - - class ThreadWithInvocationID(Thread): r"""A thread that sets a new invocation ID. diff --git a/src/labthings_fastapi/logs.py b/src/labthings_fastapi/logs.py new file mode 100644 index 0000000..ebcfd7d --- /dev/null +++ b/src/labthings_fastapi/logs.py @@ -0,0 +1,130 @@ +"""Log-related functions and classes. + +This module currently contains code that allows us to filter out logs by invocaton +ID, so that they may be returned when invocations are queried. +""" + +from collections.abc import MutableSequence +import logging +from uuid import UUID +from weakref import WeakValueDictionary +from .invocation_contexts import get_invocation_id +from .exceptions import LogConfigurationError, NoInvocationContextError + + +THING_LOGGER = logging.getLogger("labthings_fastapi.things") + + +def inject_invocation_id(record: logging.LogRecord) -> bool: + r"""Add the invocation ID to records. + + This function adds the current invocation ID to log records. If it is not + available, we set the record's ``invocation_id`` property to `None`\ . + + :param record: the `logging.LogRecord` object to modify. + + :return: `True` (which signals we should keep every record if this is used + as a filter). + """ + try: + id = get_invocation_id() + record.invocation_id = id + except NoInvocationContextError: + record.invocation_id = None + return True + + +class DequeByInvocationIDHandler(logging.Handler): + """A log handler that stores entries in memory.""" + + def __init__( + self, + level: int = logging.INFO, + ) -> None: + """Set up a log handler that appends messages to a deque. + + .. warning:: + This log handler does not currently rotate or truncate + the list. It's best to use a `deque` with a finite capacity + to avoid memory leaks. + + :param level: sets the level of the logger. For most invocations, + a log level of `logging.INFO` is appropriate. + """ + super().__init__() + self.setLevel(level) + self.destinations = WeakValueDictionary[UUID, MutableSequence]() + self.addFilter(inject_invocation_id) + + def add_destination_for_id(self, id: UUID, destination: MutableSequence) -> None: + """Append logs matching ``id`` to a specified sequence. + + :param id: the ``invocation_id`` to match. + :param destination: should specify a deque, to which we will append + each log entry as it comes in. This is assumed to be thread + safe. + """ + self.destinations[id] = destination + + def emit(self, record: logging.LogRecord) -> None: + """Save a log record to the destination deque. + + :param record: the `logging.LogRecord` object to add. + """ + id = getattr(record, "invocation_id", None) + if isinstance(id, UUID): + try: + self.destinations[id].append(record) + except KeyError: + pass # If there's no destination for a particular log, ignore it. + + +def configure_thing_logger() -> None: + """Set up the logger for thing instances. + + We always set the logger for thing instances to level INFO, as this + is currently used to relay progress to the client. + + This function will collect logs on a per-invocation + basis by adding a `.DequeByInvocationIDHandler` to the log. Only one + such handler will be added - subsequent calls are ignored. + + Unfortunately, filters must be added to every sub-logger, so globally adding + a filter to add invocation ID is not possible. Instead, we attach a filter to + the handler, which filters all the records that propagate to it (i.e. anything + that starts with ``labthings_fastapi.things``). + """ + THING_LOGGER.setLevel(logging.INFO) + if not any( + isinstance(h, DequeByInvocationIDHandler) for h in THING_LOGGER.handlers + ): + THING_LOGGER.addHandler(DequeByInvocationIDHandler()) + + +def add_thing_log_destination( + invocation_id: UUID, destination: MutableSequence +) -> None: + """Append logs matching ``id`` to a specified sequence. + + This instructs a handler on the logger used for `.Thing` instances to append a copy + of the logs generated by that invocation to the specified sequence. + This is primarily used by invocation threads to collect their logs, so they + may be returned when the invocation is queried. + + :param invocation_id: the ``invocation_id`` to match. + :param destination: should specify a deque, to which we will append + each log entry as it comes in. This is assumed to be thread + safe. + """ + handlers = [ + h for h in THING_LOGGER.handlers if isinstance(h, DequeByInvocationIDHandler) + ] + if len(handlers) != 1: + if len(handlers) == 0: + msg = "There is no suitable handler on {THING_LOGGER}." + else: + msg = "There were multiple matching handlers on {THING_LOGGER}, " + msg += "which should not happen: this is a LabThings bug." + raise LogConfigurationError(msg) + handler = handlers[0] + handler.add_destination_for_id(invocation_id, destination) diff --git a/src/labthings_fastapi/server/__init__.py b/src/labthings_fastapi/server/__init__.py index 0af0ec9..d91f62a 100644 --- a/src/labthings_fastapi/server/__init__.py +++ b/src/labthings_fastapi/server/__init__.py @@ -26,6 +26,7 @@ object_reference_to_object, ) from ..actions import ActionManager +from ..logs import configure_thing_logger from ..thing import Thing from ..thing_server_interface import ThingServerInterface from ..thing_description._model import ThingDescription @@ -89,6 +90,7 @@ def __init__(self, settings_folder: Optional[str] = None) -> None: self.startup_status: dict[str, str | dict] = {"things": {}} global _thing_servers # noqa: F824 _thing_servers.add(self) + configure_thing_logger() # Note: this is safe to call multiple times. app: FastAPI action_manager: ActionManager diff --git a/src/labthings_fastapi/thing.py b/src/labthings_fastapi/thing.py index ccbd87f..9935a3c 100644 --- a/src/labthings_fastapi/thing.py +++ b/src/labthings_fastapi/thing.py @@ -20,6 +20,7 @@ from pydantic import BaseModel +from .logs import THING_LOGGER from .properties import BaseProperty, DataProperty, BaseSetting from .descriptors import ActionDescriptor from .thing_description._model import ThingDescription, NoSecurityScheme @@ -106,6 +107,11 @@ def name(self) -> str: """The name of this Thing, as known to the server.""" return self._thing_server_interface.name + @property + def logger(self) -> logging.Logger: + """A logger, named after this Thing.""" + return THING_LOGGER.getChild(self.name) + async def __aenter__(self) -> Self: """Context management is used to set up/close the thing. diff --git a/tests/test_action_logging.py b/tests/test_action_logging.py index b48823d..9804ae7 100644 --- a/tests/test_action_logging.py +++ b/tests/test_action_logging.py @@ -8,6 +8,7 @@ from .temp_client import poll_task import labthings_fastapi as lt from labthings_fastapi.actions.invocation_model import LogRecordModel +from labthings_fastapi.logs import THING_LOGGER class ThingThatLogsAndErrors(lt.Thing): @@ -18,9 +19,8 @@ class ThingThatLogsAndErrors(lt.Thing): @lt.thing_action def action_that_logs(self): - logger = lt.get_invocation_logger() for m in self.LOG_MESSAGES: - logger.info(m) + self.logger.info(m) @lt.thing_action def action_with_unhandled_error(self): @@ -42,11 +42,12 @@ def client(): def test_invocation_logging(caplog, client): """Check the expected items appear in the log when an action is invoked.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_that_logs") r.raise_for_status() invocation = poll_task(client, r.json()) assert invocation["status"] == "completed" + assert len(caplog.records) == len(ThingThatLogsAndErrors.LOG_MESSAGES) assert len(invocation["log"]) == len(ThingThatLogsAndErrors.LOG_MESSAGES) assert len(invocation["log"]) == len(caplog.records) for expected, entry in zip( @@ -57,7 +58,7 @@ def test_invocation_logging(caplog, client): def test_unhandled_error_logs(caplog, client): """Check that a log with a traceback is raised if there is an unhandled error.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_with_unhandled_error") r.raise_for_status() invocation = poll_task(client, r.json()) @@ -70,7 +71,7 @@ def test_unhandled_error_logs(caplog, client): def test_invocation_error_logs(caplog, client): """Check that a log with a traceback is raised if there is an unhandled error.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_with_invocation_error") r.raise_for_status() invocation = poll_task(client, r.json()) diff --git a/tests/test_invocation_contexts.py b/tests/test_invocation_contexts.py index df10da5..9de38ea 100644 --- a/tests/test_invocation_contexts.py +++ b/tests/test_invocation_contexts.py @@ -173,22 +173,6 @@ def test_raise_if_cancelled(): assert not event.is_set() -def test_invocation_logger(): - """Check `get_invocation_logger` behaves correctly.""" - # The function simply returns a logger with the ID in the name. - fake_id = uuid.uuid4() - logger = ic.get_invocation_logger(fake_id) - assert logger.name.endswith(str(fake_id)) - - # The ID is taken from context if not supplied. - with pytest.raises(exc.NoInvocationContextError): - ic.get_invocation_logger() - with ic.fake_invocation_context(): - logger = ic.get_invocation_logger() - id = ic.get_invocation_id() - assert logger.name.endswith(str(id)) - - def test_thread_with_invocation_id(): """Test our custom thread subclass makes a new ID and can be cancelled.""" ids = [] From f784f50e4877c4d65d03349778cfae09fa256113 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 23 Oct 2025 00:18:13 +0100 Subject: [PATCH 09/11] Fix old test and dependency For ease of migration, I've fixed the old InvocationLogger dependency. This uses a hard-coded Thing name, but will work well enough to enable a smooth migration to the new syntax. --- src/labthings_fastapi/__init__.py | 2 -- src/labthings_fastapi/dependencies/invocation.py | 5 +++-- tests/old_dependency_tests/test_action_logging.py | 7 ++++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/labthings_fastapi/__init__.py b/src/labthings_fastapi/__init__.py index a718431..e989268 100644 --- a/src/labthings_fastapi/__init__.py +++ b/src/labthings_fastapi/__init__.py @@ -33,7 +33,6 @@ from .server import ThingServer, cli from .client import ThingClient from .invocation_contexts import ( - get_invocation_logger, cancellable_sleep, raise_if_cancelled, ThreadWithInvocationID, @@ -61,7 +60,6 @@ "ThingServer", "cli", "ThingClient", - "get_invocation_logger", "cancellable_sleep", "raise_if_cancelled", "ThreadWithInvocationID", diff --git a/src/labthings_fastapi/dependencies/invocation.py b/src/labthings_fastapi/dependencies/invocation.py index 51f95dd..7cf5789 100644 --- a/src/labthings_fastapi/dependencies/invocation.py +++ b/src/labthings_fastapi/dependencies/invocation.py @@ -36,7 +36,8 @@ from typing import Annotated from fastapi import Depends import logging -from ..invocation_contexts import CancelEvent, get_invocation_logger +from ..invocation_contexts import CancelEvent +from ..logs import THING_LOGGER def invocation_id() -> uuid.UUID: @@ -87,7 +88,7 @@ def invocation_logger(id: InvocationID) -> logging.Logger: :return: A `logging.Logger` object specific to this invocation. """ - return get_invocation_logger(id) + return THING_LOGGER.getChild("OLD_DEPENDENCY_LOGGER") InvocationLogger = Annotated[logging.Logger, Depends(invocation_logger)] diff --git a/tests/old_dependency_tests/test_action_logging.py b/tests/old_dependency_tests/test_action_logging.py index 0c4e678..2c521d3 100644 --- a/tests/old_dependency_tests/test_action_logging.py +++ b/tests/old_dependency_tests/test_action_logging.py @@ -8,6 +8,7 @@ from ..temp_client import poll_task import labthings_fastapi as lt from labthings_fastapi.actions.invocation_model import LogRecordModel +from labthings_fastapi.logs import THING_LOGGER class ThingThatLogsAndErrors(lt.Thing): @@ -41,7 +42,7 @@ def client(): def test_invocation_logging(caplog, client): """Check the expected items appear in the log when an action is invoked.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_that_logs") r.raise_for_status() invocation = poll_task(client, r.json()) @@ -56,7 +57,7 @@ def test_invocation_logging(caplog, client): def test_unhandled_error_logs(caplog, client): """Check that a log with a traceback is raised if there is an unhandled error.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_with_unhandled_error") r.raise_for_status() invocation = poll_task(client, r.json()) @@ -69,7 +70,7 @@ def test_unhandled_error_logs(caplog, client): def test_invocation_error_logs(caplog, client): """Check that a log with a traceback is raised if there is an unhandled error.""" - with caplog.at_level(logging.INFO, logger="labthings.action"): + with caplog.at_level(logging.INFO, logger=THING_LOGGER.name): r = client.post("/log_and_error_thing/action_with_invocation_error") r.raise_for_status() invocation = poll_task(client, r.json()) From 056f61f14672c2cff2576196e0440b48be563412 Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Thu, 23 Oct 2025 00:19:32 +0100 Subject: [PATCH 10/11] Fix an edge case in ThreadWithInvocationID The test for ThreadWithInvocationID occasionally failed, because the thread was cancelled before it started running, and the CancelEvent was destroyed (and reset). I now hold a reference to the CancelEvent for the lifetime of the Thread, which means this is no longer possible. --- src/labthings_fastapi/invocation_contexts.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/labthings_fastapi/invocation_contexts.py b/src/labthings_fastapi/invocation_contexts.py index 03b20ad..b7c31ae 100644 --- a/src/labthings_fastapi/invocation_contexts.py +++ b/src/labthings_fastapi/invocation_contexts.py @@ -288,6 +288,13 @@ def __init__( self._invocation_id: UUID = uuid4() self._result: Any = None self._exception: BaseException | None = None + # We hold a reference to the CancelEvent below, to ensure that it + # doesn't get garbage collected. Garbage collection means that we + # might (or might not) ignore cancellation that happens before the + # thread has started properly. This is an edge case, but can mess + # up testing code, so it's safest to ensure the event exists for at + # least as long as this Thread. + self._cancel_event = CancelEvent.get_for_id(self._invocation_id) @property def invocation_id(self) -> UUID: From 3478e65511e4ed85c09ee2b45cef0dba0c47e11e Mon Sep 17 00:00:00 2001 From: Richard Bowman Date: Sun, 26 Oct 2025 21:08:44 +0000 Subject: [PATCH 11/11] Test the logs module This adds full unit test coverage, in addition to the more functional testing in `test_action_logging`. --- tests/test_logs.py | 178 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 tests/test_logs.py diff --git a/tests/test_logs.py b/tests/test_logs.py new file mode 100644 index 0000000..c09abd0 --- /dev/null +++ b/tests/test_logs.py @@ -0,0 +1,178 @@ +"""Unit tests for the `.logs` module. + +These tests are intended to complement the more functional tests +in ``test_aciton_logging`` with bottom-up tests for code in the +`.logs` module. +""" + +from collections import deque +import logging +from types import EllipsisType +import pytest +from uuid import UUID, uuid4 +from labthings_fastapi import logs +from labthings_fastapi.invocation_contexts import ( + fake_invocation_context, + set_invocation_id, +) +import labthings_fastapi as lt +from labthings_fastapi import exceptions as exc +from labthings_fastapi.thing_server_interface import create_thing_without_server + + +class ThingThatLogs(lt.Thing): + @lt.thing_action + def log_a_message(self, msg: str): + """Log a message to the thing's logger.""" + self.logger.info(msg) + + +def reset_thing_logger(): + """Remove all handlers from the THING_LOGGER to reset it.""" + logger = logs.THING_LOGGER + # Note that the [:] below is important: this copies the list and avoids + # issues with modifying a list as we're iterating through it. + for h in logger.handlers[:]: + logger.removeHandler(h) + for f in logger.filters[:]: + logger.removeFilter(f) + assert len(logger.handlers) == 0 + assert len(logger.filters) == 0 + + +def make_record(msg="A test message", id: UUID | EllipsisType | None = ...): + """A LogRecord object.""" + record = logging.LogRecord( + "labthings_fastapi.things.test", + logging.INFO, + "test/file/path.py", + 42, + msg, + None, + None, + "test_function", + None, + ) + if id is not ...: + record.invocation_id = id + return record + + +def test_inject_invocation_id_nocontext(): + """Check our filter function correctly adds invocation ID to a log record.""" + record = make_record() + # The record won't have an invocation ID to start with. + assert not hasattr(record, "invocation_id") + # The filter should return True (to keep the record) + assert logs.inject_invocation_id(record) is True + # It should add the attribute, but with no invocation + # context, it should be set to None + assert record.invocation_id is None + + # Currently, if we re-run the filter it silently overwrites, + # so there should be no error below: + assert logs.inject_invocation_id(record) is True + + # Currently, it should overwrite the value. This behaviour + # possibly wants to change in the future, and this test + # should be updated. + with fake_invocation_context() as id: + assert logs.inject_invocation_id(record) is True + assert record.invocation_id == id + + +def test_inject_invocation_id_withcontext(): + """Check our filter function correctly adds invocation ID to a log record.""" + record = make_record() + # The record won't have an invocation ID to start with. + assert not hasattr(record, "invocation_id") + # The filter should return True (to keep the record) + with fake_invocation_context() as id: + assert logs.inject_invocation_id(record) is True + assert record.invocation_id == id + + # Currently, it should overwrite the value. This behaviour + # possibly wants to change in the future, and this test + # should be updated. This ID should be a fresh one. + with fake_invocation_context() as id2: + assert logs.inject_invocation_id(record) is True + # Check the ID has changed and was overwritten. + assert id2 != id + assert record.invocation_id == id2 + + +def test_dequebyinvocationidhandler(): + """Check the custom log handler works as expected.""" + handler = logs.DequeByInvocationIDHandler() + assert handler.level == logging.INFO + + destinations = { + uuid4(): deque(), + uuid4(): deque(), + } + # We should be able to log with nothing set up, the record + # won't go anywhere but there shouldn't be any errors. + for id in destinations.keys(): + handler.emit(make_record(id=id)) + for dest in destinations.values(): + assert len(dest) == 0 + + # After adding the destinations, the logs should appear. + for id, dest in destinations.items(): + handler.add_destination_for_id(id, dest) + + for id in destinations.keys(): + handler.emit(make_record(id=id)) + for id, dest in destinations.items(): + assert len(dest) == 1 + assert dest[0].invocation_id == id + + +def test_configure_thing_logger(): + """Check the logger is configured correctly.""" + # Start by resetting the handlers on the logger + reset_thing_logger() + + # Then configure it + logs.configure_thing_logger() + + # Check it's correct + assert logs.THING_LOGGER.level == logging.INFO + assert len(logs.THING_LOGGER.handlers) == 1 + assert isinstance(logs.THING_LOGGER.handlers[0], logs.DequeByInvocationIDHandler) + + # Test it out + with fake_invocation_context() as id: + dest = deque() + logs.add_thing_log_destination(id, dest) + logger = logs.THING_LOGGER.getChild("foo") + logger.info("Test") + assert len(dest) == 1 + assert dest[0].msg == "Test" + + +def test_add_thing_log_destination(): + """Check the module-level function to add an invocation log destination.""" + reset_thing_logger() + id = uuid4() + dest = deque() + + with pytest.raises(exc.LogConfigurationError): + # This won't work until the handler is added + logs.add_thing_log_destination(id, dest) + + logs.THING_LOGGER.addHandler(logs.DequeByInvocationIDHandler()) + logs.THING_LOGGER.addHandler(logs.DequeByInvocationIDHandler()) + with pytest.raises(exc.LogConfigurationError): + # More than one handler will also make it fail with an error. + logs.add_thing_log_destination(id, dest) + + reset_thing_logger() + logs.configure_thing_logger() + + thing = create_thing_without_server(ThingThatLogs) + logs.add_thing_log_destination(id, dest) + with set_invocation_id(id): + thing.log_a_message("Test Message.") + assert len(dest) == 1 + assert dest[0].getMessage() == "Test Message."