This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce amount of caches POSITIONS we send #16561

Merged (2 commits) on Oct 27, 2023

1 change: 1 addition & 0 deletions changelog.d/16561.bugfix
@@ -0,0 +1 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
10 changes: 10 additions & 0 deletions synapse/replication/tcp/streams/_base.py
@@ -161,6 +161,14 @@ async def get_updates(self) -> StreamUpdateResult:
            and `limited` is whether there are more updates to fetch.
        """
        current_token = self.current_token(self.local_instance_name)

        # If the minimum current token for the local instance is less than or
        # equal to the last thing we published, we know that there are no
        # updates.
        if self.last_token >= self.minimal_local_current_token():
            self.last_token = current_token
            return [], current_token, False
DMRobertson marked this conversation as resolved.

        updates, current_token, limited = await self.get_updates_since(
            self.local_instance_name, self.last_token, current_token
        )
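The early return in this hunk can be illustrated with a toy model. This is a minimal sketch, not Synapse's code: `ToyStream`, its `rows` list, and the `fetches` counter are hypothetical names invented for the example, and it assumes there are no in-flight writes, so the minimal and current tokens coincide.

```python
import asyncio
from typing import List, Tuple

Token = int
Row = str


class ToyStream:
    """Hypothetical stand-in for a replication stream (not Synapse's class)."""

    def __init__(self) -> None:
        self.rows: List[Tuple[Token, Row]] = []  # persisted (token, row) pairs
        self.last_token: Token = 0  # last position we published
        self.fetches = 0  # how many times we queried the "database"

    def current_token(self) -> Token:
        return self.rows[-1][0] if self.rows else 0

    def minimal_local_current_token(self) -> Token:
        # Toy assumption: nothing is in flight, so minimum == current.
        return self.current_token()

    async def get_updates_since(self, from_token: Token, upto_token: Token):
        self.fetches += 1
        updates = [(t, r) for t, r in self.rows if from_token < t <= upto_token]
        return updates, upto_token, False

    async def get_updates(self):
        current_token = self.current_token()
        # Short-circuit: nothing new since the last published position.
        if self.last_token >= self.minimal_local_current_token():
            self.last_token = current_token
            return [], current_token, False
        updates, current_token, limited = await self.get_updates_since(
            self.last_token, current_token
        )
        self.last_token = current_token
        return updates, current_token, limited


async def demo():
    stream = ToyStream()
    first = await stream.get_updates()   # nothing written: short-circuits
    stream.rows.append((1, "invalidate a"))
    second = await stream.get_updates()  # one new row: fetches
    third = await stream.get_updates()   # caught up again: short-circuits
    return first, second, third, stream.fetches


result = asyncio.run(demo())
```

In this toy run only the second call reaches `get_updates_since`; the first and third return an empty update list without a fetch.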
@@ -489,6 +497,8 @@ def current_token(self, instance_name: str) -> Token:
        return self.store.get_cache_stream_token_for_writer(instance_name)

    def minimal_local_current_token(self) -> Token:
        if self.store._cache_id_gen:
            return self.store._cache_id_gen.get_minimal_local_current_token()
Member Author

_cache_id_gen is None on SQLite

Contributor

Why is the cache stream special?

Member Author

I think at the time we didn't have a good way of doing it on SQLite, or something?

Contributor
@DMRobertson, Oct 27, 2023

Sorry, I think I mean two questions:

  • Why are we handling the caches stream separately to the others? Presumably b/c it is especially chatty and we want to ratelimit the updates sent out?
  • What's the difference between `self.store._cache_id_gen.get_minimal_local_current_token()` and `self.current_token(self.local_instance_name)`?

Contributor

> Why are we handling the caches stream separately to the others? Presumably b/c it is especially chatty and we want to ratelimit the updates sent out?

This seems to be the case, by my reading of https://github.com/matrix-org/synapse/pull/16557/files#diff-844ba8f7be8c32eb75cc8092e1c48528797f6a8e1eeada942724ba6f73923a9cR214-R218

Contributor

Ahh, I'd missed that this was inside the CachesStream class, I thought this was base logic for all streams.

I guess the point is that if there's no _cache_id_gen then there are no other workers to worry about and so the distinction between minimum and current tokens is moot?

Contributor

Are there any other streams whose minimal_local_current_token impl we should sanity check?

Member Author

> I guess the point is that if there's no _cache_id_gen then there are no other workers to worry about and so the distinction between minimum and current tokens is moot?

Yup

> Are there any other streams whose minimal_local_current_token impl we should sanity check?

I don't think so. Most of the others can just rely directly on the ID gens (and there's a helper class to do that)
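The fallback agreed on in this exchange can be sketched in isolation. This is an illustrative model under stated assumptions, not Synapse's API: `ToyIdGen` and `ToyStore` are hypothetical classes, and the example uses an explicit `is not None` check where the diff relies on truthiness.

```python
from typing import Optional


class ToyIdGen:
    """Hypothetical ID generator that tracks a minimal local position."""

    def __init__(self, minimal: int) -> None:
        self.minimal = minimal

    def get_minimal_local_current_token(self) -> int:
        return self.minimal


class ToyStore:
    """Hypothetical store; cache_id_gen is None in a single-process setup."""

    def __init__(self, cache_id_gen: Optional[ToyIdGen], current: int) -> None:
        self.cache_id_gen = cache_id_gen
        self.current = current


def minimal_local_current_token(store: ToyStore) -> int:
    # With an ID generator, ask it for the true minimum.
    if store.cache_id_gen is not None:
        return store.cache_id_gen.get_minimal_local_current_token()
    # Without one, fall back to the current token.
    return store.current


with_gen = minimal_local_current_token(ToyStore(ToyIdGen(minimal=5), current=7))
without_gen = minimal_local_current_token(ToyStore(None, current=7))
```

Here `with_gen` takes the generator's minimum, while `without_gen` simply reuses the current token, mirroring the "distinction is moot" case described above.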

Contributor

I mean more that e.g. the function bodies here don't call something with the word "minimal" in:

    def minimal_local_current_token(self) -> Token:
        return self.current_token(self.local_instance_name)

    def minimal_local_current_token(self) -> Token:
        return self.current_token_function()

    def minimal_local_current_token(self) -> Token:
        return self._federation_queue.get_current_token(self.local_instance_name)

Member Author

Right, so yeah you're right that they're suboptimal implementations, but they are also valid implementations. We mostly care about the difference for caches as a) it's high traffic, and b) we have an extra check for it that wants minimal_local_current_token to return the actual minimum
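One way to see why such implementations are "suboptimal but valid" for the `last_token >= minimal_local_current_token()` check in `get_updates`: if an implementation reports a value at or above the true minimum, the check can only fire less often, never wrongly. The sketch below assumes exactly that relationship (fallback value >= true minimum); the numbers and the `short_circuits` helper are made up for illustration.

```python
def short_circuits(last_token: int, reported_minimum: int) -> bool:
    """Mirror of the early-return condition in get_updates."""
    return last_token >= reported_minimum


true_minimum = 5  # hypothetical actual minimum
current = 8       # hypothetical fallback value, assumed >= true_minimum

cases = []
for last_token in range(0, 11):
    exact = short_circuits(last_token, true_minimum)
    conservative = short_circuits(last_token, current)
    cases.append((last_token, exact, conservative))

# The conservative check never skips a fetch that the exact check would do.
never_wrong = all(exact or not conservative for _, exact, conservative in cases)

# Positions where the conservative check does a fetch it could have skipped.
missed = [t for t, exact, conservative in cases if exact and not conservative]
```

In this toy setup the conservative implementation only costs extra fetches for `last_token` values between the true minimum and the reported one, which matters most on a high-traffic stream.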

        return self.current_token(self.local_instance_name)
