diff --git a/changelog.d/15491.misc b/changelog.d/15491.misc new file mode 100644 index 000000000000..98f88dbf19b2 --- /dev/null +++ b/changelog.d/15491.misc @@ -0,0 +1 @@ +Remove need for `worker_replication_*` based settings in worker configuration yaml by placing this data directly on the `instance_map` instead. diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2 index 42131afc9583..44c6e413cf94 100644 --- a/docker/conf-workers/worker.yaml.j2 +++ b/docker/conf-workers/worker.yaml.j2 @@ -6,10 +6,6 @@ worker_app: "{{ app }}" worker_name: "{{ name }}" -# The replication listener on the main synapse process. -worker_replication_host: 127.0.0.1 -worker_replication_http_port: 9093 - worker_listeners: - type: http port: {{ port }} diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 4beec3daaf8b..79b5b8739764 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -69,6 +69,9 @@ from jinja2 import Environment, FileSystemLoader MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 +MAIN_PROCESS_INSTANCE_NAME = "main" +MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1" +MAIN_PROCESS_REPLICATION_PORT = 9093 # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced # during processing with the name of the worker. @@ -719,8 +722,8 @@ def generate_worker_files( # shared config file. listeners = [ { - "port": 9093, - "bind_address": "127.0.0.1", + "port": MAIN_PROCESS_REPLICATION_PORT, + "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, "type": "http", "resources": [{"names": ["replication"]}], } @@ -870,6 +873,14 @@ def generate_worker_files( workers_in_use = len(requested_worker_types) > 0 + # If there are workers, add the main process to the instance_map too. 
+ if workers_in_use: + instance_map = shared_config.setdefault("instance_map", {}) + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "host": MAIN_PROCESS_LOCALHOST_ADDRESS, + "port": MAIN_PROCESS_REPLICATION_PORT, + } + # Shared homeserver config convert( "/conf/shared.yaml.j2", diff --git a/docs/systemd-with-workers/workers/generic_worker.yaml b/docs/systemd-with-workers/workers/generic_worker.yaml index a858f99ed1d9..db6436ee6ebb 100644 --- a/docs/systemd-with-workers/workers/generic_worker.yaml +++ b/docs/systemd-with-workers/workers/generic_worker.yaml @@ -1,10 +1,6 @@ worker_app: synapse.app.generic_worker worker_name: generic_worker1 -# The replication listener on the main synapse process. -worker_replication_host: 127.0.0.1 -worker_replication_http_port: 9093 - worker_listeners: - type: http port: 8083 diff --git a/docs/upgrade.md b/docs/upgrade.md index 0886b0311571..0625de8afb7f 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -88,6 +88,84 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.84.0 + +## Deprecation of `worker_replication_*` configuration settings + +When using workers, +* `worker_replication_host` +* `worker_replication_http_port` +* `worker_replication_http_tls` + +can now be removed from individual worker YAML configuration ***if*** you add the main process to the `instance_map` in the shared YAML configuration, +using the name `main`. 
+ +### Before: +Shared YAML +```yaml +instance_map: + generic_worker1: + host: localhost + port: 5678 + tls: false +``` +Worker YAML +```yaml +worker_app: synapse.app.generic_worker +worker_name: generic_worker1 + +worker_replication_host: localhost +worker_replication_http_port: 3456 +worker_replication_http_tls: false + +worker_listeners: + - type: http + port: 1234 + resources: + - names: [client, federation] + - type: http + port: 5678 + resources: + - names: [replication] + +worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml +``` +### After: +Shared YAML +```yaml +instance_map: + main: + host: localhost + port: 3456 + tls: false + generic_worker1: + host: localhost + port: 5678 + tls: false +``` +Worker YAML +```yaml +worker_app: synapse.app.generic_worker +worker_name: generic_worker1 + +worker_listeners: + - type: http + port: 1234 + resources: + - names: [client, federation] + - type: http + port: 5678 + resources: + - names: [replication] + +worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml + +``` +Notes: +* `tls` is optional but mirrors the functionality of `worker_replication_http_tls` + + + # Upgrading to v1.81.0 ## Application service path & authentication deprecations diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 1b6f2569490e..9d0729d065d0 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3846,15 +3846,20 @@ federation_sender_instances: ### `instance_map` When using workers this should be a map from [`worker_name`](#worker_name) to the -HTTP replication listener of the worker, if configured. +HTTP replication listener of the worker, if configured, and to the main process. Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs a HTTP replication listener, and that listener should be included in the `instance_map`. 
+The main process also needs an entry in the `instance_map`, and it should be listed under +`main` **if even one other worker exists**. Ensure the port matches what is declared +inside the `listeners` block for a `replication` listener. + Example configuration: ```yaml instance_map: + main: + host: localhost + port: 8030 worker1: host: localhost port: 8034 @@ -3986,6 +3991,7 @@ worker_name: generic_worker1 ``` --- ### `worker_replication_host` +*Deprecated as of version 1.84.0. Place `host` under the `main` entry in the [`instance_map`](#instance_map) in your shared yaml configuration instead.* The HTTP replication endpoint that it should talk to on the main Synapse process. The main Synapse process defines this with a `replication` resource in @@ -3997,6 +4003,7 @@ worker_replication_host: 127.0.0.1 ``` --- ### `worker_replication_http_port` +*Deprecated as of version 1.84.0. Place `port` under the `main` entry in the [`instance_map`](#instance_map) in your shared yaml configuration instead.* The HTTP replication port that it should talk to on the main Synapse process. The main Synapse process defines this with a `replication` resource in @@ -4008,6 +4015,7 @@ worker_replication_http_port: 9093 ``` --- ### `worker_replication_http_tls` +*Deprecated as of version 1.84.0. Place `tls` under the `main` entry in the [`instance_map`](#instance_map) in your shared yaml configuration instead.* Whether TLS should be used for talking to the HTTP replication port on the main Synapse process. @@ -4033,9 +4041,9 @@ A worker can handle HTTP requests. To do so, a `worker_listeners` option must be declared, in the same way as the [`listeners` option](#listeners) in the shared config. -Workers declared in [`stream_writers`](#stream_writers) will need to include a -`replication` listener here, in order to accept internal HTTP requests from -other workers. 
+Workers declared in [`stream_writers`](#stream_writers) and [`instance_map`](#instance_map) + will need to include a `replication` listener here, in order to accept internal HTTP +requests from other workers. Example configuration: ```yaml diff --git a/docs/workers.md b/docs/workers.md index 6192a46e0950..76e052e8f4f1 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -87,12 +87,18 @@ shared configuration file. ### Shared configuration -Normally, only a couple of changes are needed to make an existing configuration -file suitable for use with workers. First, you need to enable an +Normally, only a few changes are needed to make an existing configuration +file suitable for use with workers: +* First, you need to enable an ["HTTP replication listener"](usage/configuration/config_documentation.md#listeners) -for the main process; and secondly, you need to enable -[redis-based replication](usage/configuration/config_documentation.md#redis). -Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) +for the main process. +* Secondly, you need to enable +[redis-based replication](usage/configuration/config_documentation.md#redis). +* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map) +with the `main` process defined, as well as the relevant connection information from +its HTTP `replication` listener (defined in step 1 above). Note that the `host` defined +is the address the worker needs to look for the `main` process at, not necessarily the same address that the listener is bound to. +* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) can be used to authenticate HTTP traffic between workers. 
For example: ```yaml @@ -111,6 +117,11 @@ worker_replication_secret: "" redis: enabled: true + +instance_map: + main: + host: 'localhost' + port: 9093 ``` See the [configuration manual](usage/configuration/config_documentation.md) @@ -130,13 +141,13 @@ In the config file for each worker, you must specify: * The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)). The currently available worker applications are listed [below](#available-worker-applications). * A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)). - * The HTTP replication endpoint that it should talk to on the main synapse process - ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and - [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option with an `http` listener. * **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer. + * **Synapse 1.83 and older:** The HTTP replication endpoint that the worker should talk to on the main synapse process + ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and + [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). When using Synapse 1.84 or newer, these are not needed if `main` is defined in the [shared configuration](#shared-configuration) `instance_map`. For example: @@ -355,11 +366,14 @@ effects of bursts of events from that bridge on events sent by normal users. 
Additionally, the writing of specific streams (such as events) can be moved off of the main process to a particular worker. -To enable this, the worker must have a -[HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured, -have a [`worker_name`](usage/configuration/config_documentation.md#worker_name) +To enable this, the worker must: +* Have an [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured. +* Have a [`worker_name`](usage/configuration/config_documentation.md#worker_name) and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map) -config. The same worker can handle multiple streams, but unless otherwise documented, +config. +* Have the main process declared in the [`instance_map`](usage/configuration/config_documentation.md#instance_map) as well. + +Note: The same worker can handle multiple streams, but unless otherwise documented, each stream can only have a single writer. For example, to move event persistence off to a dedicated worker, the shared @@ -367,6 +381,9 @@ configuration would include: ```yaml instance_map: + main: + host: localhost + port: 8030 event_persister1: host: localhost port: 8034 diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 95b4047f1d3f..d2311cc857bb 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -39,6 +39,19 @@ Synapse version. Please use ``%s: name_of_worker`` instead. """ +_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """ +Missing data for a worker to connect to main process. 
Please include '%s' in the +`instance_map` declared in your shared yaml configuration, or optionally(as a deprecated +solution) in every worker's yaml as various `worker_replication_*` settings as defined +in workers documentation here: +`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration` +""" +# This allows for a handy knob when it's time to change from 'master' to +# something with less 'history' +MAIN_PROCESS_INSTANCE_NAME = "master" +# Use this to adjust what the main process is known as in the yaml instance_map +MAIN_PROCESS_INSTANCE_MAP_NAME = "main" + logger = logging.getLogger(__name__) @@ -161,27 +174,15 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: raise ConfigError("worker_log_config must be a string") self.worker_log_config = worker_log_config - # The host used to connect to the main synapse - self.worker_replication_host = config.get("worker_replication_host", None) - # The port on the main synapse for TCP replication if "worker_replication_port" in config: raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",)) - # The port on the main synapse for HTTP replication endpoint - self.worker_replication_http_port = config.get("worker_replication_http_port") - - # The tls mode on the main synapse for HTTP replication endpoint. - # For backward compatibility this defaults to False. - self.worker_replication_http_tls = config.get( - "worker_replication_http_tls", False - ) - # The shared secret used for authentication when connecting to the main synapse. self.worker_replication_secret = config.get("worker_replication_secret", None) self.worker_name = config.get("worker_name", self.worker_app) - self.instance_name = self.worker_name or "master" + self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME # FIXME: Remove this check after a suitable amount of time. 
self.worker_main_http_uri = config.get("worker_main_http_uri", None) @@ -215,12 +216,55 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) # A map from instance name to host/port of their HTTP replication endpoint. + # Check if the main process is declared. Inject it into the map if it's not, + # based first on if a 'main' block is declared then on 'worker_replication_*' + # data. If both are available, default to instance_map. The main process + # itself doesn't need this data as it would never have to talk to itself. + instance_map: Dict[str, Any] = config.get("instance_map", {}) + + if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME: + # The host used to connect to the main synapse + main_host = config.get("worker_replication_host", None) + + # The port on the main synapse for HTTP replication endpoint + main_port = config.get("worker_replication_http_port") + + # The tls mode on the main synapse for HTTP replication endpoint. + # For backward compatibility this defaults to False. + main_tls = config.get("worker_replication_http_tls", False) + + # For now, accept 'main' in the instance_map, but the replication system + # expects 'master', force that into being until it's changed later. + if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[ + MAIN_PROCESS_INSTANCE_MAP_NAME + ] + del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME] + + # This is the backwards compatibility bit that handles the + # worker_replication_* bits using setdefault() to not overwrite anything. + elif main_host is not None and main_port is not None: + instance_map.setdefault( + MAIN_PROCESS_INSTANCE_NAME, + { + "host": main_host, + "port": main_port, + "tls": main_tls, + }, + ) + + else: + # If we've gotten here, it means that the main process is not on the + # instance_map and that not enough worker_replication_* variables + # were declared in the worker's yaml. 
+ raise ConfigError( + _MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA + % MAIN_PROCESS_INSTANCE_MAP_NAME + ) + self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping( - config.get("instance_map", {}), - InstanceLocationConfig, - ) + ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 8c2c54c07a49..41e988f637a6 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -25,6 +25,7 @@ from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError +from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.http import RequestTimedOutError from synapse.http.server import HttpServer from synapse.http.servlet import parse_json_object_from_request @@ -197,11 +198,6 @@ def make_client(cls, hs: "HomeServer") -> Callable: client = hs.get_simple_http_client() local_instance_name = hs.get_instance_name() - # The value of these option should match the replication listener settings - master_host = hs.config.worker.worker_replication_host - master_port = hs.config.worker.worker_replication_http_port - master_tls = hs.config.worker.worker_replication_http_tls - instance_map = hs.config.worker.instance_map outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME) @@ -213,7 +209,9 @@ def make_client(cls, hs: "HomeServer") -> Callable: ) @trace_with_opname("outgoing_replication_request") - async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: + async def send_request( + *, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any + ) -> Any: # We have to pull these out here to avoid circular dependencies... 
streams = hs.get_replication_command_handler().get_streams_to_replicate() replication = hs.get_replication_data_handler() @@ -221,11 +219,7 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: with outgoing_gauge.track_inprogress(): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") - if instance_name == "master": - host = master_host - port = master_port - tls = master_tls - elif instance_name in instance_map: + if instance_name in instance_map: host = instance_map[instance_name].host port = instance_map[instance_name].port tls = instance_map[instance_name].tls diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 758b4bc38bdd..bff7114cd89c 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -837,6 +837,7 @@ def default_config(self) -> Dict[str, Any]: conf = super().default_config() conf["stream_writers"] = {"presence": ["presence_writer"]} conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, "presence_writer": {"host": "testserv", "port": 1001}, } return conf diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 0f1a8a145f6f..eb9b1f1cd9be 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -110,8 +110,7 @@ def create_resource_dict(self) -> Dict[str, Resource]: def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" + config["instance_map"] = {"main": {"host": "testserv", "port": 8765}} return config def _build_replication_data_handler(self) -> "TestReplicationDataHandler": @@ -249,6 +248,7 @@ def default_config(self) -> Dict[str, Any]: """ base = super().default_config() base["redis"] = {"enabled": True} + base["instance_map"] = {"main": {"host": "testserv", "port": 8765}} return base def setUp(self) -> None: @@ 
-310,7 +310,7 @@ def create_test_resource(self) -> ReplicationRestResource: def make_worker_hs( self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any ) -> HomeServer: - """Make a new worker HS instance, correctly connecting replcation + """Make a new worker HS instance, correctly connecting replication stream to the master HS. Args: @@ -388,8 +388,6 @@ def make_worker_hs( def _get_worker_hs_config(self) -> dict: config = self.default_config() - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" return config def replicate(self) -> None: diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py index 98602371e467..f7bca0063da8 100644 --- a/tests/replication/test_auth.py +++ b/tests/replication/test_auth.py @@ -43,9 +43,6 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" - return config def _test_register(self) -> FakeChannel: diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py index eca503376110..a18859099fac 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py @@ -29,8 +29,6 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" return config def test_register_single_worker(self) -> None: diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 7f9cc67e735a..4623d737fbf5 100644 --- a/tests/replication/test_sharded_event_persister.py 
+++ b/tests/replication/test_sharded_event_persister.py @@ -50,6 +50,7 @@ def default_config(self) -> dict: conf = super().default_config() conf["stream_writers"] = {"events": ["worker1", "worker2"]} conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, "worker1": {"host": "testserv", "port": 1001}, "worker2": {"host": "testserv", "port": 1002}, }