Skip to content

Commit

Permalink
Merge pull request #853 from projectcalico/smc-batch-delay
Browse files Browse the repository at this point in the history
Add a delay to Actor scheduling when under load.
  • Loading branch information
Peter White committed Dec 8, 2015
2 parents f16b952 + 7042961 commit 4845eaa
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
33 changes: 30 additions & 3 deletions calico/felix/actor.py
Expand Up @@ -99,6 +99,7 @@
"""
import collections
import functools
from calico.monotonic import monotonic_time
import gevent
import gevent.local
import logging
Expand All @@ -114,6 +115,10 @@

_log = logging.getLogger(__name__)

# Minimum gevent scheduling delay. A delay of 0 should mean "yield" but
# gevent has a known issue that a greenlet that sleeps for 0 may be rescheduled
# immediately. Any small positive value is enough to truly yield.
MIN_DELAY = 0.000001

ResultOrExc = collections.namedtuple("ResultOrExc", ("result", "exception"))

Expand All @@ -129,16 +134,25 @@ class Actor(object):
Class that contains a queue and a greenlet serving that queue.
"""

max_ops_before_yield = 10000
max_ops_before_yield = 1000
"""Number of calls to self._maybe_yield before it yields"""

batch_delay = 0.01
"""
Minimum delay between schedules of this Actor. Larger values encourage
more batching of messages and reduce starvation (but introduce more
latency when we're under load).
"""

def __init__(self, qualifier=None):
self._event_queue = collections.deque()

# Set to True when the main loop is actively processing the input
# queue or has been scheduled to do so. Set to False when the loop
# runs out of work and switches to the Hub to wait for more.
self._scheduled = True
# (Monotonic time) timestamp of last schedule.
self._last_scheduled = None
# Cache the gevent Hub and main loop.
self._gevent_hub = gevent.get_hub()
self._gevent_loop = self._gevent_hub.loop
Expand Down Expand Up @@ -178,10 +192,23 @@ def maybe_schedule(self, caller):
# logging between the test and set of self._scheduled.
if not self._scheduled:
self._scheduled = True
# Calculate the scheduling delay. If this Actor hasn't been
# scheduled for a long time, we'll schedule it straight away,
# otherwise we back off to the batch_delay to encourage work to be
# batched up when we're under load.
now = monotonic_time()
if self._last_scheduled is not None:
time_since_last_schedule = now - self._last_scheduled
delay = max(self.batch_delay - time_since_last_schedule,
MIN_DELAY)
else:
delay = MIN_DELAY
# We can't switch directly to the Actor's greenlet because that
# prevents gevent from doing its scheduling. Instead, we ask the
# gevent event loop to switch to the greenlet.
self._gevent_loop.run_callback(self._switch, caller)
t = self._gevent_loop.timer(delay)
t.start(self._switch, caller)
self._last_scheduled = now
_log.debug("Scheduled %s", self)

def _switch(self, value):
Expand Down Expand Up @@ -418,7 +445,7 @@ def _maybe_yield(self):
"""
self._op_count += 1
if self._op_count >= self.max_ops_before_yield:
gevent.sleep()
gevent.sleep(MIN_DELAY)
self._op_count = 0

def __str__(self):
Expand Down
2 changes: 1 addition & 1 deletion calico/felix/test/test_actor.py
Expand Up @@ -206,7 +206,7 @@ def test_yield(self, m_sleep):
self._actor.do_a(async=False)
self._actor.do_a(async=False)
self._actor.do_a(async=False)
m_sleep.assert_called_once_with()
m_sleep.assert_called_once_with(0.000001)

def test_wait_and_check_no_input(self):
actor.wait_and_check([])
Expand Down

0 comments on commit 4845eaa

Please sign in to comment.