Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch cache invalidation over replication #4671

Merged
merged 5 commits into from Feb 19, 2019
Merged
Changes from 2 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+70 −32
Diff settings

Always

Just for now

Copy path View file
@@ -0,0 +1 @@
Improve replication performance by reducing cache invalidation traffic.
@@ -17,7 +17,7 @@

import six

from synapse.storage._base import SQLBaseStore
from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
from synapse.storage.engines import PostgresEngine

from ._slaved_id_tracker import SlavedIdTracker
@@ -54,12 +54,17 @@ def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
self._cache_id_gen.advance(token)
for row in rows:
try:
getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass
if row.cache_func == _CURRENT_STATE_CACHE_NAME:

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

I'd like to see some words about this (and in general what the 'caches' stream means) documented in docs/tcp_replication.tcp.

This comment has been minimized.

Copy link
@erikjohnston

erikjohnston Feb 19, 2019

Author Member

I've added some words. I haven't specified exactly what _CURRENT_STATE_CACHE_NAME means, as I feel like that's just going to quickly get out of date with the code and is probably unnecessary.

room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
try:
getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
Copy path View file
@@ -28,6 +28,7 @@
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
@@ -64,6 +65,10 @@
"event_search": "event_search_event_id_idx",
}

# This is a special cache name we use to batch multiple invalidations of caches
# based on the current state when notifying workers over replication.
_CURRENT_STATE_CACHE_NAME = "cs_cache_fake"


class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
@@ -1184,6 +1189,56 @@ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
be invalidated.
"""
txn.call_after(cache_func.invalidate, keys)
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)

def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
"""Special case invalidation of caches based on current state.
We special case this so that we can batch the cache invalidations into a
single replication poke.
Args:
txn
room_id (str): Room were state changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

where

members_changed (set[str]): The user_ids of members that have changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

it can be any iterable, rather than just a set, right?

"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)

keys = [room_id]
keys.extend(members_changed)
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

itertools.chain?

This comment has been minimized.

Copy link
@erikjohnston

erikjohnston Feb 19, 2019

Author Member

I don't think keys = list(itertools.chain([room_id], members_chained)) is any clearer or faster tbh

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

keys = itertools.chain([room_id], members_chained) is clearer and faster than this, imho. You don't need the list constructor.

This comment has been minimized.

Copy link
@erikjohnston

erikjohnston Feb 19, 2019

Author Member

Ok, done.

This comment has been minimized.

Copy link
@richvdh
self._send_invalidation_to_replication(
txn, _CURRENT_STATE_CACHE_NAME, keys,
)

def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
Args:
room_id (str): Room were state changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

where

members_changed (set[str]): The user_ids of members that have changed
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

iterable

"""
for member in members_changed:
self.get_rooms_for_user_with_stream_ordering.invalidate((member,))

for host in set(get_domain_from_id(u) for u in members_changed):
self.is_host_joined.invalidate((room_id, host))
self.was_host_joined.invalidate((room_id, host))

self.get_users_in_room.invalidate((room_id,))
self.get_room_summary.invalidate((room_id,))
self.get_current_state_ids.invalidate((room_id,))

def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
Note that this does *not* invalidate the cache locally.
Args:
txn
cache_name (str)
keys (list[str])
This conversation was marked as resolved by erikjohnston

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 19, 2019

Member

any iterable will do?

"""

if isinstance(self.database_engine, PostgresEngine):
# get_next() returns a context manager which is designed to wrap
@@ -1201,7 +1256,7 @@ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
table="cache_invalidation_stream",
values={
"stream_id": stream_id,
"cache_func": cache_func.__name__,
"cache_func": cache_name,
"keys": list(keys),
"invalidation_ts": self.clock.time_msec(),
}
Copy path View file
@@ -979,30 +979,7 @@ def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
if ev_type == EventTypes.Member
)

for member in members_changed:
self._invalidate_cache_and_stream(
txn, self.get_rooms_for_user_with_stream_ordering, (member,)
)

for host in set(get_domain_from_id(u) for u in members_changed):
self._invalidate_cache_and_stream(
txn, self.is_host_joined, (room_id, host)
)
self._invalidate_cache_and_stream(
txn, self.was_host_joined, (room_id, host)
)

self._invalidate_cache_and_stream(
txn, self.get_users_in_room, (room_id,)
)

self._invalidate_cache_and_stream(
txn, self.get_room_summary, (room_id,)
)

self._invalidate_cache_and_stream(
txn, self.get_current_state_ids, (room_id,)
)
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)

def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.