Merge commit '0d4f614fd' into anoa/dinsic_release_1_21_x
* commit '0d4f614fd':
  Refactor `_get_e2e_device_keys_for_federation_query_txn` (#8225)
  Add experimental support for sharding event persister. (#8170)
  Add /user/{user_id}/shared_rooms/ api (#7785)
  Do not try to store invalid data in the stats table (#8226)
  Convert the main methods run by the reactor to async. (#8213)
anoadragon453 committed Oct 20, 2020
2 parents 2bd022e + 0d4f614 commit 56722c6
Showing 34 changed files with 531 additions and 121 deletions.
13 changes: 13 additions & 0 deletions UPGRADE.rst
@@ -1,3 +1,16 @@
+Upgrading to v1.20.0
+====================
+
+Shared rooms endpoint (MSC2666)
+-------------------------------
+
+This release contains a new unstable endpoint ``/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/.*``
+for fetching rooms one user has in common with another. This feature requires the
+``update_user_directory`` config flag to be ``True``. If you are using a ``synapse.app.user_dir``
+worker, requests to this endpoint must be handled by that worker.
+See `docs/workers.md <docs/workers.md>`_ for more details.
+
+
 Upgrading Synapse
 =================
 
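For illustration, here is a minimal sketch of querying the new endpoint with the Python requests library; the homeserver URL, user ID and access token are placeholders, and the {"joined": [...]} response shape follows MSC2666:

import requests

# Placeholders: substitute your homeserver, an access token and the other user's ID.
resp = requests.get(
    "https://homeserver.example.org/_matrix/client/unstable"
    "/uk.half-shot.msc2666/user/shared_rooms/@other:example.org",
    headers={"Authorization": "Bearer <access_token>"},
)
resp.raise_for_status()
# Per MSC2666, the body lists the rooms you share with the requested user,
# e.g. {"joined": ["!abc123:example.org"]}.
print(resp.json())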
1 change: 1 addition & 0 deletions changelog.d/7785.feature
@@ -0,0 +1 @@
+Add an endpoint to query your shared rooms with another user as an implementation of [MSC2666](https://github.com/matrix-org/matrix-doc/pull/2666).
1 change: 1 addition & 0 deletions changelog.d/8170.feature
@@ -0,0 +1 @@
+Add experimental support for sharding event persister.
1 change: 1 addition & 0 deletions changelog.d/8213.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
1 change: 1 addition & 0 deletions changelog.d/8225.misc
@@ -0,0 +1 @@
+Refactor queries for device keys and cross-signatures.
1 change: 1 addition & 0 deletions changelog.d/8226.bugfix
@@ -0,0 +1 @@
+Fix a longstanding bug where stats updates could break when unexpected profile data was included in events.
1 change: 1 addition & 0 deletions docs/workers.md
@@ -380,6 +380,7 @@ Handles searches in the user directory. It can handle REST endpoints matching
 the following regular expressions:
 
     ^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$
+    ^/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/.*$
 
 When using this worker you must also set `update_user_directory: False` in the
 shared configuration file to stop the main synapse running background
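As a quick sanity check, the worker's new pattern matches shared-rooms requests (a standalone sketch; the request path is hypothetical):

import re

pattern = re.compile(r"^/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/.*$")
path = "/_matrix/client/unstable/uk.half-shot.msc2666/user/shared_rooms/@other:example.org"
print(bool(pattern.match(path)))  # True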
20 changes: 9 additions & 11 deletions synapse/app/admin_cmd.py
@@ -79,8 +79,7 @@ def start_listening(self, listeners):
         pass
 
 
-@defer.inlineCallbacks
-def export_data_command(hs, args):
+async def export_data_command(hs, args):
     """Export data for a user.
 
     Args:
@@ -91,10 +90,8 @@ def export_data_command(hs, args):
     user_id = args.user_id
     directory = args.output_directory
 
-    res = yield defer.ensureDeferred(
-        hs.get_handlers().admin_handler.export_user_data(
-            user_id, FileExfiltrationWriter(user_id, directory=directory)
-        )
-    )
+    res = await hs.get_handlers().admin_handler.export_user_data(
+        user_id, FileExfiltrationWriter(user_id, directory=directory)
+    )
     print(res)
 
@@ -232,14 +229,15 @@ def start(config_options):
     # We also make sure that `_base.start` gets run before we actually run the
     # command.
 
-    @defer.inlineCallbacks
-    def run(_reactor):
+    async def run():
         with LoggingContext("command"):
-            yield _base.start(ss, [])
-            yield args.func(ss, args)
+            _base.start(ss, [])
+            await args.func(ss, args)
 
     _base.start_worker_reactor(
-        "synapse-admin-cmd", config, run_command=lambda: task.react(run)
+        "synapse-admin-cmd",
+        config,
+        run_command=lambda: task.react(lambda _reactor: defer.ensureDeferred(run())),
     )
 
 
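The new run_command wiring bridges a coroutine into Twisted's task.react, which expects a function that takes the reactor and returns a Deferred. A self-contained sketch of the same pattern:

from twisted.internet import defer, task

async def run():
    # Asynchronous admin work would happen here.
    print("command ran")

# task.react starts the reactor, calls the given function with it, and
# shuts down once the returned Deferred fires; defer.ensureDeferred
# adapts the coroutine into that Deferred.
task.react(lambda _reactor: defer.ensureDeferred(run()))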
18 changes: 8 additions & 10 deletions synapse/app/homeserver.py
@@ -411,26 +411,24 @@ async def do_acme() -> bool:
 
         return provision
 
-    @defer.inlineCallbacks
-    def reprovision_acme():
+    async def reprovision_acme():
         """
         Provision a certificate from ACME, if required, and reload the TLS
         certificate if it's renewed.
         """
-        reprovisioned = yield defer.ensureDeferred(do_acme())
+        reprovisioned = await do_acme()
         if reprovisioned:
             _base.refresh_certificate(hs)
 
-    @defer.inlineCallbacks
-    def start():
+    async def start():
         try:
             # Run the ACME provisioning code, if it's enabled.
             if hs.config.acme_enabled:
                 acme = hs.get_acme_handler()
                 # Start up the webservices which we will respond to ACME
                 # challenges with, and then provision.
-                yield defer.ensureDeferred(acme.start_listening())
-                yield defer.ensureDeferred(do_acme())
+                await acme.start_listening()
+                await do_acme()
 
                 # Check if it needs to be reprovisioned every day.
                 hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
@@ -439,8 +437,8 @@ def start():
             if hs.config.oidc_enabled:
                 oidc = hs.get_oidc_handler()
                 # Loading the provider metadata also ensures the provider config is valid.
-                yield defer.ensureDeferred(oidc.load_metadata())
-                yield defer.ensureDeferred(oidc.load_jwks())
+                await oidc.load_metadata()
+                await oidc.load_jwks()
 
             _base.start(hs, config.listeners)
 
@@ -456,7 +454,7 @@ def start():
             reactor.stop()
             sys.exit(1)
 
-    reactor.callWhenRunning(start)
+    reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
 
     return hs
 
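reactor.callWhenRunning takes a plain callable rather than a coroutine, hence the lambda wrapper above. A minimal standalone sketch of that pattern:

from twisted.internet import defer, reactor

async def start():
    # Startup work (ACME provisioning, OIDC metadata, listeners) would go here.
    print("started")
    reactor.stop()

# Passing `start` directly would never schedule the coroutine;
# it has to be wrapped into a Deferred at call time.
reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
reactor.run()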
21 changes: 18 additions & 3 deletions synapse/config/_base.py
@@ -833,11 +833,26 @@ class ShardedWorkerHandlingConfig:
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key.
         """
-
-        # If multiple instances are not defined we always return true.
+        # If multiple instances are not defined we always return true
         if not self.instances or len(self.instances) == 1:
             return True
 
+        return self.get_instance(key) == instance_name
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key.
+
+        Note: For things like federation sending the config for which instance
+        is sending is known only to the sender instance if there is only one.
+        Therefore `should_handle` should be used where possible.
+        """
+
+        if not self.instances:
+            return "master"
+
+        if len(self.instances) == 1:
+            return self.instances[0]
+
         # We shard by taking the hash, modulo it by the number of instances and
         # then checking whether this instance matches the instance at that
         # index.
@@ -847,7 +862,7 @@ def should_handle(self, instance_name: str, key: str) -> bool:
         dest_hash = sha256(key.encode("utf8")).digest()
         dest_int = int.from_bytes(dest_hash, byteorder="little")
         remainder = dest_int % (len(self.instances))
-        return self.instances[remainder] == instance_name
+        return self.instances[remainder]
 
 
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
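The sharding rule is deterministic: every instance that hashes the same key picks the same writer, so should_handle reduces to comparing get_instance's answer with the local instance name. A standalone sketch (instance names are hypothetical):

from hashlib import sha256

def get_instance(instances, key):
    if not instances:
        return "master"
    if len(instances) == 1:
        return instances[0]
    # Hash the key and reduce it modulo the number of instances; the slight
    # bias this introduces is negligible for a sha256-sized hash.
    dest_hash = sha256(key.encode("utf8")).digest()
    dest_int = int.from_bytes(dest_hash, byteorder="little")
    return instances[dest_int % len(instances)]

writers = ["event_persister1", "event_persister2"]
key = "!someroom:example.org"
# should_handle(instance_name, key) is then simply:
print(get_instance(writers, key) == "event_persister1")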
1 change: 1 addition & 0 deletions synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+    def get_instance(self, key: str) -> str: ...
37 changes: 27 additions & 10 deletions synapse/config/workers.py
@@ -13,12 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Union
+
 import attr
 
 from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
 from .server import ListenerConfig, parse_listener_def
 
 
+def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
+    """Helper for allowing parsing a string or list of strings to a config
+    option expecting a list of strings.
+    """
+
+    if isinstance(obj, str):
+        return [obj]
+    return obj
+
+
 @attr.s
 class InstanceLocationConfig:
     """The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
     """Specifies the instances that write various streams.
 
     Attributes:
-        events: The instance that writes to the event and backfill streams.
-        events: The instance that writes to the typing stream.
+        events: The instances that write to the event and backfill streams.
+        typing: The instance that writes to the typing stream.
     """
 
-    events = attr.ib(default="master", type=str)
+    events = attr.ib(
+        default=["master"], type=List[str], converter=_instance_to_list_converter
+    )
     typing = attr.ib(default="master", type=str)
 
 
@@ -105,15 +119,18 @@ def read_config(self, config, **kwargs):
         writers = config.get("stream_writers") or {}
         self.writers = WriterLocations(**writers)
 
-        # Check that the configured writer for events and typing also appears in
+        # Check that the configured writers for events and typing also appears in
         # `instance_map`.
        for stream in ("events", "typing"):
-            instance = getattr(self.writers, stream)
-            if instance != "master" and instance not in self.instance_map:
-                raise ConfigError(
-                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
-                    % (instance, stream)
-                )
+            instances = _instance_to_list_converter(getattr(self.writers, stream))
+            for instance in instances:
+                if instance != "master" and instance not in self.instance_map:
+                    raise ConfigError(
+                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                        % (instance, stream)
+                    )
+
+        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
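The converter keeps old single-writer configs valid while accepting a list of writers. A self-contained sketch of the attrs pattern (class and field names abbreviated):

from typing import List, Union

import attr

def _to_list(obj: Union[str, List[str]]) -> List[str]:
    # Normalise a bare instance name to a one-element list.
    if isinstance(obj, str):
        return [obj]
    return obj

@attr.s
class Writers:
    events = attr.ib(default=["master"], type=List[str], converter=_to_list)

print(Writers(events="master").events)        # ['master']
print(Writers(events=["ep1", "ep2"]).events)  # ['ep1', 'ep2']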
44 changes: 30 additions & 14 deletions synapse/handlers/federation.py
@@ -926,7 +926,8 @@ async def backfill(self, dest, room_id, limit, extremities):
                 )
             )
 
-        await self._handle_new_events(dest, ev_infos, backfilled=True)
+        if ev_infos:
+            await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1219,7 +1220,7 @@ async def get_event(event_id: str):
             event_infos.append(_NewEventInfo(event, None, auth))
 
         await self._handle_new_events(
-            destination, event_infos,
+            destination, room_id, event_infos,
         )
 
     def _sanity_check_event(self, ev):
@@ -1366,15 +1367,15 @@ async def do_invite_join(
             )
 
             max_stream_id = await self._persist_auth_tree(
-                origin, auth_chain, state, event, room_version_obj
+                origin, room_id, auth_chain, state, event, room_version_obj
             )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
-            #
-            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.config.worker.writers.events, "events", max_stream_id
+                self.config.worker.events_shard_config.get_instance(room_id),
+                "events",
+                max_stream_id,
             )
 
             # Check whether this room is the result of an upgrade of a room we already know
@@ -1635,7 +1636,7 @@ async def on_invite_request(
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify([(event, context)])
+        await self.persist_events_and_notify(event.room_id, [(event, context)])
 
         return event
 
@@ -1662,7 +1663,9 @@ async def do_remotely_reject_invite(
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify([(event, context)])
+        stream_id = await self.persist_events_and_notify(
+            event.room_id, [(event, context)]
+        )
 
         return event, stream_id
 
@@ -1910,7 +1913,7 @@ async def _handle_new_event(
             )
 
             await self.persist_events_and_notify(
-                [(event, context)], backfilled=backfilled
+                event.room_id, [(event, context)], backfilled=backfilled
             )
         except Exception:
             run_in_background(
@@ -1923,6 +1926,7 @@ async def _handle_new_event(
     async def _handle_new_events(
         self,
         origin: str,
+        room_id: str,
         event_infos: Iterable[_NewEventInfo],
         backfilled: bool = False,
     ) -> None:
@@ -1954,6 +1958,7 @@ async def prep(ev_info: _NewEventInfo):
         )
 
         await self.persist_events_and_notify(
+            room_id,
             [
                 (ev_info.event, context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1964,6 +1969,7 @@
     async def _persist_auth_tree(
         self,
         origin: str,
+        room_id: str,
         auth_events: List[EventBase],
         state: List[EventBase],
         event: EventBase,
@@ -1978,6 +1984,7 @@ async def _persist_auth_tree(
         Args:
             origin: Where the events came from
+            room_id,
             auth_events
             state
             event
@@ -2052,17 +2059,20 @@ async def _persist_auth_tree(
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         await self.persist_events_and_notify(
+            room_id,
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
-            ]
+            ],
         )
 
         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify([(event, new_event_context)])
+        return await self.persist_events_and_notify(
+            room_id, [(event, new_event_context)]
+        )
 
     async def _prep_event(
         self,
@@ -2913,21 +2923,27 @@ async def _check_key_revocation(self, public_key, url):
 
     async def persist_events_and_notify(
         self,
+        room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
     ) -> int:
         """Persists events and tells the notifier/pushers about them, if
         necessary.
 
         Args:
-            event_and_contexts:
+            room_id: The room ID of events being persisted.
+            event_and_contexts: Sequence of events with their associated
+                context that should be persisted. All events must belong to
+                the same room.
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        if self.config.worker.writers.events != self._instance_name:
+        instance = self.config.worker.events_shard_config.get_instance(room_id)
+        if instance != self._instance_name:
             result = await self._send_events(
-                instance_name=self.config.worker.writers.events,
+                instance_name=instance,
                 store=self.store,
+                room_id=room_id,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
             )
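Taken together, these changes make the room ID the routing key for event persistence: callers pass it down, and persist_events_and_notify either writes locally or forwards the events over replication to whichever writer the shard config names. A standalone sketch of that decision (helper and instance names are hypothetical):

from hashlib import sha256

def get_instance(instances, room_id):
    # Same hashing rule as ShardedWorkerHandlingConfig.get_instance above.
    dest = int.from_bytes(sha256(room_id.encode("utf8")).digest(), "little")
    return instances[dest % len(instances)]

my_instance_name = "event_persister1"
writers = ["event_persister1", "event_persister2"]

instance = get_instance(writers, "!someroom:example.org")
if instance == my_instance_name:
    print("persist events locally")
else:
    print("forward to", instance, "over replication")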
(Diffs for the remaining 21 changed files are not shown.)
