Skip to content

Commit

Permalink
emit will now queue events and not yield to loop; v2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rossengeorgiev committed Aug 9, 2016
1 parent 308d4d6 commit 659116c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
38 changes: 23 additions & 15 deletions eventemitter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
__version__ = "1.4"
__version__ = "2.0"
__author__ = "Rossen Georgiev"

from collections import defaultdict, OrderedDict
import gevent
from gevent.event import AsyncResult
from gevent.queue import Queue


class EventEmitter(object):
"""
Implements event emitter using ``gevent`` module.
Every callback is executed via :meth:`gevent.spawn`.
"""
__worker = None

def emit(self, event, *args):
"""
Expand All @@ -20,24 +22,29 @@ def emit(self, event, *args):
:type event: any type
:param args: any or no arguments
"""
if hasattr(self, '_EventEmitter__callbacks'):
self.__queue.put((event, args))
self.__queue.put((None, (event,) + args))

gevent.idle()
if self.__worker is None or self.__worker.ready():
self.__worker = gevent.spawn(self.__emit_worker)

if hasattr(self, '_EventEmitter__callbacks'):
if event in self.__callbacks:
for callback, once in list(self.__callbacks[event].items()):
if once:
self.remove_listener(event, callback)
if isinstance(callback, AsyncResult):
callback.set(args)
else:
gevent.spawn(callback, *args)
def __emit_worker(self):
for event, args in self.__queue:
if hasattr(self, '_EventEmitter__callbacks'):
if event in self.__callbacks:
for callback, once in list(self.__callbacks[event].items()):
if once:
self.remove_listener(event, callback)
if isinstance(callback, AsyncResult):
callback.set(args)
else:
gevent.spawn(callback, *args)

gevent.idle()
gevent.idle()

# every event is also emitted as None
if event is not None:
self.emit(None, event, *args)
if self.__queue.empty():
break

def on(self, event, callback=None, once=False):
"""
Expand All @@ -61,6 +68,7 @@ def handle_event():

if not hasattr(self, '_EventEmitter__callbacks'):
self.__callbacks = defaultdict(OrderedDict)
self.__queue = Queue()

# when used function
if callback:
Expand Down
26 changes: 24 additions & 2 deletions tests/test_eventemitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class Dummy(EventEmitter):
self.dummy = Dummy()
self.calls = 0

def idle(self):
for _ in range(100):
gevent.idle()

def test_emit_with_args(self):
@self.dummy.on('one')
def func_one(a):
Expand All @@ -24,20 +28,28 @@ def func_two(a, b):
self.dummy.emit('one', 1)
self.dummy.emit('two', 10, 100)

self.idle()
self.assertEqual(self.calls, 111)

self.dummy.emit('one', 1)
self.dummy.emit('two', 10, 100)

self.idle()
self.assertEqual(self.calls, 222)

def test_emit_async(self):
result = gevent.event.AsyncResult()
self.dummy.on('event', result)
self.dummy.emit('event', 1)

self.assertEqual(result.get(block=False), (1,))
self.assertEqual(result.get(block=True, timeout=1), (1,))

result2 = gevent.event.AsyncResult()
self.dummy.on('event', result2)
self.dummy.emit('event', 1, 2, 3)

self.assertEqual(result2.get(block=False), (1, 2, 3))
self.idle()
self.assertEqual(result2.get(block=True, timeout=1), (1, 2, 3))

def test_on(self):
def func_one():
Expand All @@ -50,6 +62,7 @@ def func_two():
self.dummy.on('event', func_two)
self.dummy.emit('event')

self.idle()
self.assertEqual(self.calls, 11)

def test_on_decorator(self):
Expand All @@ -63,6 +76,7 @@ def func_two():

self.dummy.emit('event')

self.idle()
self.assertEqual(self.calls, 11)

def test_once(self):
Expand All @@ -73,6 +87,7 @@ def func_one():
self.dummy.emit('event')
self.dummy.emit('event')

self.idle()
self.assertEqual(self.calls, 1)

def test_once_decorator(self):
Expand All @@ -83,16 +98,19 @@ def func_one():
self.dummy.emit('event')
self.dummy.emit('event')

self.idle()
self.assertEqual(self.calls, 1)

def test_wait_event(self):
def tiny_worker():
self.dummy.wait_event('event')

g = gevent.spawn(tiny_worker)
self.idle()

self.dummy.emit('event')

self.idle()
g.get(block=False)

def test_wait_event_with_timeout(self):
Expand Down Expand Up @@ -123,6 +141,7 @@ def func_one():
self.dummy.emit('event')
self.dummy.emit('event')

self.idle()
self.assertEqual(self.calls, 0)

def test_remove_all_listeners(self):
Expand All @@ -142,6 +161,7 @@ def func_three():
self.dummy.emit('event')
self.dummy.emit('other')

self.idle()
self.assertEqual(self.calls, 0)

def test_remove_all_listeners_for_event(self):
Expand All @@ -161,6 +181,7 @@ def func_three():
self.dummy.emit('event')
self.dummy.emit('other')

self.idle()
self.assertEqual(self.calls, 100)

def test_callback_call_order(self):
Expand All @@ -179,4 +200,5 @@ def aaa():

self.dummy.emit('event')

self.idle()
self.assertEqual(result, [3, 2, 1])

0 comments on commit 659116c

Please sign in to comment.