Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions docs/source/actions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,49 @@ The first is ``self`` (the first positional argument), which is always the
supply resources needed by the action. Most often, this is a way of accessing
other `.Things` on the same server.

.. action_logging:
Logging from actions
--------------------
Action code should use `.Thing.logger` to log messages. This will be configured
to handle messages on a per-invocation basis and make them available when the action
is queried over HTTP.

This may be used to display status updates to the user when an action takes
a long time to run, or it may simply be a helpful debugging aid.

See :mod:`.logs` for details of how this is implemented.

.. action_cancellation:
Cancelling actions
------------------
If an action could run for a long time, it is useful to be able to cancel it
cleanly. LabThings makes provision for this by allowing actions to be cancelled
using a ``DELETE`` HTTP request. In order to allow an action to be cancelled,
you must give LabThings opportunities to interrupt it. This is most often done
by replacing a `time.sleep()` statement with `.cancellable_sleep()` which
is equivalent, but will raise an exception if the action is cancelled.

For more advanced options, see `.invocation_contexts` for detail.

.. invocation_context:
Invocation contexts
-------------------
Cancelling actions and capturing their logs requires action code to use a
specific logger and check for cancel events. This is done using `contextvars`
such that the action code can use module-level symbols rather than needing
to explicitly pass the logger and cancel hook as arguments to the action
method.

Usually, you don't need to consider this mechanism: simply use the invocation
logger or cancel hook as explained above. However, if you want to run actions
outside of the server (for example, for testing purposes) or if you want to
call one action from another action, but not share the cancellation signal
or log, functions are provided in `.invocation_contexts` to manage this.

If you start a new thread from an action, code running in that thread will
not have the invocation ID set in a context variable. A subclass of
`threading.Thread` is provided to do this, `.ThreadWithInvocationID`\ .

Raising exceptions
------------------
If an action raises an unhandled exception, the action will terminate with an Error
Expand Down
8 changes: 8 additions & 0 deletions docs/source/concurrency.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@ Calling Things from other Things

When one `Thing` calls the actions or properties of another `.Thing`, either directly or via a `.DirectThingClient`, no new threads are spawned: the action or property is run in the same thread as the caller. This mirrors the behaviour of the `.ThingClient`, which blocks until the action or property is complete. See :doc:`using_things` for more details on how to call actions and properties of other Things.

Invocations and concurrency
---------------------------

Each time an action is run ("invoked" in :ref:`wot_cc`), we create a new thread to run it. This thread has a context variable set, such that ``lt.cancellable_sleep`` and ``lt.get_invocation_logger`` are aware of which invocation is currently running. If an action spawns a new thread (e.g. using `threading.Thread`\ ), this new thread will not have an invocation ID, and consequently the two invocation-specific functions mentioned will not work.

Usually, the best solution to this problem is to generate a new invocation ID for the thread. This means only the original action thread will receive cancellation events, and only the original action thread will log to the invocation logger. If the action is cancelled, you must cancel the background thread. This is the behaviour of `.ThreadWithInvocationID`\ .

It is also possible to copy the current invocation ID to a new thread. This is often a bad idea, as it's ill-defined whether the exception will arise in the original thread or the new one if the invocation is cancelled. Logs from the two threads will also be interleaved. If it's desirable to log from the background thread, the invocation logger may safely be passed as an argument, rather than accessed via ``lt.get_invocation_logger``\ .
8 changes: 8 additions & 0 deletions src/labthings_fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
from .outputs import blob
from .server import ThingServer, cli
from .client import ThingClient
from .invocation_contexts import (
cancellable_sleep,
raise_if_cancelled,
ThreadWithInvocationID,
)

# The symbols in __all__ are part of our public API.
# They are imported when using `import labthings_fastapi as lt`.
Expand All @@ -55,4 +60,7 @@
"ThingServer",
"cli",
"ThingClient",
"cancellable_sleep",
"raise_if_cancelled",
"ThreadWithInvocationID",
]
165 changes: 64 additions & 101 deletions src/labthings_fastapi/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,29 @@
import logging
from collections import deque
from threading import Thread, Lock
from typing import MutableSequence, Optional, Any
from typing import Optional, Any
import uuid
from typing import TYPE_CHECKING
import weakref
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

from labthings_fastapi.logs import add_thing_log_destination

from ..utilities import model_to_dict
from ..utilities.introspection import EmptyInput
from ..thing_description._model import LinkElement
from .invocation_model import InvocationModel, InvocationStatus, LogRecordModel
from ..dependencies.invocation import (
CancelHook,
from ..exceptions import (
InvocationCancelledError,
InvocationError,
invocation_logger,
)
from ..outputs.blob import BlobIOContextDep, blobdata_to_url_ctx
from ..invocation_contexts import (
CancelEvent,
get_cancel_event,
set_invocation_id,
)

if TYPE_CHECKING:
# We only need these imports for type hints, so this avoids circular imports.
Expand Down Expand Up @@ -77,7 +82,6 @@ def __init__(
input: Optional[BaseModel] = None,
dependencies: Optional[dict[str, Any]] = None,
log_len: int = 1000,
cancel_hook: Optional[CancelHook] = None,
) -> None:
"""Create a thread to run an action and track its outputs.

Expand All @@ -96,8 +100,6 @@ def __init__(
FastAPI by its dependency injection mechanism.
:param log_len: sets the number of log entries that will be held in
memory by the invocation's logger.
:param cancel_hook: is a `threading.Event` subclass that tells the
invocation it's time to stop. See `.CancelHook`.
"""
Thread.__init__(self, daemon=True)

Expand All @@ -106,7 +108,6 @@ def __init__(
self.thing_ref = weakref.ref(thing)
self.input = input if input is not None else EmptyInput()
self.dependencies = dependencies if dependencies is not None else {}
self.cancel_hook = cancel_hook

# A UUID for the Invocation (not the same as the threading.Thread ident)
self._ID = id # Task ID
Expand Down Expand Up @@ -195,14 +196,18 @@ def thing(self) -> Thing:
raise RuntimeError("The `Thing` on which an action was run is missing!")
return thing

@property
def cancel_hook(self) -> CancelEvent:
"""The cancel event associated with this Invocation."""
return get_cancel_event(self.id)

def cancel(self) -> None:
"""Cancel the task by requesting the code to stop.

This is an opt-in feature: the action must use
a `.CancelHook` dependency and periodically check it.
"""
if self.cancel_hook is not None:
self.cancel_hook.set()
self.cancel_hook.set()

def response(self, request: Optional[Request] = None) -> InvocationModel:
"""Generate a representation of the invocation suitable for HTTP.
Expand Down Expand Up @@ -270,95 +275,57 @@ def run(self) -> None:
# self.action evaluates to an ActionDescriptor. This confuses mypy,
# which thinks we are calling ActionDescriptor.__get__.
action: ActionDescriptor = self.action # type: ignore[call-overload]
# Create a logger just for this invocation, keyed to the invocation id
# Logs that go to this logger will be copied into `self._log`
handler = DequeLogHandler(dest=self._log)
logger = invocation_logger(self.id)
logger.addHandler(handler)
try:
action.emit_changed_event(self.thing, self._status.value)

thing = self.thing
kwargs = model_to_dict(self.input)
if thing is None: # pragma: no cover
# The Thing is stored as a weakref, but it will always exist
# while the server is running - this error should never
# occur.
raise RuntimeError("Cannot start an invocation without a Thing.")

with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)

# The next line actually runs the action.
ret = action.__get__(thing)(**kwargs, **self.dependencies)

with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)
except InvocationCancelledError:
logger.info(f"Invocation {self.id} was cancelled.")
with self._status_lock:
self._status = InvocationStatus.CANCELLED
action.emit_changed_event(self.thing, self._status.value)
except Exception as e: # skipcq: PYL-W0703
# First log
if isinstance(e, InvocationError):
# Log without traceback
logger.error(e)
else:
logger.exception(e)
# Then set status
with self._status_lock:
self._status = InvocationStatus.ERROR
self._exception = e
logger = self.thing.logger
# The line below saves records matching our ID to ``self._log``
add_thing_log_destination(self.id, self._log)
with set_invocation_id(self.id):
try:
action.emit_changed_event(self.thing, self._status.value)
finally:
with self._status_lock:
self._end_time = datetime.datetime.now()
self.expiry_time = self._end_time + datetime.timedelta(
seconds=self.retention_time,
)
logger.removeHandler(handler) # Stop saving logs
# If we don't remove the log handler, it's a circular ref/memory leak.


class DequeLogHandler(logging.Handler):
"""A log handler that stores entries in memory."""

def __init__(
self,
dest: MutableSequence,
level: int = logging.INFO,
) -> None:
"""Set up a log handler that appends messages to a deque.

.. warning::
This log handler does not currently rotate or truncate
the list - so if you use it on a thread that produces a
lot of log messages, you may run into memory problems.

Using a `.deque` with a finite capacity helps to mitigate
this.

:param dest: should specify a deque, to which we will append
each log entry as it comes in. This is assumed to be thread
safe.
:param level: sets the level of the logger. For most invocations,
a log level of `logging.INFO` is appropriate.
"""
logging.Handler.__init__(self)
self.setLevel(level)
self.dest = dest

def emit(self, record: logging.LogRecord) -> None:
"""Save a log record to the destination deque.

:param record: the `logging.LogRecord` object to add.
"""
self.dest.append(record)
thing = self.thing
kwargs = model_to_dict(self.input)
if thing is None: # pragma: no cover
# The Thing is stored as a weakref, but it will always exist
# while the server is running - this error should never
# occur.
raise RuntimeError("Cannot start an invocation without a Thing.")

with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)

bound_method = action.__get__(thing)
# Actually run the action
ret = bound_method(**kwargs, **self.dependencies)

with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)
except InvocationCancelledError:
logger.info(f"Invocation {self.id} was cancelled.")
with self._status_lock:
self._status = InvocationStatus.CANCELLED
action.emit_changed_event(self.thing, self._status.value)
except Exception as e: # skipcq: PYL-W0703
# First log
if isinstance(e, InvocationError):
# Log without traceback
logger.error(e)
else:
logger.exception(e)
# Then set status
with self._status_lock:
self._status = InvocationStatus.ERROR
self._exception = e
action.emit_changed_event(self.thing, self._status.value)
finally:
with self._status_lock:
self._end_time = datetime.datetime.now()
self.expiry_time = self._end_time + datetime.timedelta(
seconds=self.retention_time,
)


class ActionManager:
Expand Down Expand Up @@ -390,7 +357,6 @@ def invoke_action(
id: uuid.UUID,
input: Any,
dependencies: dict[str, Any],
cancel_hook: CancelHook,
) -> Invocation:
"""Invoke an action, returning the thread where it's running.

Expand All @@ -409,8 +375,6 @@ def invoke_action(
keyword arguments.
:param dependencies: is a dictionary of keyword arguments, supplied by
FastAPI by its dependency injection mechanism.
:param cancel_hook: is a `threading.Event` subclass that tells the
invocation it's time to stop. See `.CancelHook`.

:return: an `.Invocation` object that has been started.
"""
Expand All @@ -420,7 +384,6 @@ def invoke_action(
input=input,
dependencies=dependencies,
id=id,
cancel_hook=cancel_hook,
)
self.append_invocation(thread)
thread.start()
Expand Down
Loading
Loading