This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include eventid in log lines when processing incoming federation transactions #3959

Merged
merged 6 commits on Sep 27, 2018
1 change: 1 addition & 0 deletions changelog.d/3959.feature
@@ -0,0 +1 @@
Include eventid in log lines when processing incoming federation transactions
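In outline, the change wraps each piece of per-event work in a child logging context named after the event, so every log line emitted while that event is being handled carries its ID. A minimal sketch of the pattern, assuming a Synapse checkout with this change applied (handle_pdu is a hypothetical stand-in for the real _handle_received_pdu):

    from synapse.util.logcontext import LoggingContext, nested_logging_context

    def handle_pdu(origin, pdu):
        # Hypothetical stand-in for _handle_received_pdu.
        pass

    def process_transaction(origin, pdus):
        for pdu in pdus:
            # Each event is handled in a child context whose 'request' is
            # "<parent request>-<event_id>", so log lines emitted while
            # handling it carry the event ID.
            with nested_logging_context(pdu.event_id):
                handle_pdu(origin, pdu)

    # The surrounding request context supplies the base 'request' name.
    with LoggingContext(request="PUT-42"):
        process_transaction("remote.example.com", pdus=[])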
32 changes: 17 additions & 15 deletions synapse/federation/federation_server.py
@@ -46,6 +46,7 @@
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function

# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ def process_pdus_for_room(room_id):

for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
with nested_logging_context(event_id):
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)

yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
65 changes: 39 additions & 26 deletions synapse/handlers/federation.py
@@ -339,14 +339,18 @@ def on_receive_pdu(
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,

with logcontext.nested_logging_context(p):
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
)
)
)
auth_chains.update(got_auth_chain)
state_group = {(x.type, x.state_key): x.event_id for x in state}
state_groups.append(state_group)
auth_chains.update(got_auth_chain)
state_group = {
(x.type, x.state_key): x.event_id for x in state
}
state_groups.append(state_group)

# Resolve any conflicting state
def fetch(ev_ids):
@@ -483,20 +487,21 @@ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
with logcontext.nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
else:
raise
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
)
else:
raise

@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -1135,7 +1140,8 @@ def _handle_queued_pdus(self, room_queue):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
with logcontext.nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1581,15 +1587,22 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):

Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self._prep_event,

@defer.inlineCallbacks
def prep(ev_info):
event = ev_info["event"]
with logcontext.nested_logging_context(suffix=event.event_id):
res = yield self._prep_event(
origin,
ev_info["event"],
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
defer.returnValue(res)

contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
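The _handle_new_events change above moves the per-event work into a small prep function so that each event gets its own child log context even though the events are prepped concurrently. A condensed sketch of that shape, assuming Twisted and Synapse are importable (do_prep is a hypothetical stand-in for _prep_event):

    from twisted.internet import defer
    from synapse.util import logcontext

    @defer.inlineCallbacks
    def prep_all(event_infos, do_prep):
        @defer.inlineCallbacks
        def prep(ev_info):
            event = ev_info["event"]
            # One child context per event, so concurrently-emitted log lines
            # remain attributable to the event they concern.
            with logcontext.nested_logging_context(suffix=event.event_id):
                res = yield do_prep(event)
            defer.returnValue(res)

        # run_in_background kicks off each prep while preserving the caller's
        # log context; make_deferred_yieldable lets us wait on the gathered
        # deferred and get our context back when it completes.
        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [logcontext.run_in_background(prep, ev_info) for ev_info in event_infos],
            consumeErrors=True,
        ))
        defer.returnValue(results)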
41 changes: 37 additions & 4 deletions synapse/util/logcontext.py
@@ -200,7 +200,7 @@ def __nonzero__(self):

sentinel = Sentinel()

def __init__(self, name=None, parent_context=None):
def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name

@@ -218,6 +218,13 @@ def __init__(self, name=None, parent_context=None):

self.parent_context = parent_context

if self.parent_context is not None:
self.parent_context.copy_to(self)

if request is not None:
# the request param overrides the request from the parent context
self.request = request

def __str__(self):
return "%s@%x" % (self.name, id(self))

@@ -256,9 +263,6 @@ def __enter__(self):
)
self.alive = True

if self.parent_context is not None:
self.parent_context.copy_to(self)

return self

def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ def __exit__(self, type, value, traceback):
)


def nested_logging_context(suffix, parent_context=None):
"""Creates a new logging context as a child of another.

The nested logging context will have a 'request' made up of the parent context's
request, plus the given suffix.

CPU/db usage stats will be added to the parent context's on exit.

Normal usage looks like:

with nested_logging_context(suffix):
# ... do stuff

Args:
suffix (str): suffix to add to the parent context's 'request'.
parent_context (LoggingContext|None): parent context. Will use the current context
if None.

Returns:
LoggingContext: new logging context.
"""
if parent_context is None:
parent_context = LoggingContext.current_context()
return LoggingContext(
parent_context=parent_context,
request=parent_context.request + "-" + suffix,
)


def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
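For reference, a minimal sketch of what the new helper does with the 'request' name, mirroring the unit test added further below (assumes a checkout with this change applied):

    from synapse.util.logcontext import LoggingContext, nested_logging_context

    with LoggingContext(request="handle_txn"):
        # The child copies the parent's fields, then appends the suffix to
        # the parent's 'request'.
        with nested_logging_context("$some_event") as child:
            assert child.request == "handle_txn-$some_event"
        # Per the docstring above, the child's CPU/db usage is added to the
        # parent's when the child exits.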
28 changes: 16 additions & 12 deletions tests/test_federation.py
@@ -6,6 +6,7 @@
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
from synapse.util.logcontext import LoggingContext

from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@@ -117,9 +118,10 @@ def post_json(destination, path, data, headers=None, timeout=0):
}
)

d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
with LoggingContext(request="lying_event"):
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)

# Step the reactor, so the database fetches come back
self.reactor.advance(1)
@@ -209,11 +211,12 @@ def get_json(destination, path, args, headers=None):
}
)

d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
with LoggingContext(request="good_event"):
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)

bad_event = FrozenEvent(
{
@@ -230,10 +233,11 @@ def get_json(destination, path, args, headers=None):
}
)

d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
with LoggingContext(request="bad_event"):
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)

extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
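A plausible reason for wrapping these calls in a named LoggingContext (not stated in the diff itself): nested_logging_context builds the child's 'request' from the parent context's, so on_receive_pdu now wants to run under a real, named context rather than the default sentinel. A minimal sketch of the test pattern, with hypothetical fixture names:

    from synapse.util.logcontext import LoggingContext

    def test_receive_pdu(self):
        # Give the handler a named parent context so the per-event contexts it
        # creates have a 'request' to extend.
        with LoggingContext(request="test_receive_pdu"):
            d = self.handler.on_receive_pdu(
                "test.serv", some_event, sent_to_us_directly=True,
            )
        # Step the reactor outside the context so the deferred can complete.
        self.reactor.advance(1)
        self.assertEqual(self.successResultOf(d), None)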
5 changes: 5 additions & 0 deletions tests/util/test_logcontext.py
@@ -159,6 +159,11 @@ def test_make_deferred_yieldable_on_non_deferred(self):
self.assertEqual(r, "bum")
self._check_test_key("one")

def test_nested_logging_context(self):
with LoggingContext(request="foo"):
nested_context = logcontext.nested_logging_context(suffix="bar")
self.assertEqual(nested_context.request, "foo-bar")


# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on