This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Backfill remote event fetched by MSC3030 so we can paginate from it later #13205

Merged
Changes from 7 commits (of 33 total)

Commits
93bbac7
Backfill remote event fetched by MSC3030 so we can paginate from it…
MadLittleMods Jul 6, 2022
f645864
Working logs
MadLittleMods Jul 6, 2022
6c4b618
Working Complement test by getting new raw event which isn't an outlier
MadLittleMods Jul 6, 2022
2e01535
Add changelog
MadLittleMods Jul 6, 2022
b335d44
Debug logs
MadLittleMods Jul 8, 2022
dab7ad9
More debugging
MadLittleMods Jul 9, 2022
f3072c9
Working with get_pdu
MadLittleMods Jul 11, 2022
3db79a1
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 11, 2022
3f0da1b
Install tabulate back and working
MadLittleMods Jul 11, 2022
b9c936c
Clean up debug logs
MadLittleMods Jul 11, 2022
e76fbc0
More cleanup
MadLittleMods Jul 11, 2022
7d9c20a
Better order and fix lints
MadLittleMods Jul 11, 2022
48ca870
Revert fast complement changes
MadLittleMods Jul 11, 2022
6543bd5
Remove unused persist_events_store
MadLittleMods Jul 11, 2022
8b0dd8c
Better logging
MadLittleMods Jul 11, 2022
ba344b4
Add comment why _process_pulled_events
MadLittleMods Jul 11, 2022
caa0fce
Add docstring
MadLittleMods Jul 11, 2022
05dc230
Fix logic error
MadLittleMods Jul 11, 2022
7a316a5
Fix wrong scope in log
MadLittleMods Jul 15, 2022
682399f
Remove whitespace changes
MadLittleMods Jul 15, 2022
4df2f0c
Use shorthand
MadLittleMods Jul 15, 2022
ce447f0
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 15, 2022
2beeccd
Remove duplicate information from error
MadLittleMods Jul 15, 2022
7f866f4
Log what the remote event is closer than
MadLittleMods Jul 15, 2022
337d8be
Explain why no persisting outliers in _process_pulled_event
MadLittleMods Jul 15, 2022
efaf434
get_pdu returns pristine EventBase
MadLittleMods Jul 15, 2022
344e63e
Fix lints
MadLittleMods Jul 15, 2022
cf5a324
Fix unused ex lint
MadLittleMods Jul 15, 2022
b3743c2
Fix lint
MadLittleMods Jul 15, 2022
b2be2bc
Use timestamp from the event we backfilled instead of trusting the re…
MadLittleMods Jul 16, 2022
7dbc4f7
Merge branch 'develop' into madlittlemods/msc3030-backfill-at-remote-…
MadLittleMods Jul 20, 2022
2d1a84b
Restore whitespace
MadLittleMods Jul 20, 2022
bf4e5d6
Add ideas the comment
MadLittleMods Jul 22, 2022
1 change: 1 addition & 0 deletions changelog.d/13205.feature
@@ -0,0 +1 @@
Allow pagination from remote event after discovering it from MSC3030 `/timestamp_to_event`.
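
For context, a minimal sketch of the client-side flow this changelog entry describes, assuming the unstable MSC3030 endpoint prefix Synapse exposed at the time; the homeserver URL, room ID, and access token below are hypothetical:

```python
import requests

HS = "https://example.org"  # hypothetical homeserver
HEADERS = {"Authorization": "Bearer <access_token>"}
ROOM = "!somewhere:example.org"

# 1. Ask for the event closest to a timestamp (MSC3030).
r = requests.get(
    f"{HS}/_matrix/client/unstable/org.matrix.msc3030/rooms/{ROOM}/timestamp_to_event",
    params={"ts": 1546300800000, "dir": "f"},
    headers=HEADERS,
)
event_id = r.json()["event_id"]

# 2. Fetch pagination tokens around that event.
ctx = requests.get(
    f"{HS}/_matrix/client/v3/rooms/{ROOM}/context/{event_id}",
    params={"limit": 0},
    headers=HEADERS,
)

# 3. Paginate backwards from it. Before this PR, this step could fail when
#    the event had only been discovered over federation, never backfilled.
msgs = requests.get(
    f"{HS}/_matrix/client/v3/rooms/{ROOM}/messages",
    params={"from": ctx.json()["start"], "dir": "b"},
    headers=HEADERS,
)
```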
20 changes: 10 additions & 10 deletions docker/complement/Dockerfile
@@ -8,17 +8,17 @@ ARG SYNAPSE_VERSION=latest
FROM matrixdotorg/synapse-workers:$SYNAPSE_VERSION

# Install postgresql
RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -yqq postgresql-13
# RUN apt-get update && \
# DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -yqq postgresql-13

# Configure a user and create a database for Synapse
RUN pg_ctlcluster 13 main start && su postgres -c "echo \
\"ALTER USER postgres PASSWORD 'somesecret'; \
CREATE DATABASE synapse \
ENCODING 'UTF8' \
LC_COLLATE='C' \
LC_CTYPE='C' \
template=template0;\" | psql" && pg_ctlcluster 13 main stop
# RUN pg_ctlcluster 13 main start && su postgres -c "echo \
# \"ALTER USER postgres PASSWORD 'somesecret'; \
# CREATE DATABASE synapse \
# ENCODING 'UTF8' \
# LC_COLLATE='C' \
# LC_CTYPE='C' \
# template=template0;\" | psql" && pg_ctlcluster 13 main stop

# Extend the shared homeserver config to disable rate-limiting,
# set Complement's static shared secret, enable registration, amongst other
@@ -42,4 +42,4 @@ ENTRYPOINT ["/start_for_complement.sh"]

# Update the healthcheck to have a shorter check interval
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
CMD /bin/sh /healthcheck.sh
CMD /bin/sh /healthcheck.sh
1,369 changes: 143 additions & 1,226 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -157,7 +157,6 @@ packaging = ">=16.1"
# which shipped in Python 3.8. This corresponds to version 1.4 of the backport.
importlib_metadata = { version = ">=1.4", python = "<3.8" }


# Optional Dependencies
# ---------------------
matrix-synapse-ldap3 = { version = ">=0.1", optional = true }
@@ -180,6 +179,7 @@ hiredis = { version = "*", optional = true }
Pympler = { version = "*", optional = true }
parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
tabulate = "^0.8.10"

[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
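
The `tabulate` addition above supports the temporary debug helper used later in this diff (`asdf_get_debug_events_in_room_ordered_by_depth`). A sketch of the kind of table it renders — the helper's body is not shown here, so the row shape is an assumption:

```python
from tabulate import tabulate

def render_debug_event_table(rows):
    # rows: assumed shape, e.g. [(event_id, depth, stream_ordering, outlier), ...]
    return tabulate(
        rows,
        headers=["event_id", "depth", "stream_ordering", "outlier"],
        tablefmt="grid",
    )
```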
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
@@ -102,7 +102,7 @@ if [ -z "$skip_docker_build" ]; then
# Build the unified Complement image (from the worker Synapse image we just built).
echo_if_github "::group::Build Docker image: complement/Dockerfile"
docker build -t complement-synapse \
-f "docker/complement/Dockerfile" "docker/complement"
-f "docker/complement/Dockerfile" "docker/complement"
echo_if_github "::endgroup::"
fi

16 changes: 14 additions & 2 deletions synapse/federation/federation_client.py
@@ -308,8 +308,8 @@ async def get_pdu_from_destination_raw(
destination, event_id, timeout=timeout
)

logger.debug(
"retrieved event id %s from %s: %r",
logger.info(
"get_pdu_raw retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
@@ -319,6 +319,12 @@
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]

logger.info(
"get_pdu_raw pdu_list[0]=%s outlier=%s",
pdu_list[0],
pdu_list[0].internal_metadata.outlier,
)

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]

@@ -361,6 +367,12 @@ async def get_pdu(
# TODO: Rate limit the number of times we try and get the same event.

ev = self._get_pdu_cache.get(event_id)
logger.info(
"get_pdu from cache event_id=%s ev=%s outlier=%s",
event_id,
ev,
ev and ev.internal_metadata.outlier,
)
if ev:
return ev

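
The logging added above is probing the hazard that a later commit in this PR addresses ("get_pdu returns pristine EventBase"): `get_pdu` hands back the very object stored in `_get_pdu_cache`, so a caller that mutates `internal_metadata` (for example clearing the `outlier` flag) would corrupt the cached copy for every other caller. A minimal sketch of the copy-on-return idea, using `make_event_from_dict` just as the handler diff below does:

```python
from synapse.events import EventBase, make_event_from_dict

def pristine_copy(cached_event: EventBase) -> EventBase:
    # Rebuild the event from its PDU JSON so the caller receives a fresh
    # EventBase whose internal_metadata is detached from the cache entry.
    return make_event_from_dict(
        cached_event.get_pdu_json(), cached_event.room_version
    )
```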
137 changes: 123 additions & 14 deletions synapse/handlers/federation_event.py
@@ -54,7 +54,7 @@
check_state_independent_auth_rules,
validate_event_for_room_version,
)
from synapse.events import EventBase
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
@@ -100,6 +100,7 @@ class FederationEventHandler:

def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastores().main
self.persist_events_store = hs.get_datastores().persist_events
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state

@@ -765,20 +766,42 @@ async def _process_pulled_event(
"""
logger.info("Processing pulled event %s", event)

# TODO: Why does this matter? The whole point of this function is to
# persist random PDUs from backfill. It shouldn't matter whether we saw
# them somewhere else first as an outlier, then during backfill. This
# function handles de-outliering anyway.
#
# these should not be outliers.
assert (
not event.internal_metadata.is_outlier()
), "pulled event unexpectedly flagged as outlier"

event_id = event.event_id

logger.info(
"_process_pulled_event: event_id=%s asdf_get_debug_events_in_room_ordered_by_depth\n%s",
event.event_id,
await self.persist_events_store.asdf_get_debug_events_in_room_ordered_by_depth(
event.room_id
),
)

# TODO: Why is `get_event` returning the event as non-outlier when it's
# clearly an outlier still?
existing = await self._store.get_event(
event_id, allow_none=True, allow_rejected=True
)
logger.info(
"_process_pulled_event: event_id=%s existing_outlier=%s",
event_id,
existing and existing.internal_metadata.is_outlier(),
)
if existing:
# TODO: We could comment this out so it properly creates the
# `state_group` in `_update_outliers_txn`
if not existing.internal_metadata.is_outlier():
logger.info(
"Ignoring received event %s which we have already seen",
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
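
For context on the TODOs above: an outlier in Synapse is an event persisted without its surrounding state or a connection to the room DAG. The guard only short-circuits when the stored copy is a full, non-outlier event, so a previously seen outlier can still be de-outliered by this pull. A schematic restatement of that guard (mirroring the calls above, not a new API):

```python
async def already_fully_processed(store, event_id: str) -> bool:
    existing = await store.get_event(
        event_id, allow_none=True, allow_rejected=True
    )
    # Skip only if we hold a full (non-outlier) copy; an outlier still
    # needs processing so it gains state and a stream ordering.
    return existing is not None and not existing.internal_metadata.is_outlier()
```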
@@ -938,21 +961,27 @@ async def _get_state_ids_after_missing_prev_event(
destination, room_id, event_id=event_id
)

logger.debug(
"state_ids returned %i state events, %i auth events",
logger.info(
"_get_state_ids_after_missing_prev_event: state_ids for event_id=%s returned %i state events, %i auth events",
event_id,
len(state_event_ids),
len(auth_event_ids),
)

# Start by checking events we already have in the DB
desired_events = set(state_event_ids)
desired_events.add(event_id)
logger.debug("Fetching %i events from cache/store", len(desired_events))
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%s Fetching %i events from cache/store",
event_id,
len(desired_events),
)
have_events = await self._store.have_seen_events(room_id, desired_events)

missing_desired_events = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%s We are missing %i events (got %i)",
event_id,
len(missing_desired_events),
len(have_events),
)
@@ -969,7 +998,11 @@
missing_auth_events.difference_update(
await self._store.have_seen_events(room_id, missing_auth_events)
)
logger.debug("We are also missing %i auth events", len(missing_auth_events))
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%s We are also missing %i auth events",
event_id,
len(missing_auth_events),
)

missing_events = missing_desired_events | missing_auth_events

@@ -983,10 +1016,17 @@
# TODO: might it be better to have an API which lets us do an aggregate event
# request
if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%s requesting complete state from remote",
event_id,
)
await self._get_state_and_persist(destination, room_id, event_id)
else:
logger.debug("Fetching %i events from remote", len(missing_events))
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%sFetching %i events from remote",
event_id,
len(missing_events),
)
await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=missing_events
)
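
To make the 10x heuristic above concrete: with 400 state events and 100 auth events, missing 50 events gives 50 * 10 = 500 >= 500, so one bulk state request wins; missing 49 gives 490 < 500, so the events are fetched individually instead.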
@@ -996,6 +1036,11 @@
state_map = {}

event_metadata = await self._store.get_metadata_for_events(state_event_ids)
logger.info(
"_get_state_ids_after_missing_prev_event: get_metadata_for_events state_event_ids=%s event_metadata=%s",
state_event_ids,
event_metadata,
)
for state_event_id, metadata in event_metadata.items():
if metadata.room_id != room_id:
# This is a bogus situation, but since we may only discover it a long time
@@ -1005,8 +1050,9 @@
# This can happen if a remote server claims that the state or
# auth_events at an event in room A are actually events in room B
logger.warning(
"Remote server %s claims event %s in room %s is an auth/state "
"_get_state_ids_after_missing_prev_event: event_id=%s Remote server %s claims event %s in room %s is an auth/state "
"event in room %s",
event_id,
destination,
state_event_id,
metadata.room_id,
Expand All @@ -1016,7 +1062,9 @@ async def _get_state_ids_after_missing_prev_event(

if metadata.state_key is None:
logger.warning(
"Remote server gave us non-state event in state: %s", state_event_id
"_get_state_ids_after_missing_prev_event: event_id=%s Remote server gave us non-state event in state: %s",
event_id,
state_event_id,
)
continue

@@ -1036,9 +1084,28 @@
# XXX: this doesn't sound right? it means that we'll end up with incomplete
# state.
failed_to_fetch = desired_events - event_metadata.keys()

logger.info(
"_get_state_ids_after_missing_prev_event: asdf_get_debug_events_in_room_ordered_by_depth\n%s",
await self.persist_events_store.asdf_get_debug_events_in_room_ordered_by_depth(
room_id
),
)

# The event_id is part of the `desired_events` but isn't fetched as part
# of the `event_metadata` so we remove it here separately if we did find it.
have_event_id = await self._store.have_seen_event(room_id, event_id)
logger.info(
"_get_state_ids_after_missing_prev_event: event_id=%s have_event_id=%s",
event_id,
have_event_id,
)
if have_event_id:
failed_to_fetch = failed_to_fetch - {event_id}

if failed_to_fetch:
logger.warning(
"Failed to fetch missing state events for %s %s",
"_get_state_ids_after_missing_prev_event: Failed to fetch missing state events for event_id=%s failed_to_fetch=%s",
event_id,
failed_to_fetch,
)
@@ -1066,7 +1133,13 @@ async def _get_state_and_persist(
)

# we also need the event itself.
if not await self._store.have_seen_event(room_id, event_id):
have_seen_event = await self._store.have_seen_event(room_id, event_id)
logger.info(
"_get_state_and_persist event_id=%s have_seen_event=%s",
event_id,
have_seen_event,
)
if not have_seen_event:
await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=(event_id,)
)
@@ -1315,6 +1388,42 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
marker_event,
)

async def backfill_event(
self, destination: str, room_id: str, event_id: str
) -> None:
room_version = await self._store.get_room_version(room_id)

logger.info("backfill_event event_id=%s", event_id)

eventAsdf = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
# # FIXME: Too sketchy? Yes, because it updates references in place (messes with the cache)
# eventAsdf.internal_metadata.outlier = False
# eventAsdf.internal_metadata.stream_ordering = None
# event = eventAsdf

event = make_event_from_dict(eventAsdf.get_pdu_json(), eventAsdf.room_version)

# event = await self._federation_client.get_pdu_from_destination_raw(
# destination,
# event_id,
# room_version,
# )

logger.info(
"backfill_event event=%s outlier=%s", event, event.internal_metadata.outlier
)

await self._process_pulled_events(
destination,
[event],
# Prevent notifications going to clients
backfilled=True,
)
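
A sketch of how the timestamp-lookup path might drive the new `backfill_event` above. Only `backfill_event` comes from this diff; the surrounding handler and the `find_closest_remote_event` helper are assumptions for illustration:

```python
async def get_event_id_for_timestamp(
    self, destination: str, room_id: str, timestamp: int
) -> str:
    # Ask a remote server which of its events sits closest to the
    # timestamp (the MSC3030 federation lookup; helper name hypothetical).
    event_id = await self.find_closest_remote_event(
        destination, room_id, timestamp
    )
    # Pull that event into the local DAG as a backfilled, non-outlier
    # event so a later /messages pagination from it has edges to walk.
    await self._federation_event_handler.backfill_event(
        destination, room_id, event_id
    )
    return event_id
```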

async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None: