Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce replication traffic due to reflected cache stream POSITION #16557

Merged
merged 3 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16557.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
19 changes: 18 additions & 1 deletion synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
from synapse.replication.tcp.streams._base import StreamRow, Token
from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -204,6 +204,23 @@ async def _run_notifier_loop(self) -> None:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
#
# There are two reasons for this: 1) this instance
# requested a stream ID but didn't use it, or 2)
# this instance advanced its own stream position due
# to receiving notifications about other instances
# advancing their stream position.

# We skip sending `POSITION` for the `caches` stream
# for the second case as a) it generates a lot of
# traffic as every worker would echo each write, and
# b) nothing cares if a given worker's caches stream
# position lags.
if stream.NAME == CachesStream.NAME:
clokep marked this conversation as resolved.
Show resolved Hide resolved
# If there haven't been any writes since the
# `last_token` then we're in the second case.
if stream.minimal_local_current_token() <= last_token:
continue

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
Expand Down
Loading