Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Ensure fed-sender catchup does not block for full state #15248

Merged
merged 5 commits into from Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15248.bugfix
@@ -0,0 +1 @@
Fix a rare bug introduced in Synapse 1.73 where events could remain unsent to other homeservers after a faster-join to a room.
9 changes: 7 additions & 2 deletions synapse/federation/sender/per_destination_queue.py
Expand Up @@ -497,8 +497,8 @@ async def _catch_up_transmission_loop(self) -> None:
#
# Note: `catchup_pdus` will have exactly one PDU per room.
for pdu in catchup_pdus:
# The PDU from the DB will be the last PDU in the room from
# *this server* that wasn't sent to the remote. However, other
# The PDU from the DB will be the newest PDU in the room from
# *this server* that we tried---but were unable---to send to the remote.
# servers may have sent lots of events since then, and we want
# to try and tell the remote only about the *latest* events in
# the room. This is so that it doesn't get inundated by events
Expand All @@ -516,6 +516,11 @@ async def _catch_up_transmission_loop(self) -> None:
# If the event is in the extremities, then great! We can just
# use that without having to do further checks.
room_catchup_pdus = [pdu]
elif await self._store.is_partial_state_room(pdu.room_id):
# We can't be sure which events the destination should
# see using only partial state. Avoid doing so, and just retry
# sending our the newest PDU the remote is missing from us.
room_catchup_pdus = [pdu]
else:
# If not, fetch the extremities and figure out which we can
# send.
Expand Down
87 changes: 86 additions & 1 deletion tests/federation/test_federation_catch_up.py
@@ -1,4 +1,5 @@
from typing import Callable, List, Optional, Tuple
from typing import Callable, Collection, List, Optional, Tuple
from unittest import mock
from unittest.mock import Mock

from twisted.test.proto_helpers import MemoryReactor
Expand Down Expand Up @@ -500,3 +501,87 @@ def test_not_latest_event(self) -> None:
self.assertEqual(len(sent_pdus), 1)
self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
self.assertFalse(per_dest_queue._catching_up)

def test_catch_up_is_not_blocked_by_remote_event_in_partial_state_room(
self,
) -> None:
"""Detects (part of?) https://github.com/matrix-org/synapse/issues/15220."""
# ARRANGE:
# - a local user (u1)
# - a room which contains u1 and two remote users, @u2:host2 and @u3:other
# - events in that room such that
# - history visibility is restricted
# - u1 sent message events e1 and e2
# - afterwards, u3 sent a remote event e3
# - catchup to begin for host2; last successfully sent event was e1
per_dest_queue, sent_pdus = self.make_fake_destination_queue()

self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)
self.helper.send_state(
room_id=room,
event_type="m.room.history_visibility",
body={"history_visibility": "joined"},
tok=u1_token,
)
self.get_success(
event_injection.inject_member_event(self.hs, room, "@u2:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
)

# create some events
event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"]
event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"]
# pretend that u3 changes their displayname
event_id_3 = self.get_success(
event_injection.inject_member_event(self.hs, room, "@u3:other", "join")
).event_id

# destination_rooms should already be populated, but let us pretend that we already
# sent (successfully) up to and including event id 1
event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1))
assert event_1.internal_metadata.stream_ordering is not None
self.get_success(
self.hs.get_datastores().main.set_destination_last_successful_stream_ordering(
"host2", event_1.internal_metadata.stream_ordering
)
)

# also fetch event 2 so we can compare its stream ordering to the sender's
# last_successful_stream_ordering later
event_2 = self.get_success(self.hs.get_datastores().main.get_event(event_id_2))

# Mock event 3 as having partial state
self.get_success(
event_injection.mark_event_as_partial_state(self.hs, event_id_3, room)
)

# Fail the test if we block on full state for event 3.
async def mock_await_full_state(event_ids: Collection[str]) -> None:
if event_id_3 in event_ids:
raise AssertionError("Tried to await full state for event_id_3")

# ACT
with mock.patch.object(
self.hs.get_storage_controllers().state._partial_state_events_tracker,
"await_full_state",
mock_await_full_state,
):
self.get_success(per_dest_queue._catch_up_transmission_loop())

# ASSERT
# We should have:
# - not sent event 3: it's not ours, and the room is partial stated
# - fallen back to sending event 2: it's the most recent event in the room
# we tried to send to host2
# - completed catch-up
self.assertEqual(len(sent_pdus), 1)
self.assertEqual(sent_pdus[0].event_id, event_id_2)
self.assertFalse(per_dest_queue._catching_up)
self.assertEqual(
per_dest_queue._last_successful_stream_ordering,
event_2.internal_metadata.stream_ordering,
)
31 changes: 31 additions & 0 deletions tests/test_utils/event_injection.py
Expand Up @@ -102,3 +102,34 @@ async def create_event(
context = await unpersisted_context.persist(event)

return event, context


async def mark_event_as_partial_state(
hs: synapse.server.HomeServer,
event_id: str,
room_id: str,
) -> None:
"""
(Falsely) mark an event as having partial state.

Naughty, but occasionally useful when checking that partial state doesn't
block something from happening.

If the event already has partial state, this insert will fail (event_id is unique
in this table).
"""
store = hs.get_datastores().main
await store.db_pool.simple_upsert(
table="partial_state_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"room_id": room_id},
)

await store.db_pool.simple_insert(
table="partial_state_events",
values={
"room_id": room_id,
"event_id": event_id,
},
)