This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup #8322

Merged
merged 20 commits on Sep 18, 2020
Changes from 9 commits
1 change: 1 addition & 0 deletions changelog.d/8230.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8230.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8247.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8247.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8258.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8258.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8322.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
49 changes: 49 additions & 0 deletions synapse/federation/sender/__init__.py
@@ -55,6 +55,15 @@
"Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC = 15

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination that has outstanding catch-up.
CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC = 5
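Taken together, the two constants above imply a simple wake schedule: the i-th destination with outstanding catch-up is woken roughly DELAY + i * INTERVAL seconds after startup, ignoring query and processing latency. A throwaway sketch of that arithmetic; the helper name is invented for illustration:

```python
# Mirrors the constants defined in the diff above.
CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC = 15
CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC = 5


def approximate_wake_time_sec(index: int) -> int:
    """Seconds after startup at which the index-th (0-based) destination
    needing catch-up is woken, ignoring query and processing latency."""
    return (
        CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC
        + index * CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC
    )
```

So the first destination is woken about 15 seconds after startup, and each subsequent one about 5 seconds later.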


class FederationSender:
    def __init__(self, hs: "synapse.server.HomeServer"):
@@ -125,6 +134,14 @@ def __init__(self, hs: "synapse.server.HomeServer"):
            1000.0 / hs.config.federation_rr_transactions_per_room_per_second
        )

        # wake up destinations that have outstanding PDUs to be caught up
        self._catchup_after_startup_timer = self.clock.call_later(
            CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC,
            run_as_background_process,
            "wake_destinations_needing_catchup",
            self._wake_destinations_needing_catchup,
        )

    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
        """Get or create a PerDestinationQueue for the given destination

@@ -560,3 +577,35 @@ async def get_replication_rows(
        # Dummy implementation for case where federation sender isn't offloaded
        # to a worker.
        return [], 0, False

    async def _wake_destinations_needing_catchup(self):
        """
        Wakes up destinations that need catch-up and are not currently being
        backed off from.

        Does so in a slow way (one every 5 seconds) to reduce load spikes.
        """

        last_processed = None  # type: Optional[str]

        while True:
            destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
                last_processed
            )

            if not destinations_to_wake:
                # finished waking all destinations!
                break

            for destination in destinations_to_wake:
                if self._federation_shard_config.should_handle(
                    self._instance_name, destination
                ):
Contributor

This logic seems to be duplicated from wake_destination. What was the goal here? Is it just to avoid the logging?

If there are multiple federation senders, then they'll each run this code, but only handle the destinations they're interested in? It might be clearer to pre-filter the list, like:

domains = [
    d
    for d in domains_set
    if d != self.server_name
    and self._federation_shard_config.should_handle(self._instance_name, d)
]

Contributor Author

Yes, just to avoid the logging (because it'd be very confusing to have a federation sender claim it was waking a destination that it didn't) [and we also avoid the sleep].

Pre-filtering: no strong opinions there, fine by me — it's an extra pass and list allocation but only a small list so should be OK
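For what it's worth, the pre-filtering being discussed can be exercised in isolation. In this sketch, `FakeShardConfig` and `prefilter_destinations` are invented stand-ins, and crc32 is just an arbitrary stable assignment of destinations to instances, not Synapse's actual sharding scheme:

```python
import zlib
from typing import List


class FakeShardConfig:
    """Invented stand-in for Synapse's federation shard config."""

    def __init__(self, instances: List[str]):
        self.instances = instances

    def should_handle(self, instance_name: str, destination: str) -> bool:
        # Deterministically assign each destination to exactly one instance.
        index = zlib.crc32(destination.encode("utf-8")) % len(self.instances)
        return self.instances[index] == instance_name


def prefilter_destinations(
    domains: List[str], server_name: str, shard_config: FakeShardConfig, instance: str
) -> List[str]:
    # The reviewer's suggestion: drop our own server name and any
    # destination that belongs to a different federation sender instance.
    return [
        d
        for d in domains
        if d != server_name and shard_config.should_handle(instance, d)
    ]
```

Each destination then lands in exactly one sender's list, so no destination is woken twice and none is skipped.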

                    last_processed = destination
                    logger.info(
                        "Destination %s has outstanding catch-up, waking up.",
                        destination,
                    )
                    self.wake_destination(destination)
Contributor

Is it possible that destination no longer needs to be woken up at this point or will wake_destination just bail since the loop is running already?

Contributor Author

If it already started transmitting and is doing that, it will see the flag and not start the transmitter loop.

If it already caught up quickly, then it will still wake it up but the logic there will notice that there is nothing to send so it won't do anything.

Contributor

That was what I gathered too, just double checking!

                    await self.clock.sleep(
                        CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC
                    )
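Stripped of Synapse's machinery, the loop above is keyset pagination over destination names: fetch a batch keyed on the last destination processed, wake each one, and repeat until a fetch comes back empty. A self-contained sketch under those assumptions — the store is a plain in-memory function here, and the per-destination sleep is elided:

```python
from typing import List, Optional

BATCH_SIZE = 25


def get_outstanding(all_destinations: List[str], after: Optional[str]) -> List[str]:
    """In-memory stand-in for get_catch_up_outstanding_destinations: up to
    BATCH_SIZE destinations lexicographically greater than `after`."""
    return sorted(d for d in all_destinations if d > (after or ""))[:BATCH_SIZE]


def wake_all(all_destinations: List[str]) -> List[str]:
    """Walk every destination in batches, recording the wake order."""
    woken = []
    last_processed = None  # type: Optional[str]
    while True:
        batch = get_outstanding(all_destinations, last_processed)
        if not batch:
            break  # finished waking all destinations
        for destination in batch:
            last_processed = destination
            woken.append(destination)  # real code calls wake_destination() and sleeps
    return woken
```

Because each fetch is keyed strictly after the last processed name, the loop terminates even though the underlying set never changes.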
65 changes: 63 additions & 2 deletions synapse/storage/databases/main/transactions.py
@@ -218,6 +218,7 @@ def _set_destination_retry_timings(
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

@@ -249,7 +250,11 @@ def _set_destination_retry_timings(
"retry_interval": retry_interval,
},
)
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
elif (
retry_interval == 0
or prev_row["retry_interval"] is None
or prev_row["retry_interval"] < retry_interval
):
self.db_pool.simple_update_one_txn(
txn,
"destinations",
@@ -397,7 +402,7 @@ async def get_catch_up_room_event_ids(

@staticmethod
def _get_catch_up_room_event_ids_txn(
txn, destination: str, last_successful_stream_ordering: int,
txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
@@ -412,3 +417,59 @@ def _get_catch_up_room_event_ids_txn(
        )
        event_ids = [row[0] for row in txn]
        return event_ids

    async def get_catch_up_outstanding_destinations(
        self, after_destination: Optional[str]
    ) -> List[str]:
        """
        Gets at most 25 destinations which have outstanding PDUs to be caught up,
        and are not being backed off from.

        Args:
            after_destination:
                If provided, all destinations must be lexicographically greater
                than this one.

        Returns:
            list of up to 25 destinations with outstanding catch-up.
            These are the lexicographically first destinations which are
            lexicographically greater than after_destination (if provided).
        """
        time = self.hs.get_clock().time_msec()

        return await self.db_pool.runInteraction(
            "get_catch_up_outstanding_destinations",
            self._get_catch_up_outstanding_destinations_txn,
            time,
            after_destination,
        )

    @staticmethod
    def _get_catch_up_outstanding_destinations_txn(
        txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
    ) -> List[str]:
        q = """
            SELECT destination FROM destinations
            WHERE destination IN (
                SELECT destination FROM destination_rooms
                    WHERE stream_ordering > last_successful_stream_ordering
Contributor

Where is this last_successful_stream_ordering coming from?

Contributor Author

Is it clearer now?:

WHERE destination_rooms.stream_ordering >
  destinations.last_successful_stream_ordering

            )
            AND destination > ?
            AND (
                retry_last_ts IS NULL OR
                retry_last_ts + retry_interval < ?
Comment on lines +459 to +460
Contributor

This attempts to find destinations which have never been backed off from or which are beyond their retry interval?

Does this mean it will just poke all servers when it wakes up?

Contributor Author

Yes, this part means have never been backed off from or have an expired backoff.

                WHERE destination IN (
                    SELECT destination FROM destination_rooms
                        WHERE destination_rooms.stream_ordering >
                            destinations.last_successful_stream_ordering
                )

restricts this to only destinations with catch-up needed.
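The two-part backoff condition being described (never backed off from, or the backoff has expired) reduces to a small predicate. A sketch of the same check in Python, assuming `retry_interval` is always set whenever `retry_last_ts` is; `is_retryable` is an invented name:

```python
from typing import Optional


def is_retryable(
    retry_last_ts: Optional[int], retry_interval: int, now_ms: int
) -> bool:
    """Mirrors `retry_last_ts IS NULL OR retry_last_ts + retry_interval < ?`:
    True if the destination was never backed off from, or its backoff
    window has already expired at `now_ms`."""
    if retry_last_ts is None:
        return True  # never backed off from
    return retry_last_ts + retry_interval < now_ms  # backoff expired
```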

            )
            ORDER BY destination
            LIMIT 25
        """
        txn.execute(
            q,
            (
                # everything is lexicographically greater than "" so this gives
                # us the first batch of up to 25.
                after_destination or "",
                now_time_ms,
            ),
        )

        destinations = [row[0] for row in txn]
        return destinations
98 changes: 98 additions & 0 deletions tests/federation/test_federation_catch_up.py
@@ -1,3 +1,4 @@
from collections import defaultdict
from typing import List, Tuple

from mock import Mock
@@ -321,3 +322,100 @@ def test_catch_up_loop(self):
            per_dest_queue._last_successful_stream_ordering,
            event_5.internal_metadata.stream_ordering,
        )

    @override_config({"send_federation": True})
    def test_catch_up_on_synapse_startup(self):
        """
        Tests the behaviour of get_catch_up_outstanding_destinations and
        _wake_destinations_needing_catchup.
        """

        # list of sorted server names
        server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]

        # ARRANGE:
        # - a local user (u1)
        # - a room which u1 is joined to (and remote users @user:serverXX are
        #   joined to)

        # mark the remotes as online
        self.is_online = True

        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room_1 = self.helper.create_room_as("u1", tok=u1_token)

        for server_name in server_names:
            self.get_success(
                event_injection.inject_member_event(
                    self.hs, room_1, "@user:%s" % server_name, "join"
                )
            )

        # create an event
        self.helper.send(room_1, "deary me!", tok=u1_token)

        # ASSERT:
        # - All servers are up to date so none should have outstanding catch-up
        outstanding_when_successful = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
        )
        self.assertEqual(outstanding_when_successful, [])

        # ACT:
        # - Make the remote servers unreachable
        self.is_online = False

        # - Mark zzzerver as being backed-off from
        now = self.clock.time_msec()
        self.get_success(
            self.hs.get_datastore().set_destination_retry_timings(
                "zzzerver", now, now, 24 * 60 * 60 * 1000  # retry in 1 day
            )
        )

        # - Send an event
        self.helper.send(room_1, "can anyone hear me?", tok=u1_token)

        # ASSERT (get_catch_up_outstanding_destinations):
        # - all remotes are outstanding
        # - they are returned in batches of 25, in order
        outstanding_1 = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
        )

        self.assertEqual(len(outstanding_1), 25)
        self.assertEqual(outstanding_1, server_names[0:25])

        outstanding_2 = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(
                server_names[24]
            )
        )
        self.assertNotIn("zzzerver", outstanding_2)
        self.assertEqual(len(outstanding_2), 17)
        self.assertEqual(outstanding_2, server_names[25:-1])

        # ACT: call _wake_destinations_needing_catchup

        # patch wake_destination to just count the destinations instead
        woken = defaultdict(lambda: 0)

        def wake_destination_track(destination):
            woken[destination] += 1

        self.hs.get_federation_sender().wake_destination = wake_destination_track

        # cancel the pre-existing timer for _wake_destinations_needing_catchup
        self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()
Contributor

Why is this necessary? Is it because _wake_destinations_needing_catchup doesn't protect itself from being called multiple times?

Contributor Author

Yes, since the reactor gets pumped then calling it manually ourselves doesn't change the fact that it is called automatically after the timer.


        self.get_success(
            self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
        )

        # ASSERT (_wake_destinations_needing_catchup):
        # - all remotes are woken up, save for zzzerver
        self.assertNotIn("zzzerver", woken)
        self.assertEqual(set(woken.keys()), set(server_names[:-1]))
Contributor

If you're just trying to check that the values are the same (in any order), use assertCountEqual:

Suggested change
self.assertEqual(set(woken.keys()), set(server_names[:-1]))
self.assertCountEqual(woken.keys(), server_names[:-1])

Contributor Author

It's good to see there is a built-in function for this, but I'm not sure I like it — the name is completely misleading?

(sounds like it is doing len(woken.keys()) == len(server_names[:-1])…)

Contributor

I agree that the name isn't great, but using the standardized form is easier than parsing the logic IMO.

        # check that all destinations were woken exactly once
        self.assertEqual([value for value in woken.values() if value != 1], [])
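On the assertCountEqual naming debate above: despite the name, it checks that both sequences contain the same elements with the same multiplicities, in any order; it is not a length comparison. A small demonstration, where the wrapper function is invented just for this sketch:

```python
import unittest

_tc = unittest.TestCase()


def multisets_equal(a, b) -> bool:
    """True iff `a` and `b` hold the same elements with the same counts,
    order ignored (which is what assertCountEqual actually verifies)."""
    try:
        _tc.assertCountEqual(a, b)
        return True
    except AssertionError:
        return False
```

Two lists with the same elements in different orders compare equal, while two lists of equal length with different elements do not, so assertCountEqual is stricter than a length check.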