diff --git a/calico/felix/actor.py b/calico/felix/actor.py index bb353559a20..49d716fbed4 100644 --- a/calico/felix/actor.py +++ b/calico/felix/actor.py @@ -99,6 +99,7 @@ """ import collections import functools +from calico.monotonic import monotonic_time import gevent import gevent.local import logging @@ -114,6 +115,10 @@ _log = logging.getLogger(__name__) +# Minimum gevent scheduling delay. A delay of 0 should mean "yield" but +# gevent has a known issue that a greenlet that sleeps for 0 may be rescheduled +# immediately. Any small positive value is enough to truly yield. +MIN_DELAY = 0.000001 ResultOrExc = collections.namedtuple("ResultOrExc", ("result", "exception")) @@ -129,9 +134,16 @@ class Actor(object): Class that contains a queue and a greenlet serving that queue. """ - max_ops_before_yield = 10000 + max_ops_before_yield = 1000 """Number of calls to self._maybe_yield before it yields""" + batch_delay = 0.01 + """ + Minimum delay between schedules of this Actor. Larger values encourage + more batching of messages and reduce starvation (but introduce more + latency when we're under load). + """ + def __init__(self, qualifier=None): self._event_queue = collections.deque() @@ -139,6 +151,8 @@ def __init__(self, qualifier=None): # queue or has been scheduled to do so. Set to False when the loop # runs out of work and switches to the Hub to wait for more. self._scheduled = True + # (Monotonic time) timestamp of last schedule. + self._last_scheduled = None # Cache the gevent Hub and main loop. self._gevent_hub = gevent.get_hub() self._gevent_loop = self._gevent_hub.loop @@ -178,10 +192,23 @@ def maybe_schedule(self, caller): # logging between the test and set of self._scheduled. if not self._scheduled: self._scheduled = True + # Calculate the scheduling delay. If this Actor hasn't been + # scheduled for a long time, we'll schedule it straight away, + # otherwise we back off to the batch_delay to encourage work to be + # batched up when we're under load. + now = monotonic_time() + if self._last_scheduled is not None: + time_since_last_schedule = now - self._last_scheduled + delay = max(self.batch_delay - time_since_last_schedule, + MIN_DELAY) + else: + delay = MIN_DELAY # We can't switch directly to the Actor's greenlet because that # prevents gevent from doing its scheduling. Instead, we ask the # gevent event loop to switch to the greenlet. - self._gevent_loop.run_callback(self._switch, caller) + t = self._gevent_loop.timer(delay) + t.start(self._switch, caller) + self._last_scheduled = now _log.debug("Scheduled %s", self) def _switch(self, value): @@ -418,7 +445,7 @@ def _maybe_yield(self): """ self._op_count += 1 if self._op_count >= self.max_ops_before_yield: - gevent.sleep() + gevent.sleep(MIN_DELAY) self._op_count = 0 def __str__(self): diff --git a/calico/felix/test/test_actor.py b/calico/felix/test/test_actor.py index d56189426c4..09ed6e6239b 100644 --- a/calico/felix/test/test_actor.py +++ b/calico/felix/test/test_actor.py @@ -206,7 +206,7 @@ def test_yield(self, m_sleep): self._actor.do_a(async=False) self._actor.do_a(async=False) self._actor.do_a(async=False) - m_sleep.assert_called_once_with() + m_sleep.assert_called_once_with(0.000001) def test_wait_and_check_no_input(self): actor.wait_and_check([])