Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Introduce SyncResponseCache and add /sync response cache timeout #9739

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9739.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix #8518 and #3880.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These won't get auto-linked by towncrier. Please spell out the bugs you are fixing - linking back to the GH issue number isn't normally necessary because anyone who is really interested can follow the link to the PR and thence to the GH issues.

1 change: 1 addition & 0 deletions changelog.d/9739.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added experimental support to cache `/sync` responses with config key `experimental_features.sync_cache_timeout_ms` (in milliseconds).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Do we announce experimental settings like these? Is this even experimental?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well yes, we announce experimental features. I don't think this should be one though. Experimental features are more appropriate when we are making changes to the Matrix protocol; this is all internal to Synapse.

If it needs shaking out before releasing (and I'm not convinced it does), we can do that with test deployments using branches rather than with feature flags.

5 changes: 5 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool

# /sync ResponseCache timeout (in ms)
self.sync_cache_timeout = experimental.get(
"sync_cache_timeout_ms", 0
) # type: int
14 changes: 9 additions & 5 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.caches.sync_response_cache import SyncResponseCache
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client

Expand Down Expand Up @@ -244,9 +244,11 @@ def __init__(self, hs: "HomeServer"):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(
hs.get_clock(), "sync"
) # type: ResponseCache[Tuple[Any, ...]]
self.response_cache = SyncResponseCache(
hs.get_clock(),
"sync",
timeout_ms=self.hs_config.experimental.sync_cache_timeout,
) # type: SyncResponseCache[Tuple[Any, ...]]
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
Expand Down Expand Up @@ -278,8 +280,10 @@ async def wait_for_sync_for_user(
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(requester=requester)

res = await self.response_cache.wrap(
res = await self.response_cache.wrap_conditional(
sync_config.request_key,
# Evict cache if next_batch would refer to this cached result
lambda result: since_token != result.next_batch,
self._wait_for_sync_for_user,
sync_config,
since_token,
Expand Down
101 changes: 101 additions & 0 deletions synapse/util/caches/sync_response_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Any, Callable, Dict

from twisted.internet import defer

from synapse.util import Clock
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.response_cache import ResponseCache, T

logger = logging.getLogger(__name__)


# A special class for /sync responses, to conditionally cache these.
class SyncResponseCache(ResponseCache[T]):
    """A ResponseCache which can evict a completed entry based on a per-key
    "conditional" callback.

    When the wrapped deferred completes, the conditional registered for the
    key (if any) is run against the result: a truthy return keeps the entry
    cached for the usual timeout, a falsy return (or an exception) evicts it
    immediately. `/sync` uses this to avoid caching a response whose
    `next_batch` token the client is about to consume.
    """

    def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
        super().__init__(clock, name, timeout_ms)

        # Per-key conditionals, registered by the first caller of
        # wrap_conditional() and discarded when the deferred completes.
        self.conditionals = {}  # type: Dict[T, Callable[[Any], bool]]

    def run_conditional(self, key: T, result: Any) -> bool:
        """Run the conditional registered for `key` against `result`.

        Returns:
            True when no conditional is registered (default to caching), or
            the (bool-coerced) value of the conditional. Returns False when
            the conditional raises, evicting the cache out of caution.
        """
        cond = self.conditionals.get(key, None)
        if cond is None:
            # No conditional registered for this key: default to caching.
            return True

        try:
            # Below type annotation is needed for mypy to shush about some
            # statements being unreachable; we essentially have to not trust
            # other functions to be able to correctly recover from any fallout
            # (and log it).
            res = cond(result)  # type: Any
        except Exception:
            # .exception so the traceback is logged (and picked up by e.g.
            # Sentry); a misbehaving conditional is worth investigating.
            logger.exception(
                "[%s]: Executing conditional %r on %s raised an exception.",
                self._name,
                cond,
                key,
            )
            # Evict cache out of caution.
            return False

        if not isinstance(res, bool):
            logger.warning(
                "[%s]: Conditional %r returned non-bool value %r (for key %r)",
                self._name,
                cond,
                res,
                key,
            )
            # Return concrete boolean value based on falsy or truthiness.
            # If bool() itself raises, so be it: this value was never going to
            # be meaningfully "true" or "false" anyway — let it be a scream
            # test.
            return bool(res)

        return res

    # Copied wholesale from ResponseCache so that the inner `remove` callback
    # can consult the conditional before scheduling the timed eviction.
    def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred:
        """Same as ResponseCache.set, but is conditional-aware"""
        result = ObservableDeferred(deferred, consumeErrors=True)
        self.pending_result_cache[key] = result

        def remove(r):
            # Keep the entry (for timeout_sec) only when a timeout is
            # configured, the deferred succeeded, and the conditional (if any)
            # approves of the result; otherwise evict immediately.
            if self.timeout_sec and (
                not isinstance(r, BaseException) and self.run_conditional(key, r)
            ):
                self.clock.call_later(
                    self.timeout_sec, self.pending_result_cache.pop, key, None
                )
            else:
                self.pending_result_cache.pop(key, None)

            # The conditional is only needed once, at completion time.
            self.conditionals.pop(key, None)

            return r

        result.addBoth(remove)
        return result.observe()

    def wrap_conditional(
        self,
        key: T,
        conditional: "Callable[[Any], bool]",
        callback: "Callable[..., Any]",
        *args: Any,
        **kwargs: Any
    ) -> defer.Deferred:
        """Same as wrap(), but adds a conditional to be executed on completion.

        Only the very first caller with this key, between both wrap() and
        wrap_conditional(), will set the conditional function; otherwise the
        'conditional' argument will be ignored.
        """

        if self.get(key) is None:  # we are the first caller
            logger.debug(
                "[%s]: We are the very first caller for [%s], setting conditional %r...",
                self._name,
                key,
                conditional,
            )
            self.conditionals[key] = conditional

        return self.wrap(key, callback, *args, **kwargs)
134 changes: 134 additions & 0 deletions tests/util/caches/test_sync_responsecache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from synapse.util.caches.sync_response_cache import SyncResponseCache

from tests.server import get_clock
from tests.unittest import TestCase

# An object which compares equal to itself; all that matters is that we have
# a recognisable value to assert cache hits against.
OBJ = {0}


# A callback that returns OBJ, standing in for a request handler.
def CALLBACK():
    return OBJ


# The key used on the caches throughout this file
KEY = 0


# Easy conditionals: unconditionally approve / disapprove of caching a result.
def YES(_):
    return True


def NO(_):
    return False

class SyncResponseCacheTestCase(TestCase):
    """
    A TestCase class for SyncResponseCache.

    The test-case function naming has some logic to its parts; here are some notes about it:
        first: Denotes tests that test wrap_conditional as a "first caller"
        later: Denotes tests that test wrap_conditional as a non-"first caller"
        multi: Denotes tests that have multiple consequent calls to wrap*
        approve: Denotes tests where the conditional approves of the results (letting cache).
        disapprove: Denotes tests where the conditional disapproves of the result (expiring it).
        hit: Denotes tests whose expected outcome is a cache hit.
        miss: Denotes tests whose expected outcome is a cache miss.
    """

    def setUp(self):
        # Deterministic fake reactor/clock pair: time only advances via pump().
        self.reactor, self.clock = get_clock()
        # timeout_ms=1000: an approved result stays cached for one second.
        self.cache = SyncResponseCache(self.clock, "keeping_cache", timeout_ms=1000)

    # Extra helper functions

    def is_hit(self):
        # Assert the cached deferred for KEY has resolved to OBJ.
        self.assertEqual(
            OBJ,
            self.successResultOf(self.cache.get(KEY)),
            "cache should not be expired",
        )

    def is_miss(self):
        # Assert there is no cached entry for KEY.
        self.assertIsNone(self.cache.get(KEY), "cache should be expired")

    def pump(self):
        # Advance the fake reactor by one second.
        self.reactor.pump((1,))

    # Like CALLBACK, but waits a second, and is async
    async def delayed_callback(self):
        await self.clock.sleep(1)
        return OBJ

    # Actual tests

    def test_cache_first_approve_hit(self):
        self.cache.wrap_conditional(KEY, YES, CALLBACK)

        self.is_hit()

    def test_cache_first_disapprove_miss(self):
        self.cache.wrap_conditional(KEY, NO, CALLBACK)

        self.is_miss()

    def test_cache_later_approve_hit(self):
        # first
        self.cache.wrap(KEY, CALLBACK)

        # second
        self.cache.wrap_conditional(KEY, YES, CALLBACK)

        self.is_hit()

    def test_cache_later_disapprove_hit(self):
        # first: wrap() registers no conditional, so the default (cache) wins
        self.cache.wrap(KEY, CALLBACK)

        # second: this conditional is ignored — the entry already completed
        self.cache.wrap_conditional(KEY, NO, CALLBACK)

        self.is_hit()

    # Show how later calls to wrap_conditional don't change its conditional outcome.
    # These need self.delayed_callback, because else the first wrap* (by logic of run_in_background)
    # will also run the function *and* its callbacks, including (Sync)ResponseCache.set::{{remove}}

    def test_cache_multi_first_approve_later_approve_hit(self):
        # first
        self.cache.wrap_conditional(KEY, YES, self.delayed_callback)

        # second
        self.cache.wrap_conditional(KEY, YES, self.delayed_callback)

        self.pump()

        self.is_hit()

    def test_cache_multi_first_approve_later_disapprove_hit(self):
        # first
        self.cache.wrap_conditional(KEY, YES, self.delayed_callback)

        # second: NO is ignored; only the first caller's conditional is kept
        self.cache.wrap_conditional(KEY, NO, self.delayed_callback)

        self.pump()

        self.is_hit()

    def test_cache_multi_first_disapprove_later_approve_miss(self):
        # first
        self.cache.wrap_conditional(KEY, NO, self.delayed_callback)

        # second: YES is ignored; only the first caller's conditional is kept
        self.cache.wrap_conditional(KEY, YES, self.delayed_callback)

        self.pump()

        self.is_miss()

    def test_cache_multi_first_disapprove_later_disapprove_miss(self):
        # first
        self.cache.wrap_conditional(KEY, NO, self.delayed_callback)

        # second
        self.cache.wrap_conditional(KEY, NO, self.delayed_callback)

        self.pump()

        self.is_miss()