From 7467a56a43f86770690bb69494d5c515d7e77cc3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 9 May 2022 12:19:28 +0100 Subject: [PATCH 01/12] Listen to more Redis channels so we can split up traffic later --- synapse/replication/tcp/redis.py | 63 +++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 989c5be0327e..7f598b31cdee 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,7 +14,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast +from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast import attr import txredisapi @@ -24,6 +24,7 @@ from twisted.internet.interfaces import IAddress, IConnector from twisted.python.failure import Failure +from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import ( BackgroundProcessLoggingContext, @@ -85,14 +86,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): Attributes: synapse_handler: The command handler to handle incoming commands. - synapse_stream_name: The *redis* stream name to subscribe to and publish + synapse_stream_prefix: The *redis* stream name to subscribe to and publish from (not anything to do with Synapse replication streams). synapse_outbound_redis_connection: The connection to redis to use to send commands. 
""" synapse_handler: "ReplicationCommandHandler" - synapse_stream_name: str + synapse_stream_prefix: str + synapse_subscribed_channels: List[str] synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): @@ -117,8 +119,13 @@ async def _send_subscribe(self) -> None: # it's important to make sure that we only send the REPLICATE command once we # have successfully subscribed to the stream - otherwise we might miss the # POSITION response sent back by the other end. - logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name) - await make_deferred_yieldable(self.subscribe(self.synapse_stream_name)) + fully_qualified_stream_names = [ + f"{self.synapse_stream_prefix}/{stream_suffix}" + for stream_suffix in self.synapse_subscribed_channels + ] + [self.synapse_stream_prefix] + logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names) + await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names)) + logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) @@ -217,7 +224,7 @@ async def _async_send_command(self, cmd: Command) -> None: await make_deferred_yieldable( self.synapse_outbound_redis_connection.publish( - self.synapse_stream_name, encoded_string + self.synapse_stream_prefix, encoded_string ) ) @@ -300,7 +307,7 @@ def format_address(address: IAddress) -> str: class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """This is a reconnecting factory that connects to redis and immediately - subscribes to a stream. + subscribes to some streams. 
Args: hs @@ -326,10 +333,47 @@ def __init__( ) self.synapse_handler = hs.get_replication_command_handler() - self.synapse_stream_name = hs.hostname + self.synapse_stream_prefix = hs.hostname + self.synapse_subscribed_channels = ( + RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( + hs.config + ) + ) self.synapse_outbound_redis_connection = outbound_redis_connection + @staticmethod + def channels_to_subscribe_to_for_config(config: HomeServerConfig) -> List[str]: + subscribe_to = [] + + if config.worker.run_background_tasks or config.worker.worker_app is None: + # If we're the main process or the background worker, we want to process + # User IP addresses + subscribe_to.append("USER_IP") + + # Subscribe to the following RDATA channels. + # We may be able to reduce this in the future. + subscribe_to += [ + "RDATA/account_data", + "RDATA/backfill", + "RDATA/caches", + "RDATA/device_lists", + "RDATA/events", + "RDATA/federation", + "RDATA/groups", + "RDATA/presence", + "RDATA/presence_federation", + "RDATA/push_rules", + "RDATA/pushers", + "RDATA/receipts", + "RDATA/tag_account_data", + "RDATA/to_device", + "RDATA/typing", + "RDATA/user_signature", + ] + + return subscribe_to + def buildProtocol(self, addr: IAddress) -> RedisSubscriber: p = super().buildProtocol(addr) p = cast(RedisSubscriber, p) @@ -340,7 +384,8 @@ def buildProtocol(self, addr: IAddress) -> RedisSubscriber: # protocol. 
p.synapse_handler = self.synapse_handler p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection - p.synapse_stream_name = self.synapse_stream_name + p.synapse_stream_prefix = self.synapse_stream_prefix + p.synapse_subscribed_channels = self.synapse_subscribed_channels return p From f7ee5a89aff7efacb3d8f84537f7839f875ad028 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 9 May 2022 12:34:33 +0100 Subject: [PATCH 02/12] Test that we listen to the right channels on the right workers (for now) --- tests/replication/tcp/test_redis.py | 85 +++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 tests/replication/tcp/test_redis.py diff --git a/tests/replication/tcp/test_redis.py b/tests/replication/tcp/test_redis.py new file mode 100644 index 000000000000..a51ea9b39a7d --- /dev/null +++ b/tests/replication/tcp/test_redis.py @@ -0,0 +1,85 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory + +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.unittest import HomeserverTestCase + +ALL_RDATA_CHANNELS = [ + "RDATA/account_data", + "RDATA/backfill", + "RDATA/caches", + "RDATA/device_lists", + "RDATA/events", + "RDATA/federation", + "RDATA/groups", + "RDATA/presence", + "RDATA/presence_federation", + "RDATA/push_rules", + "RDATA/pushers", + "RDATA/receipts", + "RDATA/tag_account_data", + "RDATA/to_device", + "RDATA/typing", + "RDATA/user_signature", +] + + +class RedisTestCase(HomeserverTestCase): + def test_subscribed_to_enough_redis_channels(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. + self.assertCountEqual( + RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( + self.hs.config + ), + [ + "USER_IP", + ] + + ALL_RDATA_CHANNELS, + ) + + +class RedisWorkerTestCase(BaseMultiWorkerStreamTestCase): + def test_background_worker_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. + worker1 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker1", + "run_background_tasks_on": "worker1", + }, + ) + self.assertIn( + "USER_IP", + RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( + worker1.config + ), + ) + + def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. 
+ worker2 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker2", + "run_background_tasks_on": "worker1", + }, + ) + self.assertNotIn( + "USER_IP", + RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( + worker2.config + ), + ) From 25698faf7ddaff421cdc74ad464b7438f9544e2b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 9 May 2022 12:38:35 +0100 Subject: [PATCH 03/12] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/12672.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12672.misc diff --git a/changelog.d/12672.misc b/changelog.d/12672.misc new file mode 100644 index 000000000000..265e0a801f78 --- /dev/null +++ b/changelog.d/12672.misc @@ -0,0 +1 @@ +Lay some foundation work to allow workers to only subscribe to some kinds of messages, reducing replication traffic. \ No newline at end of file From f97cf42d48928e49448545eb1e3bdee098999a62 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 10 May 2022 11:09:36 +0100 Subject: [PATCH 04/12] Skip Redis test if Redis not installed --- tests/replication/tcp/test_redis.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/replication/tcp/test_redis.py b/tests/replication/tcp/test_redis.py index a51ea9b39a7d..7715d0bd88eb 100644 --- a/tests/replication/tcp/test_redis.py +++ b/tests/replication/tcp/test_redis.py @@ -12,7 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory +try: + # We only import it to see if it's installed, so ignore the 'unused' import + import txredisapi # noqa: F401 + + HAVE_TXREDISAPI = True +except ImportError: + HAVE_TXREDISAPI = False from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.unittest import HomeserverTestCase @@ -38,7 +44,12 @@ class RedisTestCase(HomeserverTestCase): + if not HAVE_TXREDISAPI: + skip = "Redis extras not installed" + def test_subscribed_to_enough_redis_channels(self) -> None: + from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory + # The default main process is subscribed to USER_IP and all RDATA channels. self.assertCountEqual( RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( @@ -52,7 +63,12 @@ def test_subscribed_to_enough_redis_channels(self) -> None: class RedisWorkerTestCase(BaseMultiWorkerStreamTestCase): + if not HAVE_TXREDISAPI: + skip = "Redis extras not installed" + def test_background_worker_subscribed_to_user_ip(self) -> None: + from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory + # The default main process is subscribed to USER_IP and all RDATA channels. worker1 = self.make_worker_hs( "synapse.app.generic_worker", @@ -69,6 +85,8 @@ def test_background_worker_subscribed_to_user_ip(self) -> None: ) def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: + from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory + # The default main process is subscribed to USER_IP and all RDATA channels. 
worker2 = self.make_worker_hs( "synapse.app.generic_worker", From 4bc2ab19d1ab249b98e5c3b0943068ce9c795d48 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 11 May 2022 15:29:25 +0100 Subject: [PATCH 05/12] Add channel support to Fake Redis --- tests/replication/_base.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index a7602b4c96ae..bd1684af4c6b 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Dict, List, Optional, Tuple +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set, Tuple from twisted.internet.address import IPv4Address from twisted.internet.protocol import Protocol @@ -475,22 +476,25 @@ class FakeRedisPubSubServer: """A fake Redis server for pub/sub.""" def __init__(self): - self._subscribers = set() + self._subscribers_by_channel: Dict[ + bytes, Set["FakeRedisPubSubProtocol"] + ] = defaultdict(set) - def add_subscriber(self, conn): + def add_subscriber(self, conn, channel: bytes): """A connection has called SUBSCRIBE""" - self._subscribers.add(conn) + self._subscribers_by_channel[channel].add(conn) def remove_subscriber(self, conn): - """A connection has called UNSUBSCRIBE""" - self._subscribers.discard(conn) + """A connection has lost connection""" + for subscribers in self._subscribers_by_channel.values(): + subscribers.discard(conn) - def publish(self, conn, channel, msg) -> int: + def publish(self, conn, channel: bytes, msg) -> int: """A connection want to publish a message to subscribers.""" - for sub in self._subscribers: + for sub in self._subscribers_by_channel[channel]: sub.send(["message", channel, msg]) - return len(self._subscribers) + return len(self._subscribers_by_channel[channel]) def 
buildProtocol(self, addr): return FakeRedisPubSubProtocol(self) @@ -531,9 +535,10 @@ def handle_command(self, command, *args): num_subscribers = self._server.publish(self, channel, message) self.send(num_subscribers) elif command == b"SUBSCRIBE": - (channel,) = args - self._server.add_subscriber(self) - self.send(["subscribe", channel, 1]) + for idx, channel in enumerate(args): + num_channels = idx + 1 + self._server.add_subscriber(self, channel) + self.send(["subscribe", channel, num_channels]) # Since we use SET/GET to cache things we can safely no-op them. elif command == b"SET": From cfa55edd7ac530de299689d5fc947315240f7673 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 13 May 2022 14:52:23 +0100 Subject: [PATCH 06/12] Restructure RedisSubscriber and the factory --- synapse/replication/tcp/handler.py | 6 ++++- synapse/replication/tcp/redis.py | 42 +++++++++--------------------- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 9aba1cd45111..2e03f2c89647 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -321,7 +321,11 @@ def start_replication(self, hs: "HomeServer") -> None: # Now create the factory/connection for the subscription stream. 
self._factory = RedisDirectTcpReplicationClientFactory( - hs, outbound_redis_connection + hs, + outbound_redis_connection, + channel_names=RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( + hs.config + ), ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 7f598b31cdee..c250b40217d8 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -94,7 +94,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): synapse_handler: "ReplicationCommandHandler" synapse_stream_prefix: str - synapse_subscribed_channels: List[str] + synapse_channel_names: List[str] synapse_outbound_redis_connection: txredisapi.ConnectionHandler def __init__(self, *args: Any, **kwargs: Any): @@ -121,7 +121,7 @@ async def _send_subscribe(self) -> None: # POSITION response sent back by the other end. fully_qualified_stream_names = [ f"{self.synapse_stream_prefix}/{stream_suffix}" - for stream_suffix in self.synapse_subscribed_channels + for stream_suffix in self.synapse_channel_names ] + [self.synapse_stream_prefix] logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names) await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names)) @@ -314,13 +314,20 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): outbound_redis_connection: A connection to redis that will be used to send outbound commands (this is separate to the redis connection used to subscribe). + channel_names: A list of channel names to append to the base channel name + to additionally subscribe to. + e.g. if ['ABC', 'DEF'] is specified then we'll listen to: + example.com; example.com/ABC; and example.com/DEF. 
""" maxDelay = 5 protocol = RedisSubscriber def __init__( - self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler + self, + hs: "HomeServer", + outbound_redis_connection: txredisapi.ConnectionHandler, + channel_names: List[str], ): super().__init__( @@ -334,11 +341,7 @@ def __init__( self.synapse_handler = hs.get_replication_command_handler() self.synapse_stream_prefix = hs.hostname - self.synapse_subscribed_channels = ( - RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( - hs.config - ) - ) + self.synapse_channel_names = channel_names self.synapse_outbound_redis_connection = outbound_redis_connection @@ -351,27 +354,6 @@ def channels_to_subscribe_to_for_config(config: HomeServerConfig) -> List[str]: # User IP addresses subscribe_to.append("USER_IP") - # Subscribe to the following RDATA channels. - # We may be able to reduce this in the future. - subscribe_to += [ - "RDATA/account_data", - "RDATA/backfill", - "RDATA/caches", - "RDATA/device_lists", - "RDATA/events", - "RDATA/federation", - "RDATA/groups", - "RDATA/presence", - "RDATA/presence_federation", - "RDATA/push_rules", - "RDATA/pushers", - "RDATA/receipts", - "RDATA/tag_account_data", - "RDATA/to_device", - "RDATA/typing", - "RDATA/user_signature", - ] - return subscribe_to def buildProtocol(self, addr: IAddress) -> RedisSubscriber: @@ -385,7 +367,7 @@ def buildProtocol(self, addr: IAddress) -> RedisSubscriber: p.synapse_handler = self.synapse_handler p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection p.synapse_stream_prefix = self.synapse_stream_prefix - p.synapse_subscribed_channels = self.synapse_subscribed_channels + p.synapse_channel_names = self.synapse_channel_names return p From 33702fadf7dcbfea682366af8a905fc72973124f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 13 May 2022 15:09:15 +0100 Subject: [PATCH 07/12] Restructure the way we subscribe to channels --- 
synapse/replication/tcp/handler.py | 34 ++++++++++++++++++++++++++---- synapse/replication/tcp/redis.py | 12 ----------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 2e03f2c89647..e1cbfa50ebd2 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -1,5 +1,5 @@ # Copyright 2017 Vector Creations Ltd -# Copyright 2020 The Matrix.org Foundation C.I.C. +# Copyright 2020, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,9 @@ def __init__(self, hs: "HomeServer"): self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + # Additional Redis channel suffixes to subscribe to. + self._channels_to_subscribe_to: List[str] = [] + self._is_presence_writer = ( hs.get_instance_name() in hs.config.worker.writers.presence ) @@ -243,6 +246,31 @@ def __init__(self, hs: "HomeServer"): # If we're NOT using Redis, this must be handled by the master self._should_insert_client_ips = hs.get_instance_name() == "master" + if self._is_master or self._should_insert_client_ips: + self.subscribe_to_channel("USER_IP") + + def subscribe_to_channel(self, channel_name: str) -> None: + """ + Indicates that we wish to subscribe to a Redis channel by name. + + (The name will later be prefixed with the server name; i.e. subscribing + to the 'ABC' channel actually subscribes to 'example.com/ABC' Redis-side.) + + Raises: + - If replication has already started, then it's too late to subscribe + to new channels. + """ + + if self._factory is not None: + # We don't allow subscribing after the fact to avoid the chance + # of missing an important message because we didn't subscribe in time. + raise RuntimeError( + "Cannot subscribe to more channels after replication started." 
+ ) + + if channel_name not in self._channels_to_subscribe_to: + self._channels_to_subscribe_to.append(channel_name) + def _add_command_to_stream_queue( self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: @@ -323,9 +351,7 @@ def start_replication(self, hs: "HomeServer") -> None: self._factory = RedisDirectTcpReplicationClientFactory( hs, outbound_redis_connection, - channel_names=RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( - hs.config - ), + channel_names=self._channels_to_subscribe_to, ) hs.get_reactor().connectTCP( hs.config.redis.redis_host, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index c250b40217d8..73294654eff1 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -24,7 +24,6 @@ from twisted.internet.interfaces import IAddress, IConnector from twisted.python.failure import Failure -from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import ( BackgroundProcessLoggingContext, @@ -345,17 +344,6 @@ def __init__( self.synapse_outbound_redis_connection = outbound_redis_connection - @staticmethod - def channels_to_subscribe_to_for_config(config: HomeServerConfig) -> List[str]: - subscribe_to = [] - - if config.worker.run_background_tasks or config.worker.worker_app is None: - # If we're the main process or the background worker, we want to process - # User IP addresses - subscribe_to.append("USER_IP") - - return subscribe_to - def buildProtocol(self, addr: IAddress) -> RedisSubscriber: p = super().buildProtocol(addr) p = cast(RedisSubscriber, p) From 15c08e5fec9b3fe2c0982d3e23a6ac6827fce1db Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 13 May 2022 15:21:17 +0100 Subject: [PATCH 08/12] Migrate tests to new way of doing things --- 
tests/replication/tcp/test_handler.py | 58 +++++++++++++++++ tests/replication/tcp/test_redis.py | 90 --------------------------- 2 files changed, 58 insertions(+), 90 deletions(-) create mode 100644 tests/replication/tcp/test_handler.py diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py new file mode 100644 index 000000000000..760b8c8a8a51 --- /dev/null +++ b/tests/replication/tcp/test_handler.py @@ -0,0 +1,58 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.unittest import HomeserverTestCase, override_config + + +class ChannelsMainTestCase(HomeserverTestCase): + @override_config({"redis": {"enabled": True}}) + def test_subscribed_to_enough_redis_channels(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. + self.assertCountEqual( + self.hs.get_replication_command_handler()._channels_to_subscribe_to, + ["USER_IP"], + ) + + +class ChannelsWorkerTestCase(BaseMultiWorkerStreamTestCase): + def test_background_worker_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. 
+ worker1 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker1", + "run_background_tasks_on": "worker1", + "redis": {"enabled": True}, + }, + ) + self.assertIn( + "USER_IP", + worker1.get_replication_command_handler()._channels_to_subscribe_to, + ) + + def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: + # The default main process is subscribed to USER_IP and all RDATA channels. + worker2 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker2", + "run_background_tasks_on": "worker1", + "redis": {"enabled": True}, + }, + ) + self.assertNotIn( + "USER_IP", + worker2.get_replication_command_handler()._channels_to_subscribe_to, + ) diff --git a/tests/replication/tcp/test_redis.py b/tests/replication/tcp/test_redis.py index 7715d0bd88eb..3a5f22c02235 100644 --- a/tests/replication/tcp/test_redis.py +++ b/tests/replication/tcp/test_redis.py @@ -11,93 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -try: - # We only import it to see if it's installed, so ignore the 'unused' import - import txredisapi # noqa: F401 - - HAVE_TXREDISAPI = True -except ImportError: - HAVE_TXREDISAPI = False - -from tests.replication._base import BaseMultiWorkerStreamTestCase -from tests.unittest import HomeserverTestCase - -ALL_RDATA_CHANNELS = [ - "RDATA/account_data", - "RDATA/backfill", - "RDATA/caches", - "RDATA/device_lists", - "RDATA/events", - "RDATA/federation", - "RDATA/groups", - "RDATA/presence", - "RDATA/presence_federation", - "RDATA/push_rules", - "RDATA/pushers", - "RDATA/receipts", - "RDATA/tag_account_data", - "RDATA/to_device", - "RDATA/typing", - "RDATA/user_signature", -] - - -class RedisTestCase(HomeserverTestCase): - if not HAVE_TXREDISAPI: - skip = "Redis extras not installed" - - def test_subscribed_to_enough_redis_channels(self) -> None: - from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory - - # The default main process is subscribed to USER_IP and all RDATA channels. - self.assertCountEqual( - RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( - self.hs.config - ), - [ - "USER_IP", - ] - + ALL_RDATA_CHANNELS, - ) - - -class RedisWorkerTestCase(BaseMultiWorkerStreamTestCase): - if not HAVE_TXREDISAPI: - skip = "Redis extras not installed" - - def test_background_worker_subscribed_to_user_ip(self) -> None: - from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory - - # The default main process is subscribed to USER_IP and all RDATA channels. 
- worker1 = self.make_worker_hs( - "synapse.app.generic_worker", - extra_config={ - "worker_name": "worker1", - "run_background_tasks_on": "worker1", - }, - ) - self.assertIn( - "USER_IP", - RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( - worker1.config - ), - ) - - def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: - from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory - - # The default main process is subscribed to USER_IP and all RDATA channels. - worker2 = self.make_worker_hs( - "synapse.app.generic_worker", - extra_config={ - "worker_name": "worker2", - "run_background_tasks_on": "worker1", - }, - ) - self.assertNotIn( - "USER_IP", - RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config( - worker2.config - ), - ) From 55c9efe2c3a48613b2e7fa3c3258841f78c5c342 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 17 May 2022 16:36:11 +0100 Subject: [PATCH 09/12] Skip tests if no Redis --- tests/replication/tcp/test_handler.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 760b8c8a8a51..d4bcb3aca819 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -15,8 +15,16 @@ from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.unittest import HomeserverTestCase, override_config +try: + import hiredis +except ImportError: + hiredis = None # type: ignore + class ChannelsMainTestCase(HomeserverTestCase): + if not hiredis: + skip = "Requires hiredis" + @override_config({"redis": {"enabled": True}}) def test_subscribed_to_enough_redis_channels(self) -> None: # The default main process is subscribed to USER_IP and all RDATA channels. 
@@ -27,6 +35,9 @@ def test_subscribed_to_enough_redis_channels(self) -> None: class ChannelsWorkerTestCase(BaseMultiWorkerStreamTestCase): + if not hiredis: + skip = "Requires hiredis" + def test_background_worker_subscribed_to_user_ip(self) -> None: # The default main process is subscribed to USER_IP and all RDATA channels. worker1 = self.make_worker_hs( From 3b0a1d001883d6fbb997b54de558becfed22e951 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 18 May 2022 12:45:10 +0100 Subject: [PATCH 10/12] Remove obsolete file --- tests/replication/tcp/test_redis.py | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 tests/replication/tcp/test_redis.py diff --git a/tests/replication/tcp/test_redis.py b/tests/replication/tcp/test_redis.py deleted file mode 100644 index 3a5f22c02235..000000000000 --- a/tests/replication/tcp/test_redis.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2022 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
From 8f362fb489e2c33a3ca5a6cfb4cebd619cf48467 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 19 May 2022 12:30:21 +0100 Subject: [PATCH 11/12] Fix comments --- tests/replication/tcp/test_handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index d4bcb3aca819..530bb4f9c019 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -27,7 +27,7 @@ class ChannelsMainTestCase(HomeserverTestCase): @override_config({"redis": {"enabled": True}}) def test_subscribed_to_enough_redis_channels(self) -> None: - # The default main process is subscribed to USER_IP and all RDATA channels. + # The default main process is subscribed to the USER_IP channel. self.assertCountEqual( self.hs.get_replication_command_handler()._channels_to_subscribe_to, ["USER_IP"], @@ -39,7 +39,7 @@ class ChannelsWorkerTestCase(BaseMultiWorkerStreamTestCase): skip = "Requires hiredis" def test_background_worker_subscribed_to_user_ip(self) -> None: - # The default main process is subscribed to USER_IP and all RDATA channels. + # The default main process is subscribed to the USER_IP channel. worker1 = self.make_worker_hs( "synapse.app.generic_worker", extra_config={ @@ -54,7 +54,7 @@ def test_background_worker_subscribed_to_user_ip(self) -> None: ) def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: - # The default main process is subscribed to USER_IP and all RDATA channels. + # The default main process is subscribed to the USER_IP channel. 
worker2 = self.make_worker_hs( "synapse.app.generic_worker", extra_config={ From 79983c9ed85dfc2cf0aeec6f6b20bc516dd4cfc8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 19 May 2022 15:50:26 +0100 Subject: [PATCH 12/12] Fix up the Redis tests --- tests/replication/_base.py | 25 ++++++++++++++++++ tests/replication/tcp/test_handler.py | 38 +++++++++++++++------------ 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index bd1684af4c6b..970d5e533b35 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -33,6 +33,7 @@ from tests import unittest from tests.server import FakeTransport +from tests.utils import USE_POSTGRES_FOR_TESTS try: import hiredis @@ -581,3 +582,27 @@ def encode(self, obj): def connectionLost(self, reason): self._server.remove_subscriber(self) + + +class RedisMultiWorkerStreamTestCase(BaseMultiWorkerStreamTestCase): + """ + A test case that enables Redis, providing a fake Redis server. + """ + + if not hiredis: + skip = "Requires hiredis" + + if not USE_POSTGRES_FOR_TESTS: + # Redis replication only takes place on Postgres + skip = "Requires Postgres" + + def default_config(self) -> Dict[str, Any]: + """ + Overrides the default config to enable Redis. + Even if the test only uses make_worker_hs, the main process needs Redis + enabled otherwise it won't create a Fake Redis server to listen on the + Redis port and accept fake TCP connections. + """ + base = super().default_config() + base["redis"] = {"enabled": True} + return base diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 530bb4f9c019..e6a19eafd578 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -12,20 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from tests.replication._base import BaseMultiWorkerStreamTestCase -from tests.unittest import HomeserverTestCase, override_config +from tests.replication._base import RedisMultiWorkerStreamTestCase -try: - import hiredis -except ImportError: - hiredis = None # type: ignore - -class ChannelsMainTestCase(HomeserverTestCase): - if not hiredis: - skip = "Requires hiredis" - - @override_config({"redis": {"enabled": True}}) +class ChannelsTestCase(RedisMultiWorkerStreamTestCase): def test_subscribed_to_enough_redis_channels(self) -> None: # The default main process is subscribed to the USER_IP channel. self.assertCountEqual( @@ -33,11 +23,6 @@ def test_subscribed_to_enough_redis_channels(self) -> None: ["USER_IP"], ) - -class ChannelsWorkerTestCase(BaseMultiWorkerStreamTestCase): - if not hiredis: - skip = "Requires hiredis" - def test_background_worker_subscribed_to_user_ip(self) -> None: # The default main process is subscribed to the USER_IP channel. worker1 = self.make_worker_hs( @@ -53,6 +38,15 @@ def test_background_worker_subscribed_to_user_ip(self) -> None: worker1.get_replication_command_handler()._channels_to_subscribe_to, ) + # Advance so the Redis subscription gets processed + self.pump(0.1) + + # The counts are 2 because both the main process and the worker are subscribed. + self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) + self.assertEqual( + len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 2 + ) + def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: # The default main process is subscribed to the USER_IP channel. worker2 = self.make_worker_hs( @@ -67,3 +61,13 @@ def test_non_background_worker_not_subscribed_to_user_ip(self) -> None: "USER_IP", worker2.get_replication_command_handler()._channels_to_subscribe_to, ) + + # Advance so the Redis subscription gets processed + self.pump(0.1) + + # The count is 2 because both the main process and the worker are subscribed. 
+ self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2) + # For USER_IP, the count is 1 because only the main process is subscribed. + self.assertEqual( + len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1 + )