diff --git a/changelog.d/15417.bugfix b/changelog.d/15417.bugfix new file mode 100644 index 000000000000..300635cbdc11 --- /dev/null +++ b/changelog.d/15417.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where cached key results which were directly fetched would not be properly re-used. diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d2f99dc2acd1..afdf6863d6d1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -150,18 +150,19 @@ class Keyring: def __init__( self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None ): - self.clock = hs.get_clock() - if key_fetchers is None: - key_fetchers = ( - # Fetch keys from the database. - StoreKeyFetcher(hs), - # Fetch keys from a configured Perspectives server. - PerspectivesKeyFetcher(hs), - # Fetch keys from the origin server directly. - ServerKeyFetcher(hs), - ) - self._key_fetchers = key_fetchers + # Always fetch keys from the database. + mutable_key_fetchers: List[KeyFetcher] = [StoreKeyFetcher(hs)] + # Fetch keys from configured trusted key servers, if any exist. + key_servers = hs.config.key.key_servers + if key_servers: + mutable_key_fetchers.append(PerspectivesKeyFetcher(hs)) + # Finally, fetch keys from the origin server directly. + mutable_key_fetchers.append(ServerKeyFetcher(hs)) + + self._key_fetchers: Iterable[KeyFetcher] = tuple(mutable_key_fetchers) + else: + self._key_fetchers = key_fetchers self._fetch_keys_queue: BatchingQueue[ _FetchKeyRequest, Dict[str, Dict[str, FetchKeyResult]] @@ -510,7 +511,7 @@ async def _fetch_keys( for key_id in queue_value.key_ids ) - res = await self.store.get_server_verify_keys(key_ids_to_fetch) + res = await self.store.get_server_keys_json(key_ids_to_fetch) keys: Dict[str, Dict[str, FetchKeyResult]] = {} for (server_name, key_id), key in res.items(): keys.setdefault(server_name, {})[key_id] = key @@ -522,7 +523,6 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) self.store = hs.get_datastores().main - self.config = hs.config async def process_v2_response( self, from_server: str, response_json: JsonDict, time_added_ms: int @@ -626,7 +626,7 @@ def __init__(self, hs: "HomeServer"): super().__init__(hs) self.clock = hs.get_clock() self.client = hs.get_federation_http_client() - self.key_servers = self.config.key.key_servers + self.key_servers = hs.config.key.key_servers async def _fetch_keys( self, keys_to_fetch: List[_FetchKeyRequest] @@ -775,7 +775,7 @@ async def get_server_verify_key_v2_indirect( keys.setdefault(server_name, {}).update(processed_response) - await self.store.store_server_verify_keys( + await self.store.store_server_signature_keys( perspective_name, time_now_ms, added_keys ) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 3bdb6ec9098d..ff0454ca5706 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -155,7 +155,7 @@ async def query_keys( for key_id in key_ids: store_queries.append((server_name, key_id, None)) - cached = await self.store.get_server_keys_json(store_queries) + cached = await self.store.get_server_keys_json_for_remote(store_queries) json_results: Set[bytes] = set() diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index 89c37a4eb560..1666e3c43b44 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -14,10 +14,12 @@ # limitations under the License. import itertools +import json import logging from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple from signedjson.key import decode_verify_key_bytes +from unpaddedbase64 import decode_base64 from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction @@ -36,15 +38,16 @@ class KeyStore(SQLBaseStore): """Persistence for signature verification keys""" @cached() - def _get_server_verify_key( + def _get_server_signature_key( self, server_name_and_key_id: Tuple[str, str] ) -> FetchKeyResult: raise NotImplementedError() @cachedList( - cached_method_name="_get_server_verify_key", list_name="server_name_and_key_ids" + cached_method_name="_get_server_signature_key", + list_name="server_name_and_key_ids", ) - async def get_server_verify_keys( + async def get_server_signature_keys( self, server_name_and_key_ids: Iterable[Tuple[str, str]] ) -> Dict[Tuple[str, str], FetchKeyResult]: """ @@ -62,10 +65,12 @@ def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None: """Processes a batch of keys to fetch, and adds the result to `keys`.""" # batch_iter always returns tuples so it's safe to do len(batch) - sql = ( - "SELECT server_name, key_id, verify_key, ts_valid_until_ms " - "FROM server_signature_keys WHERE 1=0" - ) + " OR (server_name=? AND key_id=?)" * len(batch) + sql = """ + SELECT server_name, key_id, verify_key, ts_valid_until_ms + FROM server_signature_keys WHERE 1=0 + """ + " OR (server_name=? AND key_id=?)" * len( + batch + ) txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) @@ -89,9 +94,9 @@ def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]: _get_keys(txn, batch) return keys - return await self.db_pool.runInteraction("get_server_verify_keys", _txn) + return await self.db_pool.runInteraction("get_server_signature_keys", _txn) - async def store_server_verify_keys( + async def store_server_signature_keys( self, from_server: str, ts_added_ms: int, @@ -119,7 +124,7 @@ async def store_server_verify_keys( ) ) # invalidate takes a tuple corresponding to the params of - # _get_server_verify_key. _get_server_verify_key only takes one + # _get_server_signature_key. _get_server_signature_key only takes one # param, which is itself the 2-tuple (server_name, key_id). invalidations.append((server_name, key_id)) @@ -134,10 +139,10 @@ async def store_server_verify_keys( "verify_key", ), value_values=value_values, - desc="store_server_verify_keys", + desc="store_server_signature_keys", ) - invalidate = self._get_server_verify_key.invalidate + invalidate = self._get_server_signature_key.invalidate for i in invalidations: invalidate((i,)) @@ -180,7 +185,75 @@ async def store_server_keys_json( desc="store_server_keys_json", ) + # invalidate takes a tuple corresponding to the params of + # _get_server_keys_json. _get_server_keys_json only takes one + # param, which is itself the 2-tuple (server_name, key_id). + self._get_server_keys_json.invalidate((((server_name, key_id),))) + + @cached() + def _get_server_keys_json( + self, server_name_and_key_id: Tuple[str, str] + ) -> FetchKeyResult: + raise NotImplementedError() + + @cachedList( + cached_method_name="_get_server_keys_json", list_name="server_name_and_key_ids" + ) async def get_server_keys_json( + self, server_name_and_key_ids: Iterable[Tuple[str, str]] + ) -> Dict[Tuple[str, str], FetchKeyResult]: + """ + Args: + server_name_and_key_ids: + iterable of (server_name, key-id) tuples to fetch keys for + + Returns: + A map from (server_name, key_id) -> FetchKeyResult, or None if the + key is unknown + """ + keys = {} + + def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None: + """Processes a batch of keys to fetch, and adds the result to `keys`.""" + + # batch_iter always returns tuples so it's safe to do len(batch) + sql = """ + SELECT server_name, key_id, key_json, ts_valid_until_ms + FROM server_keys_json WHERE 1=0 + """ + " OR (server_name=? AND key_id=?)" * len( + batch + ) + + txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) + + for server_name, key_id, key_json_bytes, ts_valid_until_ms in txn: + if ts_valid_until_ms is None: + # Old keys may be stored with a ts_valid_until_ms of null, + # in which case we treat this as if it was set to `0`, i.e. + # it won't match key requests that define a minimum + # `ts_valid_until_ms`. + ts_valid_until_ms = 0 + + # The entire signed JSON response is stored in server_keys_json, + # fetch out the bits needed. + key_json = json.loads(bytes(key_json_bytes)) + key_base64 = key_json["verify_keys"][key_id]["key"] + + keys[(server_name, key_id)] = FetchKeyResult( + verify_key=decode_verify_key_bytes( + key_id, decode_base64(key_base64) + ), + valid_until_ts=ts_valid_until_ms, + ) + + def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]: + for batch in batch_iter(server_name_and_key_ids, 50): + _get_keys(txn, batch) + return keys + + return await self.db_pool.runInteraction("get_server_keys_json", _txn) + + async def get_server_keys_json_for_remote( self, server_keys: Iterable[Tuple[str, Optional[str], Optional[str]]] ) -> Dict[Tuple[str, Optional[str], Optional[str]], List[Dict[str, Any]]]: """Retrieve the key json for a list of server_keys and key ids. @@ -188,8 +261,10 @@ async def get_server_keys_json( that server, key_id, and source triplet entry will be an empty list. The JSON is returned as a byte array so that it can be efficiently used in an HTTP response. + Args: server_keys: List of (server_name, key_id, source) triplets. + Returns: A mapping from (server_name, key_id, source) triplets to a list of dicts """ diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 66102ab93487..7c63b2ea4c15 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -190,10 +190,23 @@ def test_verify_json_for_server(self) -> None: kr = keyring.Keyring(self.hs) key1 = signedjson.key.generate_signing_key("1") - r = self.hs.get_datastores().main.store_server_verify_keys( + r = self.hs.get_datastores().main.store_server_keys_json( "server9", - int(time.time() * 1000), - {("server9", get_key_id(key1)): FetchKeyResult(get_verify_key(key1), 1000)}, + get_key_id(key1), + from_server="test", + ts_now_ms=int(time.time() * 1000), + ts_expires_ms=1000, + # The entire response gets signed & stored, just include the bits we + # care about. + key_json_bytes=canonicaljson.encode_canonical_json( + { + "verify_keys": { + get_key_id(key1): { + "key": encode_verify_key_base64(get_verify_key(key1)) + } + } + } + ), ) self.get_success(r) @@ -280,17 +293,13 @@ def test_verify_json_for_server_with_null_valid_until_ms(self) -> None: mock_fetcher = Mock() mock_fetcher.get_keys = Mock(return_value=make_awaitable({})) - kr = keyring.Keyring( - self.hs, key_fetchers=(StoreKeyFetcher(self.hs), mock_fetcher) - ) - key1 = signedjson.key.generate_signing_key("1") - r = self.hs.get_datastores().main.store_server_verify_keys( + r = self.hs.get_datastores().main.store_server_signature_keys( "server9", int(time.time() * 1000), # None is not a valid value in FetchKeyResult, but we're abusing this # API to insert null values into the database. The nulls get converted - # to 0 when fetched in KeyStore.get_server_verify_keys. + # to 0 when fetched in KeyStore.get_server_signature_keys. {("server9", get_key_id(key1)): FetchKeyResult(get_verify_key(key1), None)}, # type: ignore[arg-type] ) self.get_success(r) @@ -298,27 +307,12 @@ def test_verify_json_for_server_with_null_valid_until_ms(self) -> None: json1: JsonDict = {} signedjson.sign.sign_json(json1, "server9", key1) - # should fail immediately on an unsigned object - d = kr.verify_json_for_server("server9", {}, 0) - self.get_failure(d, SynapseError) - - # should fail on a signed object with a non-zero minimum_valid_until_ms, - # as it tries to refetch the keys and fails. - d = kr.verify_json_for_server("server9", json1, 500) - self.get_failure(d, SynapseError) - - # We expect the keyring tried to refetch the key once. - mock_fetcher.get_keys.assert_called_once_with( - "server9", [get_key_id(key1)], 500 - ) - # should succeed on a signed object with a 0 minimum_valid_until_ms - d = kr.verify_json_for_server( - "server9", - json1, - 0, + d = self.hs.get_datastores().main.get_server_signature_keys( + [("server9", get_key_id(key1))] ) - self.get_success(d) + result = self.get_success(d) + self.assertEquals(result[("server9", get_key_id(key1))].valid_until_ts, 0) def test_verify_json_dedupes_key_requests(self) -> None: """Two requests for the same key should be deduped.""" @@ -464,7 +458,9 @@ async def get_json(destination: str, path: str, **kwargs: Any) -> JsonDict: # check that the perspectives store is correctly updated lookup_triplet = (SERVER_NAME, testverifykey_id, None) key_json = self.get_success( - self.hs.get_datastores().main.get_server_keys_json([lookup_triplet]) + self.hs.get_datastores().main.get_server_keys_json_for_remote( + [lookup_triplet] + ) ) res_keys = key_json[lookup_triplet] self.assertEqual(len(res_keys), 1) @@ -582,7 +578,9 @@ def test_get_keys_from_perspectives(self) -> None: # check that the perspectives store is correctly updated lookup_triplet = (SERVER_NAME, testverifykey_id, None) key_json = self.get_success( - self.hs.get_datastores().main.get_server_keys_json([lookup_triplet]) + self.hs.get_datastores().main.get_server_keys_json_for_remote( + [lookup_triplet] + ) ) res_keys = key_json[lookup_triplet] self.assertEqual(len(res_keys), 1) @@ -703,7 +701,9 @@ def test_get_perspectives_own_key(self) -> None: # check that the perspectives store is correctly updated lookup_triplet = (SERVER_NAME, testverifykey_id, None) key_json = self.get_success( - self.hs.get_datastores().main.get_server_keys_json([lookup_triplet]) + self.hs.get_datastores().main.get_server_keys_json_for_remote( + [lookup_triplet] + ) ) res_keys = key_json[lookup_triplet] self.assertEqual(len(res_keys), 1) diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py index 5901d80f26d7..5d7c13e6d04c 100644 --- a/tests/storage/test_keys.py +++ b/tests/storage/test_keys.py @@ -37,13 +37,13 @@ def decode_verify_key_base64( class KeyStoreTestCase(tests.unittest.HomeserverTestCase): - def test_get_server_verify_keys(self) -> None: + def test_get_server_signature_keys(self) -> None: store = self.hs.get_datastores().main key_id_1 = "ed25519:key1" key_id_2 = "ed25519:KEY_ID_2" self.get_success( - store.store_server_verify_keys( + store.store_server_signature_keys( "from_server", 10, { @@ -54,7 +54,7 @@ def test_get_server_verify_keys(self) -> None: ) res = self.get_success( - store.get_server_verify_keys( + store.get_server_signature_keys( [ ("server1", key_id_1), ("server1", key_id_2), @@ -87,7 +87,7 @@ def test_cache(self) -> None: key_id_2 = "ed25519:key2" self.get_success( - store.store_server_verify_keys( + store.store_server_signature_keys( "from_server", 0, { @@ -98,7 +98,7 @@ def test_cache(self) -> None: ) res = self.get_success( - store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) + store.get_server_signature_keys([("srv1", key_id_1), ("srv1", key_id_2)]) ) self.assertEqual(len(res.keys()), 2) @@ -111,20 +111,20 @@ def test_cache(self) -> None: self.assertEqual(res2.valid_until_ts, 200) # we should be able to look up the same thing again without a db hit - res = self.get_success(store.get_server_verify_keys([("srv1", key_id_1)])) + res = self.get_success(store.get_server_signature_keys([("srv1", key_id_1)])) self.assertEqual(len(res.keys()), 1) self.assertEqual(res[("srv1", key_id_1)].verify_key, KEY_1) new_key_2 = signedjson.key.get_verify_key( signedjson.key.generate_signing_key("key2") ) - d = store.store_server_verify_keys( + d = store.store_server_signature_keys( "from_server", 10, {("srv1", key_id_2): FetchKeyResult(new_key_2, 300)} ) self.get_success(d) res = self.get_success( - store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) + store.get_server_signature_keys([("srv1", key_id_1), ("srv1", key_id_2)]) ) self.assertEqual(len(res.keys()), 2) diff --git a/tests/unittest.py b/tests/unittest.py index 93fee1c0e6e4..d1a7039ed792 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -67,7 +67,6 @@ ) from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer -from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util import Clock from synapse.util.httpresourcetree import create_resource_tree @@ -790,15 +789,23 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version) self.get_success( - hs.get_datastores().main.store_server_verify_keys( + hs.get_datastores().main.store_server_keys_json( + self.OTHER_SERVER_NAME, + verify_key_id, from_server=self.OTHER_SERVER_NAME, - ts_added_ms=clock.time_msec(), - verify_keys={ - (self.OTHER_SERVER_NAME, verify_key_id): FetchKeyResult( - verify_key=verify_key, - valid_until_ts=clock.time_msec() + 10000, - ), - }, + ts_now_ms=clock.time_msec(), + ts_expires_ms=clock.time_msec() + 10000, + key_json_bytes=canonicaljson.encode_canonical_json( + { + "verify_keys": { + verify_key_id: { + "key": signedjson.key.encode_verify_key_base64( + verify_key + ) + } + } + } + ), ) ) diff --git a/tests/utils.py b/tests/utils.py index a0ac11bc5cd2..e73b46944bd9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -131,6 +131,9 @@ def default_config( # the test signing key is just an arbitrary ed25519 key to keep the config # parser happy "signing_key": "ed25519 a_lPym qvioDNmfExFBRPgdTU+wtFYKq4JfwFRv7sYVgWvmgJg", + # Disable trusted key servers, otherwise unit tests might try to actually + # reach out to matrix.org. + "trusted_key_servers": [], "event_cache_size": 1, "enable_registration": True, "enable_registration_captcha": False,