Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Backfill remote event fetched by MSC3030 so we can paginate from it later #13205

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
93bbac7
Backfill remote event fetched by MSC3030 so we can paginate from it…
MadLittleMods Jul 6, 2022
f645864
Working logs
MadLittleMods Jul 6, 2022
6c4b618
Working Complement test by getting new raw event which isn't an outlier
MadLittleMods Jul 6, 2022
2e01535
Add changelog
MadLittleMods Jul 6, 2022
b335d44
Debug logs
MadLittleMods Jul 8, 2022
dab7ad9
More debugging
MadLittleMods Jul 9, 2022
f3072c9
Working with get_pdu
MadLittleMods Jul 11, 2022
3db79a1
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 11, 2022
3f0da1b
Install tabulate back and working
MadLittleMods Jul 11, 2022
b9c936c
Clean up debug logs
MadLittleMods Jul 11, 2022
e76fbc0
More cleanup
MadLittleMods Jul 11, 2022
7d9c20a
Better order and fix lints
MadLittleMods Jul 11, 2022
48ca870
Revert fast complement changes
MadLittleMods Jul 11, 2022
6543bd5
Remove unused persist_events_store
MadLittleMods Jul 11, 2022
8b0dd8c
Better logging
MadLittleMods Jul 11, 2022
ba344b4
Add comment why _process_pulled_events
MadLittleMods Jul 11, 2022
caa0fce
Add docstring
MadLittleMods Jul 11, 2022
05dc230
Fix logic error
MadLittleMods Jul 11, 2022
7a316a5
Fix wrong scope in log
MadLittleMods Jul 15, 2022
682399f
Remove whitespace changes
MadLittleMods Jul 15, 2022
4df2f0c
Use shorthand
MadLittleMods Jul 15, 2022
ce447f0
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 15, 2022
2beeccd
Remove duplicate information from error
MadLittleMods Jul 15, 2022
7f866f4
Log what the remote event is closer than
MadLittleMods Jul 15, 2022
337d8be
Explain why no persisting outliers in _process_pulled_event
MadLittleMods Jul 15, 2022
efaf434
get_pdu returns pristine EventBase
MadLittleMods Jul 15, 2022
344e63e
Fix lints
MadLittleMods Jul 15, 2022
cf5a324
Fix unused ex lint
MadLittleMods Jul 15, 2022
b3743c2
Fix lint
MadLittleMods Jul 15, 2022
b2be2bc
Use timestamp from the event we backfilled instead of trusting the re…
MadLittleMods Jul 16, 2022
7dbc4f7
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 20, 2022
2d1a84b
Restore whitespace
MadLittleMods Jul 20, 2022
bf4e5d6
Add ideas the comment
MadLittleMods Jul 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13205.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow pagination from remote event after discovering it from MSC3030 `/timestamp_to_event`.
43 changes: 37 additions & 6 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
RoomVersion,
RoomVersions,
)
from synapse.events import EventBase, builder
from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
Expand Down Expand Up @@ -309,7 +309,7 @@ async def get_pdu_from_destination_raw(
)

logger.debug(
"retrieved event id %s from %s: %r",
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
Expand Down Expand Up @@ -360,9 +360,25 @@ async def get_pdu(

# TODO: Rate limit the number of times we try and get the same event.

ev = self._get_pdu_cache.get(event_id)
if ev:
return ev
event_from_cache = self._get_pdu_cache.get(event_id)
if event_from_cache:
assert not event_from_cache.internal_metadata.outlier, (
"Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. "
"We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache."
)

# Make sure to return a copy because downstream callers will use
# this event reference directly and change our original, pristine,
# untouched PDU. For example when people mark the event as an
# `outlier` (`event.internal_metadata.outlier = true`), we don't
# want that to propagate back into the cache.
event_copy = make_event_from_dict(
event_from_cache.get_pdu_json(),
event_from_cache.room_version,
internal_metadata_dict=None,
)

return event_copy

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

Expand Down Expand Up @@ -405,7 +421,22 @@ async def get_pdu(
if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu

return signed_pdu
# Make sure to return a copy because downstream callers will use this
# event reference directly and change our original, pristine, untouched
# PDU. For example when people mark the event as an `outlier`
# (`event.internal_metadata.outlier = true`), we don't want that to
# propagate back into the cache.
#
# We could get away with only making a new copy of the event when
# pulling from cache but it's probably better to have good hygiene and
# not dirty the cache in the first place as well.
event_copy = make_event_from_dict(
signed_pdu.get_pdu_json(),
signed_pdu.room_version,
internal_metadata_dict=None,
)

return event_copy

async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
Expand Down
69 changes: 64 additions & 5 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,22 @@ async def _process_pulled_event(
"""
logger.info("Processing pulled event %s", event)

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# these should not be outliers.
assert (
not event.internal_metadata.is_outlier()
), "pulled event unexpectedly flagged as outlier"
# This function should only be used to backfill events. If you're trying
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# to persist an outlier, use another method. If you happen to run into a
# situation where the event you're trying to backfill is marked as an
# `outlier`, then you should update that spot to return an `EventBase`
# without the `outlier` flag set.
#
# `EventBase` is used to represent both an event we have not yet
# persisted, and one that we have persisted and now keep in the cache.
# In an ideal world this method would only be called with the first type
# of event, but it turns out that's not actually the case and for
# example, you could get an event from cache that is marked as an
# `outlier` (fix up that spot though).
assert not event.internal_metadata.is_outlier(), (
"This is a safe-guard to make sure you're not trying to persist an outlier using this function (use something else). "
"If you're trying to backfill an event, this is the right method but you need pass in an event copy that doesn't have `event.internal_metada.outlier = true`."
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)

event_id = event.event_id

Expand All @@ -779,7 +791,7 @@ async def _process_pulled_event(
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"Ignoring received event %s which we have already seen",
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
Expand Down Expand Up @@ -1312,6 +1324,53 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
marker_event,
)

async def backfill_event_id(
self, destination: str, room_id: str, event_id: str
) -> EventBase:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.

Args:
destination: The homeserver to pull the given event_id from.
room_id: The room where the event is from.
event_id: The event ID to backfill.

Raises:
FederationError if we are unable to find the event from the destination
"""
logger.info(
"backfill_event_id: event_id=%s from destination=%s", event_id, destination
)

room_version = await self._store.get_room_version(room_id)

event_from_response = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)

if not event_from_response:
raise FederationError(
"ERROR",
404,
"Unable to find event_id=%s from destination=%s to backfill."
% (event_id, destination),
affected=event_id,
)

# Persist the event we just fetched, including pulling all of the state
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
await self._process_pulled_events(
destination,
[event_from_response],
# Prevent notifications going to clients
backfilled=True,
)

return event_from_response

async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
Expand Down
56 changes: 42 additions & 14 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.state_handler = hs.get_state_handler()
self.federation_client = hs.get_federation_client()
self.federation_event_handler = hs.get_federation_event_handler()
self._storage_controllers = hs.get_storage_controllers()

async def get_event_for_timestamp(
Expand Down Expand Up @@ -1479,38 +1480,65 @@ async def get_event_for_timestamp(
remote_response,
)

# TODO: Do we want to persist this as an extremity?
# TODO: I think ideally, we would try to backfill from
# this event and run this whole
# `get_event_for_timestamp` function again to make sure
# they didn't give us an event from their gappy history.
remote_event_id = remote_response.event_id
origin_server_ts = remote_response.origin_server_ts
remote_origin_server_ts = remote_response.origin_server_ts
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# Backfill this event so we can get a pagination token for
# it with `/context` and paginate `/messages` from this
# point.
#
# FIXME: After this backfill, we might want to run this
# whole `get_event_for_timestamp` function again to make
# sure they didn't give us an event from their gappy
# history. Also need a heuristic for when to stop recursing
# if they keep giving us gappy results.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
remote_event = (
await self.federation_event_handler.backfill_event_id(
domain, room_id, remote_event_id
)
)

# XXX: When we see that the remote server is not trustworthy,
# maybe we should not ask them first in the future.
if remote_origin_server_ts != remote_event.origin_server_ts:
logger.info(
"get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
domain,
remote_event_id,
remote_origin_server_ts,
remote_event.origin_server_ts,
)

# Only return the remote event if it's closer than the local event
if not local_event or (
abs(origin_server_ts - timestamp)
abs(remote_event.origin_server_ts - timestamp)
< abs(local_event.origin_server_ts - timestamp)
):
return remote_event_id, origin_server_ts
logger.info(
"get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
remote_event_id,
remote_event.origin_server_ts,
timestamp,
local_event.event_id if local_event else None,
local_event.origin_server_ts if local_event else None,
)
return remote_event_id, remote_origin_server_ts
except (HttpResponseException, InvalidResponseError) as ex:
# Let's not put a high priority on some other homeserver
# failing to respond or giving a random response
logger.debug(
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
"get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
domain,
type(ex).__name__,
ex,
ex.args,
)
except Exception as ex:
except Exception:
# But we do want to see some exceptions in our code
logger.warning(
"Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
"get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
domain,
type(ex).__name__,
ex,
ex.args,
exc_info=True,
)

# To appease mypy, we have to add both of these conditions to check for
Expand Down
23 changes: 20 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,9 +1346,24 @@ def _update_outliers_txn(
event_id: outlier for event_id, outlier in txn
}

logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
[ev.event_id for ev, _ in events_and_contexts],
have_persisted,
)

to_remove = set()
for event, context in events_and_contexts:
if event.event_id not in have_persisted:
outlier_persisted = have_persisted.get(event.event_id)
logger.debug(
"_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
event.event_id,
event.internal_metadata.is_outlier(),
outlier_persisted,
)

# Ignore events which we haven't persisted at all
if outlier_persisted is None:
continue

to_remove.add(event)
Expand All @@ -1358,7 +1373,6 @@ def _update_outliers_txn(
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that event
Expand All @@ -1369,7 +1383,10 @@ def _update_outliers_txn(
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
event.event_id,
)

# insert into event_to_state_groups.
try:
Expand Down