Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run the AS senders as background processes #4189

Merged
merged 2 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/4189.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Run the AS senders as background processes to fix warnings

36 changes: 24 additions & 12 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -104,27 +104,34 @@ def __init__(self, txn_ctrl, clock):
self.clock = clock

def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
run_in_background(self._send_request, service)

@defer.inlineCallbacks
def _send_request(self, service):
# start a sender for this appservice if we don't already have one

if service.id in self.requests_in_flight:
return

run_as_background_process(
"as-sender-%s" % (service.id, ),
self._send_request, service,
)

@defer.inlineCallbacks
def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert(service.id not in self.requests_in_flight)

self.requests_in_flight.add(service.id)
try:
while True:
events = self.queued_events.pop(service.id, [])
if not events:
return

with Measure(self.clock, "servicequeuer.send"):
try:
yield self.txn_ctrl.send(service, events)
except Exception:
logger.exception("AS request failed")
try:
yield self.txn_ctrl.send(service, events)
except Exception:
logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)

Expand Down Expand Up @@ -223,7 +230,12 @@ def __init__(self, clock, store, as_api, service, callback):
self.backoff_counter = 1

def recover(self):
self.clock.call_later((2 ** self.backoff_counter), self.retry)
def _retry():
run_as_background_process(
"as-recoverer-%s" % (self.service.id,),
self.retry,
)
self.clock.call_later((2 ** self.backoff_counter), _retry)

def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
Expand Down