Skip to content

Commit

Permalink
Merge pull request #648 from rohanpm/improve-actor-logging
Browse files Browse the repository at this point in the history
Revise and improve actor logging
  • Loading branch information
rohanpm committed Dec 14, 2023
2 parents 256f1f0 + 0c2ecdf commit a5e050c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 62 deletions.
112 changes: 83 additions & 29 deletions exodus_gw/dramatiq/middleware/log_actor.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,108 @@
import logging
from contextvars import ContextVar
from contextvars import ContextVar, copy_context
from functools import wraps
from time import monotonic
from typing import Any

from dramatiq import Middleware
from dramatiq import Actor, Middleware
from dramatiq.middleware import CurrentMessage

CURRENT_PREFIX: ContextVar[str] = ContextVar("CURRENT_PREFIX")
LOG = logging.getLogger("exodus-gw.actor")


class PrefixFilter(logging.Filter):
# A filter which will add CURRENT_PREFIX onto each message.
CURRENT_ACTOR: ContextVar[Actor[[], Any]] = ContextVar("CURRENT_ACTOR")
CURRENT_PUBLISH_ID: ContextVar[str] = ContextVar("CURRENT_PUBLISH_ID")
CURRENT_MESSAGE_ID: ContextVar[str] = ContextVar("CURRENT_MESSAGE_ID")


def in_copied_context(fn):
# Returns a function wrapped to always run in a copy of the
# current context at time of invocation.
#
# This means the function can freely set contextvars without
# having to worry about resetting them later.
@wraps(fn)
def new_fn(*args, **kwargs):
ctx = copy_context()
return ctx.run(fn, *args, **kwargs)

return new_fn


def new_timer():
# Returns a callable which returns the number of milliseconds
# passed since new_timer() was called.
start = monotonic()

def fn():
return int((monotonic() - start) * 1000)

return fn


class ActorFilter(logging.Filter):
# A filter which will add extra fields onto each log record
# with info about the currently executing actor.

def filter(self, record: logging.LogRecord) -> bool:
record.msg = CURRENT_PREFIX.get("") + record.msg
if actor := CURRENT_ACTOR.get(None):
record.actor = actor.actor_name
if publish_id := CURRENT_PUBLISH_ID.get(None):
record.publish_id = publish_id
if message_id := CURRENT_MESSAGE_ID.get(None):
record.message_id = message_id
return True


class LogActorMiddleware(Middleware):
"""Middleware to prefix every log message with the current actor name."""
"""Middleware to add certain logging behaviors onto all actors."""

def __init__(self):
self.filter = PrefixFilter()
self.filter = ActorFilter()

def after_process_boot(self, broker):
logging.getLogger().handlers[0].addFilter(self.filter)

def before_declare_actor(self, broker, actor):
actor.fn = self.wrap_fn_with_prefix(actor.fn)
old_fn = actor.fn

def wrap_fn_with_prefix(self, fn):
# Given a function, returns a wrapped version of it which will adjust
# CURRENT_PREFIX around the function's invocation.

@wraps(fn)
@wraps(old_fn)
def new_fn(*args, **kwargs):
# We want to show the function name (which is the actor name)...
prefix = fn.__name__
# Wrapped function sets context vars before execution...
CURRENT_ACTOR.set(actor)

# If the actor takes a publish or task ID as an argument, we want
# to show that as well
for key in ("publish_id", "task_id"):
if key in kwargs:
prefix = f"{prefix} {kwargs[key]}"
break
if publish_id := kwargs.get("publish_id"):
CURRENT_PUBLISH_ID.set(publish_id)

prefix = f"[{prefix}] "
if message := CurrentMessage.get_current_message():
# This is copied into a contextvar because get_current_message()
# is a thread-local and not a contextvar, and therefore it
# wouldn't be available on log records coming from other
# threads if we don't copy it.
CURRENT_MESSAGE_ID.set(message.message_id)

token = CURRENT_PREFIX.set(prefix)
try:
return fn(*args, **kwargs)
finally:
CURRENT_PREFIX.reset(token)
# ...and also ensures some consistent logs appear around
# the actor invocation: start/stop/error, with timing info.
timer = new_timer()

return new_fn
LOG.info("Starting")
try:
out = old_fn(*args, **kwargs)
LOG.info(
"Succeeded",
extra={"duration_ms": timer(), "success": True},
)
return out
except:
LOG.warning(
"Failed",
exc_info=True,
extra={"duration_ms": timer(), "success": False},
)
raise

# Make the function run in a copied context so that it can
# freely set contextvars without having to unset them.
new_fn = in_copied_context(new_fn)

actor.fn = new_fn
11 changes: 10 additions & 1 deletion exodus_gw/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def __init__(self, datefmt=None):
"message": "message",
"event": "event",
"success": "success",
"actor": "actor",
"publish_id": "publish_id",
"message_id": "message_id",
"duration_ms": "duration_ms",
}
self.datefmt = datefmt

Expand All @@ -81,7 +85,12 @@ def formatTime(self, record, datefmt=None):
return s

def formatMessage(self, record):
return {k: record.__dict__.get(v) for k, v in self.fmt.items()}
absent = object()
return {
k: record.__dict__.get(v)
for k, v in self.fmt.items()
if record.__dict__.get(v, absent) is not absent
}

def format(self, record):
record.message = record.getMessage()
Expand Down
55 changes: 23 additions & 32 deletions tests/dramatiq/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def blocking_fn(self):
self.blocking_sem.acquire()
self.record_call("blocking_fn end")

def log_warning(self, task_id, value):
def log_warning(self, publish_id, value):
self.record_call("log_warning")
logging.getLogger("any-logger").warning(
"warning from actor: %s", value
Expand Down Expand Up @@ -316,29 +316,41 @@ def test_mixed_queues(actors, db):
assert sorted(actors.actor_calls) == ["basic", "basic_other_queue"]


def test_logs_prefixed(actors, caplog):
"""Log messages generated within an actor are prefixed."""
def test_logs_actor_info(actors, caplog: pytest.LogCaptureFixture):
"""Log records generated within an actor include info about that
actor.
"""

actors.log_warning.send(task_id="task-abc123", value="some value")
msg = actors.log_warning.send(
publish_id="publish-abc123", value="some value"
)

# Ensure actor is invoked
assert_soon(lambda: len(actors.actor_calls) == 1)
assert sorted(actors.actor_calls) == ["log_warning"]

# When the warning was logged, it should have automatically embedded
# both the actor name and the task id
assert (
"[log_warning task-abc123] warning from actor: some value"
in caplog.text
)
# Find the log which came from within the actor
for record in caplog.records:
if "warning from actor" in record.getMessage():
break
else:
raise AssertionError("Can't find expected log record")

# It should have included info about the actor
attrs = record.__dict__
assert attrs["actor"] == "log_warning"
assert attrs["publish_id"] == "publish-abc123"
assert attrs["message_id"] == msg.message_id


def test_logs_request_id(actors, caplog):
"""Log messages include "request_id", propagated from asgi_correlation_id."""

token = correlation_id.set("aabbccdd")
try:
actors.log_warning.send(task_id="task-abc123", value="some value")
actors.log_warning.send(
publish_id="publish-abc123", value="some value"
)

# Ensure actor is invoked
assert_soon(lambda: len(actors.actor_calls) == 1)
Expand All @@ -350,27 +362,6 @@ def test_logs_request_id(actors, caplog):
correlation_id.reset(token)


def test_logs_prefixed_threaded(actors, caplog):
"""Log messages generated within an actor-spawned thread are prefixed
(as long as contextvars.Context was propagated).
"""

actors.log_warning_from_thread.send(
task_id="task-abc123", value="some value"
)

# Ensure actor is invoked
assert_soon(lambda: len(actors.actor_calls) == 1)
assert sorted(actors.actor_calls) == ["log_warning_from_thread"]

# When the warning was logged, it should have automatically embedded
# both the actor name and the task id
assert (
"[log_warning_from_thread task-abc123] warning from actor: some value"
in caplog.text
)


def test_queue_backlog(actors, db):
"""Consumers only prefetch a limited number of messages."""

Expand Down

0 comments on commit a5e050c

Please sign in to comment.