Skip to content

Commit

Permalink
loop: internal async referencing
Browse files Browse the repository at this point in the history
  • Loading branch information
koehlma committed Feb 8, 2016
1 parent 0da7daa commit 9c3de11
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
6 changes: 0 additions & 6 deletions tests/test_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,6 @@ def test_call_later(self):
def callback():
self.callback_called = True

def on_wakeup(async_handle):
async_handle.close()

self.async = uv.Async(on_wakeup=on_wakeup)
self.async.send()

self.loop.call_later(callback)
self.loop.run()

Expand Down
12 changes: 12 additions & 0 deletions uv/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ def _destroy(self, _):
self.run()
self.close()

def reference_internal_async(self):
"""
Reference the internal async handle used for wakeup.
"""
lib.uv_ref(ffi.cast('uv_handle_t*', self.internal_uv_async))

def dereference_internal_async(self):
"""
Dereference the internal async handle used for wakeup.
"""
lib.uv_unref(ffi.cast('uv_handle_t*', self.internal_uv_async))

@property
def user_loop(self):
"""
Expand Down
13 changes: 10 additions & 3 deletions uv/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def __init__(self, allocator=None, buffer_size=2**16, default=False):
self.make_current()
self.pending_structures = set()
self.pending_callbacks = collections.deque()
self.pending_callbacks_lock = threading.RLock()

@property
def closed(self):
Expand Down Expand Up @@ -584,8 +585,10 @@ def call_later(self, callback, *arguments, **keywords):
:type keywords:
dict
"""
self.pending_callbacks.append((callback, arguments, keywords))
self.base_loop.wakeup()
with self.pending_callbacks_lock:
self.pending_callbacks.append((callback, arguments, keywords))
self.base_loop.wakeup()
self.base_loop.reference_internal_async()

def on_wakeup(self):
"""
Expand All @@ -597,13 +600,17 @@ def on_wakeup(self):
"""
try:
while True:
callback, arguments, keywords = self.pending_callbacks.popleft()
with self.pending_callbacks_lock:
callback, arguments, keywords = self.pending_callbacks.popleft()
try:
callback(*arguments, **keywords)
except BaseException:
self.handle_exception()
except IndexError:
pass
with self.pending_callbacks_lock:
if not self.pending_callbacks:
self.base_loop.dereference_internal_async()

def handle_exception(self):
"""
Expand Down

0 comments on commit 9c3de11

Please sign in to comment.