Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Fix rare notifier bug where listeners dont timeout #1683

Merged
merged 3 commits into from Dec 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/handlers/sync.py
Expand Up @@ -510,6 +510,7 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
Returns:
Deferred(SyncResult)
"""
logger.info("Calculating sync response for %r", sync_config.user)

# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
Expand Down
40 changes: 23 additions & 17 deletions synapse/notifier.py
Expand Up @@ -17,6 +17,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError

from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
Expand Down Expand Up @@ -294,14 +295,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,

result = None
if timeout:
# Will be set to a _NotificationListener that we'll be waiting on.
# Allows us to cancel it.
listener = None

def timed_out():
if listener:
listener.deferred.cancel()
timer = self.clock.call_later(timeout / 1000., timed_out)
end_time = self.clock.time_msec() + timeout

prev_token = from_token
while not result:
Expand All @@ -312,18 +306,25 @@ def timed_out():
if result:
break

now = self.clock.time_msec()
if end_time <= now:
break

# Now we wait for the _NotifierUserStream to be told there
# is a new token.
# We need to supply the token we supplied to callback so
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext():
yield listener.deferred
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
break
except defer.CancelledError:
break

self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
result = yield callback(from_token, current_token)
Expand Down Expand Up @@ -492,22 +493,27 @@ def wait_for_replication(self, callback, timeout):
"""
listener = _NotificationListener(None)

def timed_out():
listener.deferred.cancel()
end_time = self.clock.time_msec() + timeout

timer = self.clock.call_later(timeout / 1000., timed_out)
while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break

now = self.clock.time_msec()
if end_time <= now:
break

try:
with PreserveLoggingContext():
yield listener.deferred
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
break
except defer.CancelledError:
break

self.clock.cancel_call_later(timer, ignore_errs=True)

defer.returnValue(result)
7 changes: 6 additions & 1 deletion synapse/util/__init__.py
Expand Up @@ -24,6 +24,11 @@
logger = logging.getLogger(__name__)


class DeferredTimedOutError(SynapseError):
def __init__(self):
super(SynapseError).__init__(504, "Timed out")


def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
Expand Down Expand Up @@ -89,7 +94,7 @@ def time_bound_deferred(self, given_deferred, time_out):

def timed_out_fn():
try:
ret_deferred.errback(SynapseError(504, "Timed out"))
ret_deferred.errback(DeferredTimedOutError())
except:
pass

Expand Down
4 changes: 4 additions & 0 deletions tests/utils.py
Expand Up @@ -294,6 +294,10 @@ def advance_time(self, secs):
def advance_time_msec(self, ms):
self.advance_time(ms / 1000.)

def time_bound_deferred(self, d, *args, **kwargs):
# We don't bother timing things out for now.
return d


class SQLiteMemoryDbPool(ConnectionPool, object):
def __init__(self):
Expand Down