diff --git a/changelog.d/15121.misc b/changelog.d/15121.misc new file mode 100644 index 000000000000..b2cb638960b2 --- /dev/null +++ b/changelog.d/15121.misc @@ -0,0 +1 @@ +Route remote key requests via federation senders. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 58c62f2231f3..3525290ed555 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -100,7 +100,7 @@ }, "federation_sender": { "app": "synapse.app.generic_worker", - "listener_resources": [], + "listener_resources": ["replication"], "endpoint_patterns": [], "shared_extra_conf": {}, "worker_extra_conf": "", @@ -345,7 +345,13 @@ def add_worker_roles_to_shared_config( shared_config.setdefault("pusher_instances", []).append(worker_name) elif worker_type == "federation_sender": + # Some outbound federation requests can be routed via federation senders, + # so federation senders need to be accessible by other workers. shared_config.setdefault("federation_sender_instances", []).append(worker_name) + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } elif worker_type == "event_persister": # Event persisters write to the events stream, so we need to update diff --git a/docs/upgrade.md b/docs/upgrade.md index 15167b8c5825..274a4082a7b1 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -87,6 +87,29 @@ process, for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.79.0 + +## Changes to federation sender config + +_This notice only applies to deployments using multiple workers. 
Deployments +not using workers are not affected._ + +From Synapse 1.79, only [federation senders]( +https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#federation_sender_instances +) will make outgoing key requests to homeservers and [trusted key servers]( +https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#trusted_key_servers +). This will make it easier for server operators to reason about how Synapse +communicates with the wider federation. As a consequence, all other workers now +ask federation senders to fetch keys on their behalf. + +To facilitate this, + +- federation senders must now be present in the [instance map]( + https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#instance_map + ), and +- federation senders must now run an [http listener]( + https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#listeners + ) which includes the `replication` resource. # Upgrading to v1.78.0 diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 58c695568972..ca52f4a58b5c 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3811,7 +3811,7 @@ send_federation: false ### `federation_sender_instances` It is possible to scale the processes that handle sending outbound federation requests -by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to +by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to a `federation_sender_instances` map. Doing so will remove handling of this function from the main process. Multiple workers can be added to this map, in which case the work is balanced across them. 
@@ -3821,6 +3821,10 @@ sending, and if changed all federation sender workers must be stopped at the sam and then started, to ensure that all instances are running with the same config (otherwise events may be dropped). +Federation senders should have a replication [`http` listener](#listeners) +configured, and should be present in the [`instance_map`](#instance_map) +so that other workers can make internal http requests to the federation senders. + Example configuration for a single worker: ```yaml federation_sender_instances: @@ -3832,6 +3836,10 @@ federation_sender_instances: - federation_sender1 - federation_sender2 ``` + +_Changed in Synapse 1.79: Federation senders should now have an http listener +listening for `replication`, and should be present in the `instance_map`._ + --- ### `instance_map` diff --git a/docs/workers.md b/docs/workers.md index 2eb970ffa6a0..5f8cdfb27e1e 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -590,10 +590,18 @@ It is likely this option will be deprecated in the future and not recommended fo new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances). Handles sending federation traffic to other servers. Doesn't handle any -REST endpoints itself, but you should set +client-facing REST endpoints itself, but you should set [`send_federation: false`](usage/configuration/config_documentation.md#send_federation) in the shared configuration file to stop the main synapse sending this traffic. +Federation senders should have a replication [`http` listener]( +usage/configuration/config_documentation.md#listeners +) configured, and +should be present in the [`instance_map`]( +usage/configuration/config_documentation.md#instance_map +) so that other workers can make internal +http requests to the federation senders. 
+ If running multiple federation senders then you must list each instance in the [`federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances) @@ -607,6 +615,13 @@ send_federation: false federation_sender_instances: - federation_sender1 - federation_sender2 +instance_map: + federation_sender1: + host: localhost + port: 1001 + federation_sender2: + host: localhost + port: 1002 ``` An example for a federation sender instance: @@ -615,6 +630,9 @@ An example for a federation sender instance: {{#include systemd-with-workers/workers/federation_sender.yaml}} ``` +_Changed in Synapse 1.79: Federation senders should now have an http listener +listening for `replication`, and should be present in the `instance_map`._ + ### `synapse.app.media_repository` Handles the media repository. It can handle all endpoints starting with: diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 2580660b6c27..0b03ad223eed 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -174,7 +174,10 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "synapse.app.federation_sender", "federation_sender_instances", ) - self.send_federation = self.instance_name in federation_sender_instances + self.send_federation = (self.instance_name in federation_sender_instances) or ( + not federation_sender_instances and self.instance_name == "master" + ) + self.federation_shard_config = ShardedWorkerHandlingConfig( federation_sender_instances ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 86cd4af9bd5a..7de3e8781fa6 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -14,6 +14,7 @@ import abc import logging +import random from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple import attr @@ -40,10 +41,16 @@ RequestSendFailed, SynapseError, ) +from synapse.config import ConfigError from synapse.config.key import TrustedKeyServer +from 
synapse.crypto.types import _FetchKeyRequest from synapse.events import EventBase from synapse.events.utils import prune_event_dict from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.replication.http.keys import ( + ReplicationFetchKeysEndpoint, + deserialise_fetch_key_result, +) from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict from synapse.util import unwrapFirstError @@ -123,25 +130,6 @@ class KeyLookupError(ValueError): pass -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _FetchKeyRequest: - """A request for keys for a given server. - - We will continue to try and fetch until we have all the keys listed under - `key_ids` (with an appropriate `valid_until_ts` property) or we run out of - places to fetch keys from. - - Attributes: - server_name: The name of the server that owns the keys. - minimum_valid_until_ts: The timestamp which the keys must be valid until. - key_ids: The IDs of the keys to attempt to fetch - """ - - server_name: str - minimum_valid_until_ts: int - key_ids: List[str] - - class Keyring: """Handles verifying signed JSON objects and fetching the keys needed to do so. @@ -153,14 +141,22 @@ def __init__( self.clock = hs.get_clock() if key_fetchers is None: - key_fetchers = ( - # Fetch keys from the database. - StoreKeyFetcher(hs), - # Fetch keys from a configured Perspectives server. - PerspectivesKeyFetcher(hs), - # Fetch keys from the origin server directly. - ServerKeyFetcher(hs), - ) + if hs.config.worker.send_federation: + key_fetchers = ( + # Fetch keys from the database. + StoreKeyFetcher(hs), + # Fetch keys from a configured Perspectives server. + PerspectivesKeyFetcher(hs), + # Fetch keys from the origin server directly. + ServerKeyFetcher(hs), + ) + else: + key_fetchers = ( + # Fetch keys from the database. + StoreKeyFetcher(hs), + # Ask a federation sender to fetch the keys for us. 
+ InternalWorkerRequestKeyFetcher(hs), + ) self._key_fetchers = key_fetchers self._fetch_keys_queue: BatchingQueue[ @@ -291,9 +287,7 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: minimum_valid_until_ts=verify_request.minimum_valid_until_ts, key_ids=list(key_ids_to_find), ) - found_keys_by_server = await self._fetch_keys_queue.add_to_queue( - key_request, key=verify_request.server_name - ) + found_keys_by_server = await self.fetch_keys(key_request) # Since we batch up requests the returned set of keys may contain keys # from other servers, so we pull out only the ones we care about. @@ -320,6 +314,15 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None: Codes.UNAUTHORIZED, ) + async def fetch_keys( + self, key_request: _FetchKeyRequest + ) -> Dict[str, Dict[str, FetchKeyResult]]: + """Returns: {server name: {key id: fetch key result}}""" + found_keys_by_server = await self._fetch_keys_queue.add_to_queue( + key_request, key=key_request.server_name + ) + return found_keys_by_server + async def _process_json( self, verify_key: VerifyKey, verify_request: VerifyJsonRequest ) -> None: @@ -469,6 +472,8 @@ async def _inner_fetch_key_request( class KeyFetcher(metaclass=abc.ABCMeta): + """Abstract gadget for fetching keys to validate other homeservers' signatures.""" + def __init__(self, hs: "HomeServer"): self._queue = BatchingQueue( self.__class__.__name__, hs.get_clock(), self._fetch_keys @@ -490,11 +495,15 @@ async def get_keys( async def _fetch_keys( self, keys_to_fetch: List[_FetchKeyRequest] ) -> Dict[str, Dict[str, FetchKeyResult]]: + """ + Returns: + Map from server_name -> key_id -> FetchKeyResult + """ pass class StoreKeyFetcher(KeyFetcher): - """KeyFetcher impl which fetches keys from our data store""" + """Try to retrieve a previously-fetched key from the DB.""" def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -518,6 +527,8 @@ async def _fetch_keys( class BaseV2KeyFetcher(KeyFetcher): + 
"""Abstract helper. Fetch keys by requesting it from some server.""" + def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -620,7 +631,10 @@ async def process_v2_response( class PerspectivesKeyFetcher(BaseV2KeyFetcher): - """KeyFetcher impl which fetches keys from the "perspectives" servers""" + """Fetch keys for some homeserver X by requesting them from a trusted key server Y. + + These trusted key servers were seemingly once known as "perspectives" servers. + """ def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -803,7 +817,7 @@ def _validate_perspectives_response( class ServerKeyFetcher(BaseV2KeyFetcher): - """KeyFetcher impl which fetches keys from the origin servers""" + """Fetch keys for some homeserver X by requesting them directly from X.""" def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -903,3 +917,37 @@ async def get_server_verify_keys_v2_direct( response_json=response, time_added_ms=time_now_ms, ) + + +class InternalWorkerRequestKeyFetcher(KeyFetcher): + """Ask a federation_sender worker to request keys for some homeserver X. + + It may choose to do so via a notary or directly from X itself; we don't care. 
+ """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self._federation_shard_config = hs.config.worker.federation_shard_config + if not self._federation_shard_config.instances: + raise ConfigError("No federation senders configured") + self._client = ReplicationFetchKeysEndpoint.make_client(hs) + + async def _fetch_keys( + self, keys_to_fetch: List[_FetchKeyRequest] + ) -> Dict[str, Dict[str, FetchKeyResult]]: + # For simplicity's sake, pick a random federation sender + instance_name = random.choice(self._federation_shard_config.instances) + response = await self._client( + keys_to_fetch=keys_to_fetch, + instance_name=instance_name, + ) + + parsed_response: Dict[str, Dict[str, FetchKeyResult]] = {} + for server_name, keys in response["server_keys"].items(): + deserialised_keys = { + key_id: deserialise_fetch_key_result(key_id, verify_key) + for key_id, verify_key in keys.items() + } + parsed_response.setdefault(server_name, {}).update(deserialised_keys) + + return parsed_response diff --git a/synapse/crypto/types.py b/synapse/crypto/types.py new file mode 100644 index 000000000000..2342a8a125c8 --- /dev/null +++ b/synapse/crypto/types.py @@ -0,0 +1,35 @@ +# Copyright 2023- The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List + +import attr + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _FetchKeyRequest: + """A request for keys for a given server. 
+ + We will continue to try and fetch until we have all the keys listed under + `key_ids` (with an appropriate `valid_until_ts` property) or we run out of + places to fetch keys from. + + Attributes: + server_name: The name of the server that owns the keys. + minimum_valid_until_ts: The timestamp which the keys must be valid until. + key_ids: The IDs of the keys to attempt to fetch + """ + + server_name: str + minimum_valid_until_ts: int + key_ids: List[str] diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index ac9a92240af2..393e19b01bcb 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -19,6 +19,7 @@ account_data, devices, federation, + keys, login, membership, presence, @@ -52,6 +53,7 @@ def register_servlets(self, hs: "HomeServer") -> None: account_data.register_servlets(hs, self) push.register_servlets(hs, self) state.register_servlets(hs, self) + keys.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/keys.py b/synapse/replication/http/keys.py new file mode 100644 index 000000000000..3a053e5acff5 --- /dev/null +++ b/synapse/replication/http/keys.py @@ -0,0 +1,123 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging +from typing import TYPE_CHECKING, Dict, List, Tuple + +import attr +from signedjson.key import decode_verify_key_bytes, encode_verify_key_base64 +from unpaddedbase64 import decode_base64 + +from twisted.web.server import Request + +from synapse.crypto.types import _FetchKeyRequest +from synapse.http.server import HttpServer +from synapse.replication.http._base import ReplicationEndpoint +from synapse.storage.keys import FetchKeyResult +from synapse.types import JsonDict +from synapse.util.async_helpers import yieldable_gather_results + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__file__) # NOTE(review): probably meant __name__ + + +class ReplicationFetchKeysEndpoint(ReplicationEndpoint): + """Another worker is asking us to fetch keys for a homeserver X. + + The request looks like: + + POST /_synapse/replication/fetch_keys + { + "keys_to_fetch": [ + { + "server_name": "example.com", + "minimum_valid_until_ts": 123456, + "key_ids": ["ABC", "DEF"] + } + ] + } + + We would normally return a group of FetchKeyResult structs like the + normal code path does, but FetchKeyResult holds a nacl.signing.VerifyKey + which is not JSON-serialisable. Instead, each fetched key is returned as + its base64-encoded verify key plus its `valid_until_ts`, grouped by server + name and then by key ID. 
+ + The response takes the form: + + 200 OK + {"server_keys": { + "example.com": {"ABC": {"verify_key": "<base64>", "valid_until_ts": 123456}} + }} + """ + + NAME = "fetch_keys" + PATH_ARGS = () + METHOD = "POST" + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._keyring = hs.get_keyring() + + async def _handle_request( # type: ignore[override] + self, + request: Request, + content: JsonDict, + ) -> Tuple[int, JsonDict]: + parsed_requests = [ + _FetchKeyRequest(**entry) for entry in content["keys_to_fetch"] + ] + + results: List[ + Dict[str, Dict[str, FetchKeyResult]] + ] = await yieldable_gather_results( + self._keyring.fetch_keys, + parsed_requests, + ) + + union_of_keys: Dict[str, Dict[str, JsonDict]] = {} + for result in results: + for server_name, keys in result.items(): + serialised_keys = { + key_id: _serialise_fetch_key_result(verify_key) + for key_id, verify_key in keys.items() + } + union_of_keys.setdefault(server_name, {}).update(serialised_keys) + + return 200, {"server_keys": union_of_keys} + + @staticmethod + async def _serialize_payload(*, keys_to_fetch: List[_FetchKeyRequest]) -> JsonDict: # type: ignore[override] + return {"keys_to_fetch": [attr.asdict(key) for key in keys_to_fetch]} + + +def _serialise_fetch_key_result(result: FetchKeyResult) -> JsonDict: + return { + "verify_key": encode_verify_key_base64(result.verify_key), + "valid_until_ts": result.valid_until_ts, + } + + +def deserialise_fetch_key_result(key_id: str, data: JsonDict) -> FetchKeyResult: + return FetchKeyResult( + verify_key=decode_verify_key_bytes(key_id, decode_base64(data["verify_key"])), + valid_until_ts=data["valid_until_ts"], + ) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + if hs.config.worker.send_federation: + ReplicationFetchKeysEndpoint(hs).register(http_server) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 1b9696748fdc..f414418adb1d 100644 --- a/tests/crypto/test_keyring.py +++ 
b/tests/crypto/test_keyring.py @@ -25,10 +25,12 @@ from twisted.internet 
import defer from twisted.internet.defer import Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import NoResource, Resource -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, SynapseError from synapse.crypto import keyring from synapse.crypto.keyring import ( + InternalWorkerRequestKeyFetcher, PerspectivesKeyFetcher, ServerKeyFetcher, StoreKeyFetcher, @@ -39,12 +41,15 @@ current_context, make_deferred_yieldable, ) +from synapse.rest.key.v2 import KeyResource from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict from synapse.util import Clock +from synapse.util.httpresourcetree import create_resource_tree from tests import unittest +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.test_utils import make_awaitable from tests.unittest import logcontext_clean, override_config @@ -757,6 +762,83 @@ def get_key_from_perspectives(response: JsonDict) -> Dict[str, FetchKeyResult]: self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig") +class InternalWorkerRequestKeyFetcherTestCase(BaseMultiWorkerStreamTestCase): + def create_test_resource(self) -> Resource: # type: ignore[override] + return create_resource_tree( + {"/_matrix/key/v2": KeyResource(self.hs)}, root_resource=NoResource() + ) + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + config.update( + federation_sender_instances=["federation_sender1"], + instance_map={ + "federation_sender1": {"host": "testserv", "port": 1001}, + }, + ) + return config + + def test_key_fetching_works_across_workers(self) -> None: + """Test that a non-fed-sender worker requests keys via a fed-sender.""" + mock_http_client = Mock() + + # 1. Mock out the response from the notary server. 
+ async def mock_post_json(*args: Any, **kwargs: Any) -> JsonDict: + """Mock the request to the notary server.""" + if kwargs.get("path") != "/_matrix/key/v2/query": + raise HttpResponseException(500, "ruh", b"roh") + return {"server_keys": []} + + mock_http_client.post_json = mock_post_json + + # 2. Build a valid response to /_matrix/key/v2/server for the server being + # queried. + SERVER_NAME = "server2" + testkey = signedjson.key.generate_signing_key("ver1") + testverifykey = signedjson.key.get_verify_key(testkey) + testverifykey_id = "ed25519:ver1" + VALID_UNTIL_TS = 200 * 1000 + response = { + "server_name": SERVER_NAME, + "old_verify_keys": {}, + "valid_until_ts": VALID_UNTIL_TS, + "verify_keys": { + testverifykey_id: { + "key": signedjson.key.encode_verify_key_base64(testverifykey) + } + }, + } + signedjson.sign.sign_json(response, SERVER_NAME, testkey) + + async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict: + if kwargs.get("path") != "/_matrix/key/v2/server": + raise HttpResponseException(500, "ruh", b"roh") + return response + + mock_http_client.get_json = mock_get_json + + # 3. Make a federation homeserver to actually make the request. + self.make_worker_hs( + "synapse.app.generic_worker", + { + "worker_name": "federation_sender1", + "federation_sender_instances": ["federation_sender1"], + }, + federation_http_client=mock_http_client, + ) + + # 4. Use the via-fed-sender fetcher to get keys. + fetcher = InternalWorkerRequestKeyFetcher(self.hs) + keys = self.get_success( + fetcher.get_keys(SERVER_NAME, [testverifykey_id], 0), by=0.1 + ) + k = keys[testverifykey_id] + self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS) + self.assertEqual(k.verify_key, testverifykey) + self.assertEqual(k.verify_key.alg, "ed25519") + self.assertEqual(k.verify_key.version, "ver1") + + def get_key_id(key: SigningKey) -> str: """Get the matrix ID tag for a given SigningKey or VerifyKey""" return "%s:%s" % (key.alg, key.version)