Skip to content

Commit

Permalink
Optimize actor message passing.
Browse files Browse the repository at this point in the history
* Remove UUID generation, which is expensive.
* Replace Queue with a list and direct switch calls.  Queue supports
  multiple readers, which adds a bunch of overhead.
* Only calculate caller information if debugging enabled.
  • Loading branch information
fasaxc authored and Shaun Crampton committed Oct 6, 2015
1 parent 2d677db commit 25cc2f6
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 37 deletions.
139 changes: 114 additions & 25 deletions calico/felix/actor.py
Expand Up @@ -105,11 +105,9 @@
import os
import sys
import traceback
import uuid
import weakref

from gevent.event import AsyncResult
from gevent.queue import Queue
from calico.felix import futils
from calico.felix.futils import StatCounter

Expand All @@ -134,7 +132,8 @@ class Actor(object):
"""Number of calls to self._maybe_yield before it yields"""

def __init__(self, qualifier=None):
self._event_queue = Queue()
self._event_queue = []
self._scheduled = True
self.greenlet = gevent.Greenlet(self._loop)
self._op_count = 0
self._current_msg = None
Expand All @@ -153,6 +152,12 @@ def __init__(self, qualifier=None):
# constructed.
_log.info("%s created.", self.name)

def maybe_schedule(self, caller):
    """Schedule this actor's greenlet to run, if it isn't already scheduled.

    Idempotent: if the greenlet is already scheduled this is a no-op, so
    senders may call it for every message they enqueue.

    :param caller: description of the calling context; passed through to
        the greenlet's switch() so _step() can report who woke it.
    """
    # Only touch the hub when we actually need to schedule; the
    # already-scheduled case is the hot path and should stay cheap.
    if not self._scheduled:
        self._scheduled = True
        gevent.get_hub().loop.run_callback(self.greenlet.switch, caller)

def start(self):
assert not self.greenlet, "Already running"
_log.info("Starting %s", self)
Expand Down Expand Up @@ -183,19 +188,25 @@ def _step(self):
It also has the beneficial side effect of introducing a new local
scope so that our variables die before we block next time.
"""
# Block waiting for work.
msg = self._event_queue.get()
hub = gevent.get_hub()
while not self._event_queue:
# Block waiting for work.
self._scheduled = False
caller = hub.switch()
assert self._scheduled, ("%s switched to from %s but scheduled "
"set to False." % (self, caller))
msg = self._event_queue.pop(0)

batch = [msg]
batches = []

if not msg.needs_own_batch:
# Try to pull some more work off the queue to combine into a
# batch.
while not self._event_queue.empty():
while self._event_queue:
# We're the only ones getting from the queue so this should
# never fail.
msg = self._event_queue.get_nowait()
msg = self._event_queue.pop(0)
if msg.needs_own_batch:
if batch:
batches.append(batch)
Expand All @@ -221,7 +232,7 @@ def _step(self):
for msg in batch:
_log.debug("Message %s recd by %s from %s, queue length %d",
msg, msg.recipient, msg.caller,
self._event_queue.qsize())
len(self._event_queue))
self._current_msg = msg
actor_storage.msg_uuid = msg.uuid
actor_storage.msg_name = msg.name
Expand Down Expand Up @@ -364,7 +375,7 @@ def _maybe_yield(self):
def __str__(self):
    """Debug representation: class, qualifier, queue length, liveness and
    the message currently being processed."""
    # Note: "%" binds tighter than "+", so the format applies only to the
    # literal template, which is then appended to the class name.
    return self.__class__.__name__ + "<%s,queue_len=%s,live=%s,msg=%s>" % (
        self.qualifier,
        len(self._event_queue),
        bool(self.greenlet),
        self._current_msg
    )
Expand All @@ -384,10 +395,16 @@ def wait_and_check(async_results):
r.get()


# Monotonically-increasing ID assigned to each Message in place of a UUID.
# Incremented by the actor_message wrapper with no intervening yield point,
# which is what makes the read-then-increment safe under gevent's
# cooperative scheduling.
next_message_id = 0


class Message(object):
"""
Message passed to an actor.
"""
__slots__ = ("uuid", "method", "results", "caller", "name",
"needs_own_batch", "recipient")

def __init__(self, msg_id, method, results, caller_path, recipient,
needs_own_batch):
self.uuid = msg_id
Expand Down Expand Up @@ -429,18 +446,25 @@ def decorator(fn):

@functools.wraps(fn)
def queue_fn(self, *args, **kwargs):
# Get call information for logging purposes.
calling_file, line_no, func, _ = traceback.extract_stack()[-2]
calling_file = os.path.basename(calling_file)
calling_path = "%s:%s:%s" % (calling_file, line_no, func)
try:
caller_name = "%s.%s" % (actor_storage.class_name,
actor_storage.msg_name)
caller = "%s (processing %s)" % (actor_storage.name,
actor_storage.msg_uuid)
except AttributeError:
caller_name = calling_path
caller = calling_path
# Calculating the calling information is expensive, so only do it
# if debug is enabled.
caller = "<disabled>"
caller_name = "<disabled>"
calling_path = "<disabled>"

if _log.isEnabledFor(logging.DEBUG):
# Get call information for logging purposes.
calling_file, line_no, func, _ = traceback.extract_stack()[-2]
calling_file = os.path.basename(calling_file)
calling_path = "%s:%s:%s" % (calling_file, line_no, func)
try:
caller_name = "%s.%s" % (actor_storage.class_name,
actor_storage.msg_name)
caller = "%s (processing %s)" % (actor_storage.name,
actor_storage.msg_uuid)
except AttributeError:
caller_name = calling_path
caller = calling_path

# Figure out our arguments.
async_set = "async" in kwargs
Expand All @@ -462,10 +486,17 @@ def queue_fn(self, *args, **kwargs):
method_name,
self.__class__.__name__)
)
pass

# async must be specified, unless on the same actor.
assert async_set, "All cross-actor event calls must specify async arg."
msg_id = uuid.uuid4().hex[:12]
assert async_set, "Cross-actor event calls must specify async arg."

# Allocate a message ID. We rely on there being no yield point
# here for thread safety.
global next_message_id
msg_id = next_message_id
next_message_id += 1

if not on_same_greenlet and not async:
_stats.increment("Blocking calls started")
_log.debug("BLOCKING CALL: [%s] %s -> %s", msg_id,
Expand All @@ -479,8 +510,9 @@ def queue_fn(self, *args, **kwargs):
needs_own_batch=needs_own_batch)

_log.debug("Message %s sent by %s to %s, queue length %d",
msg, caller, self.name, self._event_queue.qsize())
self._event_queue.put(msg, block=False)
msg, caller, self.name, len(self._event_queue))
self._event_queue.append(msg)
self.maybe_schedule(caller)
if async:
return result
else:
Expand Down Expand Up @@ -615,3 +647,60 @@ def _exit(rc):
This function is mainly here to be mocked out in UTs.
"""
os._exit(rc) # pragma nocover


if __name__ == "__main__":
from calico.monotonic import monotonic_time
print "Running actor perf test"

class MessageChainActor(Actor):
def __init__(self, set_done=False):
super(MessageChainActor, self).__init__()
self.children = []
self.set_done = set_done

@actor_message()
def pass_message_on(self, message):
if not self.children and message and self.set_done:
print "Time:", monotonic_time() - start_time
done.set()
for c in self.children:
gevent.sleep(0.01)
c.pass_message_on(message, async=True)

root = MessageChainActor()
sink = MessageChainActor()
sink.start()
root.start()
node = root
for x in xrange(10000):
child = MessageChainActor()
child.start()
node.children.append(child)
node.children.append(sink)
node = child
node.set_done = True
from gevent.event import Event
done = Event()
start_time = monotonic_time()
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.0001)
root.pass_message_on("", async=True)
gevent.sleep(0.01)
root.pass_message_on("Foobar", async=True)
done.wait()
print next_message_id
4 changes: 2 additions & 2 deletions calico/felix/ipsets.py
Expand Up @@ -586,7 +586,7 @@ def __str__(self):
self.__class__.__name__ + "<queue_len=%s,live=%s,msg=%s,"
"name=%s>" %
(
self._event_queue.qsize(),
len(self._event_queue),
bool(self.greenlet),
self._current_msg,
self.name,
Expand Down Expand Up @@ -641,7 +641,7 @@ def __str__(self):
self.__class__.__name__ + "<queue_len=%s,live=%s,msg=%s,"
"name=%s,id=%s>" %
(
self._event_queue.qsize(),
len(self._event_queue),
bool(self.greenlet),
self._current_msg,
self.name,
Expand Down
2 changes: 1 addition & 1 deletion calico/felix/test/base.py
Expand Up @@ -46,7 +46,7 @@ def step_actor(self, actor):
# actor_message's asserts.
with mock.patch.object(actor, "greenlet"):
actor.greenlet = gevent.getcurrent()
while not actor._event_queue.empty():
while actor._event_queue:
actor._step()


Expand Down
9 changes: 0 additions & 9 deletions calico/felix/test/test_actor.py
Expand Up @@ -194,15 +194,6 @@ def test_same_actor_call(self):
self._actor.start() # really start it.
self.assertEqual("c1c2", self._actor.do_c(async=False))

def test_full_queue(self):
eq = self._actor._event_queue
with nested(mock.patch.object(eq, "full", autospec=True),
mock.patch.object(eq, "put", autospec=True)) as \
(m_full, m_put):
m_full.return_value = True
self._actor.do_a(async=True)
self.assertFalse(m_put.call_args[1]["block"])

def test_loop_coverage(self):
with mock.patch.object(self._actor, "_step", autospec=True) as m_step:
m_step.side_effect = ExpectedException()
Expand Down

0 comments on commit 25cc2f6

Please sign in to comment.