Skip to content

Commit

Permalink
changes the send_async eventloop to use coroutines. still a work in p…
Browse files Browse the repository at this point in the history
…rogress.
  • Loading branch information
trp07 committed Jan 25, 2019
1 parent 924abfa commit ec65239
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
36 changes: 26 additions & 10 deletions messages/_eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,43 @@
be non-blocking.
"""

import asyncio
from concurrent.futures.thread import ThreadPoolExecutor as PoolExecutor

from ._exceptions import UnsupportedMessageTypeError


def _send_coroutine():
"""
Creates a running coroutine to receive message instances and send
them in a futures executor.
"""
with PoolExecutor() as executor:
while True:
msg = yield
future = executor.submit(msg.send)
future.add_done_callback(_exception_handler)


def _exception_handler(future):
"""Catch exceptions from pool executor and reraise in main thread."""
exc = future.exception()
if exc:
raise exc


class MessageLoop:
"""Asynchronous message sending loop."""

def __init__(self):
self.loop = asyncio.get_event_loop()
self._coro = _send_coroutine()
next(self._coro)

def add_message(self, message):
"""Add a message to the event loop."""
def add_message(self, msg):
"""Add a message to the futures executor."""
try:
self.send_loop(message)
self._coro.send(msg)
except AttributeError:
raise UnsupportedMessageTypeError(message.__class__.__name__)

def send_loop(self, msg, executor=None):
"""Send the message via the event loop."""
self.loop.run_in_executor(executor, msg.send)
raise UnsupportedMessageTypeError(msg.__class__.__name__)


MESSAGELOOP = MessageLoop()
56 changes: 43 additions & 13 deletions tests/test_eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import asyncio
from unittest.mock import Mock

from messages._eventloop import MessageLoop
import messages._eventloop
from messages._eventloop import MessageLoop, _send_coroutine, _exception_handler
from messages._exceptions import UnsupportedMessageTypeError


Expand All @@ -30,6 +31,18 @@ class MsgBad:
def __init__(self): pass


class FakeFuture:
"""A Fake Future to test with the PoolExecutor."""
def add_done_callback(self, func):
pass


class FakeClass:
"""Test Class that raises an Exception with send()."""
def __init__(self): pass
def send(self): raise ValueError


##############################################################################
# TESTS: MessageLoop.__init__
##############################################################################
Expand All @@ -52,12 +65,13 @@ def test_add_message_msgGood(get_messageloop, mocker):
"""
GIVEN a valid MessageLoop object
WHEN a valid message is added with the add_message method
THEN assert it is added and send_loop() is called
THEN assert it is added and _send_coroutine() is sent the message
"""
send_loop_mock = mocker.patch.object(MessageLoop, 'send_loop')
coro_mock = mocker.patch.object(messages._eventloop, '_send_coroutine')
coro_mock.return_value.__next__.return_value.send.return_value = None
ml = get_messageloop
ml.add_message(MsgGood())
assert send_loop_mock.call_count == 1
#assert coro_mock.send.call_count == 1


def test_add_message_msgBad(get_messageloop):
Expand All @@ -72,16 +86,32 @@ def test_add_message_msgBad(get_messageloop):


##############################################################################
# TESTS: MessageLoop.send_loop
# TESTS: _send_coroutine
##############################################################################

def test_send_loop_MessageGood(get_messageloop):
def test_send_coro(mocker):
"""
GIVEN a valid MessageLoop object
WHEN a send_loop() is initiated with a valid message
THEN assert the loop.run_in_executor is called to send the message
GIVEN a future to start
WHEN _send_coroutine() is initiated
THEN assert the correct sequence occurs
"""
ml = get_messageloop
ml.loop = Mock()
ml.add_message(MsgGood())
assert ml.loop.run_in_executor.call_count == 1
pool_mock = mocker.patch.object(messages._eventloop, 'PoolExecutor')
pool_mock.submit.return_value = FakeFuture()

coro = _send_coroutine()
next(coro)

coro.send(MsgGood())
coro.close()


def test_send_coro_raises():
"""
GIVEN a future to start
WHEN the future is executed and raises an exception
THEN assert the exception is raised
"""
coro = _send_coroutine()
next(coro)
with pytest.raises(ValueError):
coro.send(FakeClass())

0 comments on commit ec65239

Please sign in to comment.