Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch cache invalidation over replication #4671

Merged
merged 5 commits into from Feb 19, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+24 −5
Diff settings

Always

Just for now

@@ -137,7 +137,6 @@ for each stream so that on reconneciton it can start streaming from the correct
place. Note: not all RDATA have valid tokens due to batching. See
``RdataCommand`` for more details.


Example
~~~~~~~

@@ -221,3 +220,23 @@ SYNC (S, C)

See ``synapse/replication/tcp/commands.py`` for a detailed description and the
format of each command.


Cache Invalidation Stream
~~~~~~~~~~~~~~~~~~~~~~~~~

The cache invalidation stream is used to inform workers when they need to
invalidate any of their caches in the data store. This is done by streaming all
cache invalidations done on master down to the workers, assuming that any caches
on the workers also exist on the master.

Each individual cache invalidation results in a row being sent down replication,
which includes the cache name (the name of the function) and they key to
invalidate. For example::

> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

However, there are times when a number of caches need to be invalidated at the
same time with the same key. To reduce traffic we batch those invalidations into
a single poke by defining a special cache name that workers understand to mean

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

I don't mind not listing the exact caches, but please can you define the special cache name?

to expand to invalidate the correct caches.
@@ -1199,8 +1199,8 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
Args:
txn
room_id (str): Room were state changed
members_changed (set[str]): The user_ids of members that have changed
room_id (str): Room where state changed
members_changed (Iterable[str]): The user_ids of members that have changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

iterable

"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)

@@ -1215,7 +1215,7 @@ def _invalidate_state_caches(self, room_id, members_changed):
not stream invalidations down replication.
Args:
room_id (str): Room were state changed
room_id (str): Room where state changed
members_changed (set[str]): The user_ids of members that have changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

iterable

"""
for member in members_changed:
@@ -1237,7 +1237,7 @@ def _send_invalidation_to_replication(self, txn, cache_name, keys):
Args:
txn
cache_name (str)
keys (list[str])
keys (iterable[str])
"""

if isinstance(self.database_engine, PostgresEngine):
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.