Skip to content

Commit

Permalink
Revise and improve actor logging
Browse files Browse the repository at this point in the history
This commit revamps the LogActor dramatiq middleware to resolve some
issues observed in the logs.

The main changes are:

- Instead of prefixing log messages with e.g. "[commit abc123]", put
  that info into proper fields rendered into our JSON logs, e.g.
  {"actor": "commit", "publish_id": "abc123", ...}

- Include the message ID (which we reuse as task ID). In the case
  of phase1 commits which can happen multiple times for a single
  publish, without logging the message ID we can't match up which logs
  came from which task.

- Produce generic start/stop/failed messages for every actor, which
  also include the actor's duration. Without this there was no clear way
  to tell when actors started and ended, since most of our actors
  themselves aren't designed to log that internally.

- In the JSON log formatter, just skip any fields which are missing
  from the log record rather than logging them as null. This is done
  because the set of fields differs between web & worker, so either we
  need to use two separate log formats or we just ignore absent fields.
  • Loading branch information
rohanpm committed Dec 13, 2023
1 parent 256f1f0 commit 0c2ecdf
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 62 deletions.
112 changes: 83 additions & 29 deletions exodus_gw/dramatiq/middleware/log_actor.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,108 @@
import logging
from contextvars import ContextVar
from contextvars import ContextVar, copy_context
from functools import wraps
from time import monotonic
from typing import Any

from dramatiq import Middleware
from dramatiq import Actor, Middleware
from dramatiq.middleware import CurrentMessage

CURRENT_PREFIX: ContextVar[str] = ContextVar("CURRENT_PREFIX")
LOG = logging.getLogger("exodus-gw.actor")


class PrefixFilter(logging.Filter):
# A filter which will add CURRENT_PREFIX onto each message.
CURRENT_ACTOR: ContextVar[Actor[[], Any]] = ContextVar("CURRENT_ACTOR")
CURRENT_PUBLISH_ID: ContextVar[str] = ContextVar("CURRENT_PUBLISH_ID")
CURRENT_MESSAGE_ID: ContextVar[str] = ContextVar("CURRENT_MESSAGE_ID")


def in_copied_context(fn):
    """Return ``fn`` wrapped to always run in a copy of the current context.

    The context is copied at the time of each invocation (not when the
    wrapper is created). This means the function can freely set
    contextvars without having to worry about resetting them later: the
    changes are made in the copy and are not visible to the caller's
    context once the call returns.

    Positional and keyword arguments, the return value and any raised
    exception are passed through unchanged.
    """

    @wraps(fn)
    def new_fn(*args, **kwargs):
        # A fresh copy per call, so repeated calls never see each
        # other's contextvar changes.
        return copy_context().run(fn, *args, **kwargs)

    return new_fn


def new_timer():
    """Return a callable reporting the milliseconds elapsed since this call.

    The returned callable takes no arguments and may be called any number
    of times; each call returns the whole number of milliseconds (an
    ``int``, truncated) which have passed since ``new_timer()`` was
    invoked. Time is measured using ``time.monotonic``.
    """
    start = monotonic()

    def elapsed_ms():
        return int((monotonic() - start) * 1000)

    return elapsed_ms


class ActorFilter(logging.Filter):
    """A filter which adds extra fields onto each log record with info
    about the currently executing actor.

    The fields are read from the CURRENT_ACTOR, CURRENT_PUBLISH_ID and
    CURRENT_MESSAGE_ID contextvars. A field is only set on the record
    when the corresponding contextvar holds a truthy value, so records
    produced outside of any actor are left untouched.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate ``record`` with actor info; never rejects a record."""
        if actor := CURRENT_ACTOR.get(None):
            record.actor = actor.actor_name
        if publish_id := CURRENT_PUBLISH_ID.get(None):
            record.publish_id = publish_id
        if message_id := CURRENT_MESSAGE_ID.get(None):
            record.message_id = message_id
        # Always True: this filter only enriches records, the message
        # itself is not modified and nothing is dropped.
        return True


class LogActorMiddleware(Middleware):
    """Middleware to add certain logging behaviors onto all actors.

    - Installs an ActorFilter so that log records produced while an actor
      runs carry "actor", "publish_id" and "message_id" fields.
    - Wraps every declared actor so that generic "Starting" and
      "Succeeded"/"Failed" records are logged around each invocation,
      the latter two including "duration_ms" and "success" fields.
    """

    def __init__(self):
        self.filter = ActorFilter()

    def after_process_boot(self, broker):
        # The filter is attached to the first handler of the root logger
        # only. This raises IndexError if the root logger has no handlers
        # at this point, so logging must be configured beforehand.
        logging.getLogger().handlers[0].addFilter(self.filter)

    def before_declare_actor(self, broker, actor):
        """Replace ``actor.fn`` with a wrapper adding logging behaviors."""
        old_fn = actor.fn

        @wraps(old_fn)
        def new_fn(*args, **kwargs):
            # Wrapped function sets context vars before execution...
            CURRENT_ACTOR.set(actor)

            # Only a publish_id passed by keyword is picked up here;
            # positional arguments are not inspected.
            if publish_id := kwargs.get("publish_id"):
                CURRENT_PUBLISH_ID.set(publish_id)

            if message := CurrentMessage.get_current_message():
                # This is copied into a contextvar because get_current_message()
                # is a thread-local and not a contextvar, and therefore it
                # wouldn't be available on log records coming from other
                # threads if we don't copy it.
                CURRENT_MESSAGE_ID.set(message.message_id)

            # ...and also ensures some consistent logs appear around
            # the actor invocation: start/stop/error, with timing info.
            timer = new_timer()

            LOG.info("Starting")
            try:
                out = old_fn(*args, **kwargs)
            except BaseException:
                # Deliberately as broad as a bare "except": every kind of
                # failure is logged and then re-raised untouched.
                # NOTE(review): dramatiq's interrupts (e.g. time limits)
                # appear to derive from BaseException rather than
                # Exception - confirm before narrowing this.
                LOG.warning(
                    "Failed",
                    exc_info=True,
                    extra={"duration_ms": timer(), "success": False},
                )
                raise
            else:
                LOG.info(
                    "Succeeded",
                    extra={"duration_ms": timer(), "success": True},
                )
                return out

        # Make the function run in a copied context so that it can
        # freely set contextvars without having to unset them.
        new_fn = in_copied_context(new_fn)

        actor.fn = new_fn
11 changes: 10 additions & 1 deletion exodus_gw/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def __init__(self, datefmt=None):
"message": "message",
"event": "event",
"success": "success",
"actor": "actor",
"publish_id": "publish_id",
"message_id": "message_id",
"duration_ms": "duration_ms",
}
self.datefmt = datefmt

Expand All @@ -81,7 +85,12 @@ def formatTime(self, record, datefmt=None):
return s

def formatMessage(self, record):
    """Return a dict of the configured output fields found on ``record``.

    ``self.fmt`` maps output key -> name of a log record attribute.
    Attributes which are missing from the record are skipped entirely
    rather than being emitted as null; an attribute which is present
    but set to None is still included.
    """
    attrs = record.__dict__
    return {key: attrs[name] for key, name in self.fmt.items() if name in attrs}

def format(self, record):
record.message = record.getMessage()
Expand Down
55 changes: 23 additions & 32 deletions tests/dramatiq/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def blocking_fn(self):
self.blocking_sem.acquire()
self.record_call("blocking_fn end")

def log_warning(self, task_id, value):
def log_warning(self, publish_id, value):
self.record_call("log_warning")
logging.getLogger("any-logger").warning(
"warning from actor: %s", value
Expand Down Expand Up @@ -316,29 +316,41 @@ def test_mixed_queues(actors, db):
assert sorted(actors.actor_calls) == ["basic", "basic_other_queue"]


def test_logs_actor_info(actors, caplog: pytest.LogCaptureFixture):
    """Log records generated within an actor include info about that
    actor.
    """

    msg = actors.log_warning.send(
        publish_id="publish-abc123", value="some value"
    )

    # Ensure actor is invoked
    assert_soon(lambda: len(actors.actor_calls) == 1)
    assert sorted(actors.actor_calls) == ["log_warning"]

    # Find the log which came from within the actor
    for record in caplog.records:
        if "warning from actor" in record.getMessage():
            break
    else:
        raise AssertionError("Can't find expected log record")

    # It should have included info about the actor, as separate
    # attributes on the record rather than as a message prefix
    attrs = record.__dict__
    assert attrs["actor"] == "log_warning"
    assert attrs["publish_id"] == "publish-abc123"
    assert attrs["message_id"] == msg.message_id


def test_logs_request_id(actors, caplog):
"""Log messages include "request_id", propagated from asgi_correlation_id."""

token = correlation_id.set("aabbccdd")
try:
actors.log_warning.send(task_id="task-abc123", value="some value")
actors.log_warning.send(
publish_id="publish-abc123", value="some value"
)

# Ensure actor is invoked
assert_soon(lambda: len(actors.actor_calls) == 1)
Expand All @@ -350,27 +362,6 @@ def test_logs_request_id(actors, caplog):
correlation_id.reset(token)


def test_logs_prefixed_threaded(actors, caplog):
"""Log messages generated within an actor-spawned thread are prefixed
(as long as contextvars.Context was propagated).
"""

actors.log_warning_from_thread.send(
task_id="task-abc123", value="some value"
)

# Ensure actor is invoked
assert_soon(lambda: len(actors.actor_calls) == 1)
assert sorted(actors.actor_calls) == ["log_warning_from_thread"]

# When the warning was logged, it should have automatically embedded
# both the actor name and the task id
assert (
"[log_warning_from_thread task-abc123] warning from actor: some value"
in caplog.text
)


def test_queue_backlog(actors, db):
"""Consumers only prefetch a limited number of messages."""

Expand Down

0 comments on commit 0c2ecdf

Please sign in to comment.