Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add experimental support for sharding event persister. Again. #8294

Merged
merged 2 commits into from Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8294.feature
@@ -0,0 +1 @@
Add experimental support for sharding event persister.
21 changes: 18 additions & 3 deletions synapse/config/_base.py
Expand Up @@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key.
"""

# If multiple instances are not defined we always return true.
# If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True

return self.get_instance(key) == instance_name

def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.

Note: For things like federation sending the config for which instance
is sending is known only to the sender instance if there is only one.
Therefore `should_handle` should be used where possible.
"""

if not self.instances:
return "master"

if len(self.instances) == 1:
return self.instances[0]

# We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that
# index.
Expand All @@ -846,7 +861,7 @@ def should_handle(self, instance_name: str, key: str) -> bool:
dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
return self.instances[remainder] == instance_name
return self.instances[remainder]


__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
1 change: 1 addition & 0 deletions synapse/config/_base.pyi
Expand Up @@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
def get_instance(self, key: str) -> str: ...
37 changes: 27 additions & 10 deletions synapse/config/workers.py
Expand Up @@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import attr

from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
option expecting a list of strings.
"""

if isinstance(obj, str):
return [obj]
return obj


@attr.s
class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication.
Expand All @@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams.

Attributes:
events: The instance that writes to the event and backfill streams.
events: The instance that writes to the typing stream.
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
"""

events = attr.ib(default="master", type=str)
events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str)


Expand Down Expand Up @@ -105,15 +119,18 @@ def read_config(self, config, **kwargs):
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)

# Check that the configured writer for events and typing also appears in
# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
instance = getattr(self.writers, stream)
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
Expand Down
44 changes: 30 additions & 14 deletions synapse/handlers/federation.py
Expand Up @@ -896,7 +896,8 @@ async def backfill(self, dest, room_id, limit, extremities):
)
)

await self._handle_new_events(dest, ev_infos, backfilled=True)
if ev_infos:
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)

# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -1189,7 +1190,7 @@ async def get_event(event_id: str):
event_infos.append(_NewEventInfo(event, None, auth))

await self._handle_new_events(
destination, event_infos,
destination, room_id, event_infos,
)

def _sanity_check_event(self, ev):
Expand Down Expand Up @@ -1336,15 +1337,15 @@ async def do_invite_join(
)

max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
origin, room_id, auth_chain, state, event, room_version_obj
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.config.worker.writers.events, "events", max_stream_id
self.config.worker.events_shard_config.get_instance(room_id),
"events",
max_stream_id,
)

# Check whether this room is the result of an upgrade of a room we already know
Expand Down Expand Up @@ -1593,7 +1594,7 @@ async def on_invite_request(
)

context = await self.state_handler.compute_event_context(event)
await self.persist_events_and_notify([(event, context)])
await self.persist_events_and_notify(event.room_id, [(event, context)])

return event

Expand All @@ -1620,7 +1621,9 @@ async def do_remotely_reject_invite(
await self.federation_client.send_leave(host_list, event)

context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)])
stream_id = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)

return event, stream_id

Expand Down Expand Up @@ -1868,7 +1871,7 @@ async def _handle_new_event(
)

await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
Expand All @@ -1881,6 +1884,7 @@ async def _handle_new_event(
async def _handle_new_events(
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
Expand Down Expand Up @@ -1912,6 +1916,7 @@ async def prep(ev_info: _NewEventInfo):
)

await self.persist_events_and_notify(
room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
Expand All @@ -1922,6 +1927,7 @@ async def prep(ev_info: _NewEventInfo):
async def _persist_auth_tree(
self,
origin: str,
room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
Expand All @@ -1936,6 +1942,7 @@ async def _persist_auth_tree(

Args:
origin: Where the events came from
room_id,
auth_events
state
event
Expand Down Expand Up @@ -2010,17 +2017,20 @@ async def _persist_auth_tree(
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
]
],
)

new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)

return await self.persist_events_and_notify([(event, new_event_context)])
return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)

async def _prep_event(
self,
Expand Down Expand Up @@ -2871,21 +2881,27 @@ async def _check_key_revocation(self, public_key, url):

async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.

Args:
event_and_contexts:
room_id: The room ID of events being persisted.
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
"""
if self.config.worker.writers.events != self._instance_name:
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
result = await self._send_events(
instance_name=self.config.worker.writers.events,
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
Expand Down
14 changes: 8 additions & 6 deletions synapse/handlers/message.py
Expand Up @@ -376,9 +376,8 @@ def __init__(self, hs: "HomeServer"):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._is_event_writer = (
self.config.worker.writers.events == hs.get_instance_name()
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()

self.room_invite_state_types = self.hs.config.room_invite_state_types

Expand Down Expand Up @@ -902,9 +901,10 @@ async def handle_new_client_event(

try:
# If we're a worker we need to hit out to the master.
if not self._is_event_writer:
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
result = await self.send_event(
instance_name=self.config.worker.writers.events,
instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
Expand Down Expand Up @@ -972,8 +972,10 @@ async def persist_and_notify_client_event(

This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
assert self.storage.persistence is not None
assert self._events_shard_config.should_handle(
self._instance_name, event.room_id
)

if ratelimit:
# We check if this is a room admin redacting an event so that we
Expand Down
14 changes: 9 additions & 5 deletions synapse/handlers/room.py
Expand Up @@ -804,7 +804,9 @@ async def create_room(

# Always wait for room creation to progate before returning
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", last_stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
last_stream_id,
)

return result, last_stream_id
Expand Down Expand Up @@ -1259,10 +1261,10 @@ async def shutdown_room(
# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propogated before
# we try and auto join below.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
"events",
stream_id,
)
else:
new_room_id = None
Expand Down Expand Up @@ -1292,7 +1294,9 @@ async def shutdown_room(

# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
stream_id,
)

await self.room_member_handler.forget(target_requester.user, room_id)
Expand Down
7 changes: 0 additions & 7 deletions synapse/handlers/room_member.py
Expand Up @@ -82,13 +82,6 @@ def __init__(self, hs: "HomeServer"):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles

self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence

self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
Expand Down
12 changes: 9 additions & 3 deletions synapse/replication/http/federation.py
Expand Up @@ -65,10 +65,11 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
async def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
Expand All @@ -88,14 +89,19 @@ async def _serialize_payload(store, event_and_contexts, backfilled):
}
)

payload = {"events": event_payloads, "backfilled": backfilled}
payload = {
"events": event_payloads,
"backfilled": backfilled,
"room_id": room_id,
}

return payload

async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]

event_payloads = content["events"]
Expand All @@ -120,7 +126,7 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
room_id, event_and_contexts, backfilled
)

return 200, {"max_stream_id": max_stream_id}
Expand Down