This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix occasional "Re-starting finished log context" from keyring #8398

Merged 4 commits on Sep 25, 2020 (diff shown from 3 of the commits).
1 change: 1 addition & 0 deletions changelog.d/8398.bugfix
@@ -0,0 +1 @@
+Fix "Re-starting finished log context" warning when receiving an event we already had over federation.
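
For context on the warning itself: Synapse's log-context machinery marks a `LoggingContext` as finished when its `with` block exits, and `LoggingContext.start` logs "Re-starting finished log context" if anything later switches back into it. A minimal sketch of the failure mode, assuming the `synapse.logging.context` API from around this release:

```python
from synapse.logging.context import LoggingContext, PreserveLoggingContext

with LoggingContext("request-1") as ctx:
    captured = ctx  # e.g. a callback captures the current context

# the `with` block has exited, so `captured` is now marked as finished

# a later callback that switches back into the captured context triggers
# the "Re-starting finished log context request-1" warning
with PreserveLoggingContext(captured):
    pass
```

This is exactly the shape of the bug: `_start_key_lookups` captured `current_context()` and `lookup_done` re-entered it via `PreserveLoggingContext(ctx)`, which warns if the original request has already finished.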
70 changes: 44 additions & 26 deletions synapse/crypto/keyring.py
@@ -42,7 +42,6 @@
 )
 from synapse.logging.context import (
     PreserveLoggingContext,
-    current_context,
     make_deferred_yieldable,
     preserve_fn,
     run_in_background,
@@ -233,8 +232,6 @@ async def _start_key_lookups(self, verify_requests):
         """

         try:
-            ctx = current_context()
-
             # map from server name to a set of outstanding request ids
             server_to_request_ids = {}

@@ -265,12 +262,8 @@ def lookup_done(res, verify_request):

             # if there are no more requests for this server, we can drop the lock.
             if not server_requests:
-                with PreserveLoggingContext(ctx):
-                    logger.debug("Releasing key lookup lock on %s", server_name)
-
-                    # ... but not immediately, as that can cause stack explosions if
-                    # we get a long queue of lookups.
-                    self.clock.call_later(0, drop_server_lock, server_name)
+                logger.debug("Releasing key lookup lock on %s", server_name)
+                drop_server_lock(server_name)

             return res
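
The deleted comment referred to a real Twisted hazard: firing a long chain of Deferreds synchronously nests one Python call per Deferred. With this PR the errbacks and callbacks themselves are rescheduled through the reactor (see the hunks below), so `lookup_done` can drop the lock directly. A standalone sketch of the hazard and the `callLater(0, ...)` escape; this is not Synapse code, and `twisted.internet.task.Clock` stands in for the real reactor:

```python
from twisted.internet import defer
from twisted.internet.task import Clock

def fire_chain_synchronously(n: int) -> None:
    # each Deferred's callback fires the next one in the same call stack,
    # so a long enough chain raises RecursionError
    ds = [defer.Deferred() for _ in range(n)]
    for i in range(n - 1):
        ds[i].addCallback(lambda _res, nxt=ds[i + 1]: nxt.callback(None))
    ds[0].callback(None)

def fire_chain_via_reactor(n: int, reactor: Clock) -> None:
    # rescheduling each firing through the reactor unwinds the stack
    # between steps, trading depth for one reactor tick per Deferred
    ds = [defer.Deferred() for _ in range(n)]
    for i in range(n - 1):
        ds[i].addCallback(
            lambda _res, nxt=ds[i + 1]: reactor.callLater(0, nxt.callback, None)
        )
    ds[0].callback(None)

clock = Clock()
fire_chain_via_reactor(50000, clock)
while clock.getDelayedCalls():
    clock.advance(0)              # drain the queued ticks
fire_chain_synchronously(50000)   # likely raises RecursionError
```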

@@ -335,20 +328,32 @@ async def do_iterations():
                 )

             # look for any requests which weren't satisfied
-            with PreserveLoggingContext():
-                for verify_request in remaining_requests:
-                    verify_request.key_ready.errback(
-                        SynapseError(
-                            401,
-                            "No key for %s with ids in %s (min_validity %i)"
-                            % (
-                                verify_request.server_name,
-                                verify_request.key_ids,
-                                verify_request.minimum_valid_until_ts,
-                            ),
-                            Codes.UNAUTHORIZED,
-                        )
-                    )
+            while remaining_requests:
+                verify_request = remaining_requests.pop()
+                rq_str = (
+                    "VerifyJsonRequest(server=%s, key_ids=%s, min_valid=%i)"
+                    % (
+                        verify_request.server_name,
+                        verify_request.key_ids,
+                        verify_request.minimum_valid_until_ts,
+                    )
+                )
+
+                # If we run the errback immediately, it may cancel our
+                # loggingcontext while we are still in it, so instead we
+                # schedule it for the next time round the reactor.
+                #
+                # (this also ensures that we don't get a stack overflow if we
+                # have a massive queue of lookups waiting for this server).
+                self.clock.call_later(
+                    0,
+                    verify_request.key_ready.errback,
+                    SynapseError(
+                        401,
+                        "Failed to find any key to satisfy %s" % (rq_str,),
+                        Codes.UNAUTHORIZED,
+                    ),
+                )
         except Exception as err:
             # we don't really expect to get here, because any errors should already
             # have been caught and logged. But if we do, let's log the error and make
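
A distilled sketch of the scheduling pattern adopted above, assuming plain Twisted in place of Synapse's `Clock` wrapper (`Clock.call_later(delay, f, *args)` is a thin wrapper around `reactor.callLater`), with `task.Clock` as a deterministic stand-in for the reactor:

```python
from twisted.internet import defer
from twisted.internet.task import Clock

reactor = Clock()

d = defer.Deferred()
failures = []
d.addErrback(lambda f: failures.append(f.value))

# firing d.errback(...) directly would run the waiting code synchronously,
# inside whatever logging context this code currently occupies; queueing it
# for the next reactor tick makes it run from the reactor instead, with no
# caller context left on the stack to be finished out from under us
reactor.callLater(0, d.errback, RuntimeError("no key"))

assert failures == []  # nothing has fired yet
reactor.advance(0)     # run the next tick
assert isinstance(failures[0], RuntimeError)
```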
@@ -410,10 +415,23 @@ async def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests):
                     # key was not valid at this point
                     continue

-                with PreserveLoggingContext():
-                    verify_request.key_ready.callback(
-                        (server_name, key_id, fetch_key_result.verify_key)
-                    )
+                # we have a valid key for this request. If we run the callback
+                # immediately, it may cancel our loggingcontext while we are still in
+                # it, so instead we schedule it for the next time round the reactor.
+                #
+                # (this also ensures that we don't get a stack overflow if we had
+                # a massive queue of lookups waiting for this server).
+                logger.debug(
+                    "Found key %s:%s for %s",
+                    server_name,
+                    key_id,
+                    verify_request.request_name,
+                )
+                self.clock.call_later(
+                    0,
+                    verify_request.key_ready.callback,
+                    (server_name, key_id, fetch_key_result.verify_key),
+                )
                 completed.append(verify_request)
                 break
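
From the consumer's side the extra tick is invisible: code awaiting `key_ready` through `make_deferred_yieldable` simply resumes in its own logging context once the reactor runs the queued callback. A sketch of that consumer pattern, assuming Synapse's context helpers; the names `key_ready` and `wait_for_key` here are illustrative, not Synapse's:

```python
from twisted.internet import defer
from synapse.logging.context import LoggingContext, make_deferred_yieldable

key_ready = defer.Deferred()  # stands in for verify_request.key_ready

async def wait_for_key():
    with LoggingContext("verify-1") as ctx:
        ctx.request = "verify-1"
        # make_deferred_yieldable drops to the sentinel context while
        # waiting, then restores "verify-1" when key_ready fires, no
        # matter which context the firing code was in at the time
        result = await make_deferred_yieldable(key_ready)
        return result
```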

122 changes: 58 additions & 64 deletions tests/crypto/test_keyring.py
@@ -23,6 +23,7 @@
 from signedjson.key import encode_verify_key_base64, get_verify_key

 from twisted.internet import defer
+from twisted.internet.defer import Deferred, ensureDeferred

 from synapse.api.errors import SynapseError
 from synapse.crypto import keyring
@@ -33,7 +34,6 @@
 )
 from synapse.logging.context import (
     LoggingContext,
-    PreserveLoggingContext,
     current_context,
     make_deferred_yieldable,
 )
@@ -68,54 +68,40 @@ def sign_response(self, res):


 class KeyringTestCase(unittest.HomeserverTestCase):
-    def make_homeserver(self, reactor, clock):
-        self.mock_perspective_server = MockPerspectiveServer()
-        self.http_client = Mock()
-
-        config = self.default_config()
-        config["trusted_key_servers"] = [
-            {
-                "server_name": self.mock_perspective_server.server_name,
-                "verify_keys": self.mock_perspective_server.get_verify_keys(),
-            }
-        ]
-
-        return self.setup_test_homeserver(
-            handlers=None, http_client=self.http_client, config=config
-        )
-
-    def check_context(self, _, expected):
+    def check_context(self, val, expected):
         self.assertEquals(getattr(current_context(), "request", None), expected)
+        return val

     def test_verify_json_objects_for_server_awaits_previous_requests(self):
-        key1 = signedjson.key.generate_signing_key(1)
+        mock_fetcher = keyring.KeyFetcher()
+        mock_fetcher.get_keys = Mock()
+        kr = keyring.Keyring(self.hs, key_fetchers=(mock_fetcher,))

-        kr = keyring.Keyring(self.hs)
+        # a signed object that we are going to try to validate
+        key1 = signedjson.key.generate_signing_key(1)
         json1 = {}
         signedjson.sign.sign_json(json1, "server10", key1)

-        persp_resp = {
-            "server_keys": [
-                self.mock_perspective_server.get_signed_key(
-                    "server10", signedjson.key.get_verify_key(key1)
-                )
-            ]
-        }
-        persp_deferred = defer.Deferred()
+        # start off a first set of lookups. We make the mock fetcher block until this
+        # deferred completes.
+        first_lookup_deferred = Deferred()
+
+        async def first_lookup_fetch(keys_to_fetch):
+            self.assertEquals(current_context().request, "context_11")
+            self.assertEqual(keys_to_fetch, {"server10": {get_key_id(key1): 0}})

-        async def get_perspectives(**kwargs):
-            self.assertEquals(current_context().request, "11")
-            with PreserveLoggingContext():
-                await persp_deferred
-            return persp_resp
+            await make_deferred_yieldable(first_lookup_deferred)
+            return {
+                "server10": {
+                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
+                }
+            }

-        self.http_client.post_json.side_effect = get_perspectives
+        mock_fetcher.get_keys.side_effect = first_lookup_fetch

-        # start off a first set of lookups
-        @defer.inlineCallbacks
-        def first_lookup():
-            with LoggingContext("11") as context_11:
-                context_11.request = "11"
+        async def first_lookup():
+            with LoggingContext("context_11") as context_11:
+                context_11.request = "context_11"

                 res_deferreds = kr.verify_json_objects_for_server(
                     [("server10", json1, 0, "test10"), ("server11", {}, 0, "test11")]
@@ -124,53 +110,61 @@ def first_lookup():
             # the unsigned json should be rejected pretty quickly
             self.assertTrue(res_deferreds[1].called)
             try:
-                yield res_deferreds[1]
+                await res_deferreds[1]
                 self.assertFalse("unsigned json didn't cause a failure")
             except SynapseError:
                 pass

             self.assertFalse(res_deferreds[0].called)
             res_deferreds[0].addBoth(self.check_context, None)

-            yield make_deferred_yieldable(res_deferreds[0])
+            await make_deferred_yieldable(res_deferreds[0])

-            # let verify_json_objects_for_server finish its work before we kill the
-            # logcontext
-            yield self.clock.sleep(0)
-
-        d0 = first_lookup()
+        d0 = ensureDeferred(first_lookup())

-        # wait a tick for it to send the request to the perspectives server
-        # (it first tries the datastore)
-        self.pump()
-        self.http_client.post_json.assert_called_once()
+        # wait a tick for it to send the request to the fetcher
+        # self.pump()
+        mock_fetcher.get_keys.assert_called_once()

         # a second request for a server with outstanding requests
         # should block rather than start a second call
-        @defer.inlineCallbacks
-        def second_lookup():
-            with LoggingContext("12") as context_12:
-                context_12.request = "12"
-
-                self.http_client.post_json.reset_mock()
-                self.http_client.post_json.return_value = defer.Deferred()
+        async def second_lookup_fetch(keys_to_fetch):
+            self.assertEquals(current_context().request, "context_12")
+            return {
+                "server10": {
+                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
+                }
+            }
+
+        mock_fetcher.get_keys.reset_mock()
+        mock_fetcher.get_keys.side_effect = second_lookup_fetch
+        second_lookup_state = [0]
+
+        async def second_lookup():
+            with LoggingContext("context_12") as context_12:
+                context_12.request = "context_12"

                 res_deferreds_2 = kr.verify_json_objects_for_server(
                     [("server10", json1, 0, "test")]
                 )
                 res_deferreds_2[0].addBoth(self.check_context, None)
-                yield make_deferred_yieldable(res_deferreds_2[0])
+                second_lookup_state[0] = 1
+                await make_deferred_yieldable(res_deferreds_2[0])
+                second_lookup_state[0] = 2

-                # let verify_json_objects_for_server finish its work before we kill the
-                # logcontext
-                yield self.clock.sleep(0)
-
-        d2 = second_lookup()
+        d2 = ensureDeferred(second_lookup())

         self.pump()
-        self.http_client.post_json.assert_not_called()
+        # the second request should be pending, but the fetcher should not yet have
+        # been called
+        self.assertEqual(second_lookup_state[0], 1)
+        mock_fetcher.get_keys.assert_not_called()

         # complete the first request
-        persp_deferred.callback(persp_resp)
+        first_lookup_deferred.callback(None)
+
+        # and now both verifications should succeed.
+        self.get_success(d0)
+        self.get_success(d2)
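
A side note on the test migration visible above: `@defer.inlineCallbacks` generators start running as soon as they are called, whereas `async def` coroutines must be wrapped with `ensureDeferred` to start, which is why `first_lookup()` and `second_lookup()` gain the `ensureDeferred(...)` wrappers. A minimal sketch of the two styles, using stock Twisted only:

```python
from twisted.internet import defer
from twisted.internet.defer import ensureDeferred

@defer.inlineCallbacks
def old_style():
    value = yield defer.succeed(42)  # yield a Deferred, get its result
    return value

async def new_style():
    value = await defer.succeed(42)  # Deferreds are directly awaitable
    return value

results = []
old_style().addCallback(results.append)               # starts on call
ensureDeferred(new_style()).addCallback(results.append)  # needs wrapping
assert results == [42, 42]
```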

Expand Down