Skip to content

Commit

Permalink
Add async client.wait(event) and make client.send non-blocking async
Browse files Browse the repository at this point in the history
  • Loading branch information
numberoverzero committed Jan 25, 2016
1 parent ec9fbe7 commit c9e4a13
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 28 deletions.
21 changes: 12 additions & 9 deletions bottom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ class Client:
protocol = None

def __init__(self, host, port, *, encoding="UTF-8", ssl=True, loop=None):
self._handlers = collections.defaultdict(list)

self.host = host
self.port = port
self.ssl = ssl
Expand All @@ -20,21 +18,25 @@ def __init__(self, host, port, *, encoding="UTF-8", ssl=True, loop=None):
loop = asyncio.get_event_loop()
self.loop = loop

self._handlers = collections.defaultdict(list)
self._events = collections.defaultdict(
lambda: asyncio.Event(loop=self.loop))

def send(self, command, **kwargs):
"""
Send a message to the server.
Schedule a message to be sent to the server.
Examples
--------
client.send("nick", nick="weatherbot")
client.send("privmsg", target="#python", message="Hello, World!")
"""
packed_command = pack_command(command, **kwargs).strip()
self.loop.create_task(self._send(packed_command))

async def _send(self, packed_command):
if not self.protocol:
raise RuntimeError("Not connected")
packed_command = pack_command(command, **kwargs).strip()
self.protocol.write(packed_command)

async def connect(self):
Expand All @@ -52,17 +54,18 @@ async def disconnect(self):

def trigger(self, event, **kwargs):
"""Trigger all handlers for an event to (asynchronously) execute"""
for func in self._handlers[event.upper()]:
event = event.upper()
for func in self._handlers[event]:
self.loop.create_task(func(**kwargs))
asyncio_event = self._events[event]
# This will unblock anyone that is awaiting on the next loop update,
# while still ensuring the next `await client.wait(event)` doesn't
# immediately fire.
asyncio_event.set()
asyncio_event.clear()
async_event = self._events[event]
async_event.set()
async_event.clear()

async def wait(self, event):
await self._events[event]
await self._events[event.upper()].wait()

def on(self, event, func=None):
"""
Expand Down
11 changes: 11 additions & 0 deletions tests/integ/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ def loop():
return loop


@pytest.fixture
def flush(loop):
"""Run loop once, to execute any pending tasks"""
async def sentinel():
pass

def _flush():
loop.run_until_complete(sentinel())
return _flush


@pytest.fixture
def waiter(loop):
"""Return a pair of functions for marking and waiting on an async event,
Expand Down
29 changes: 19 additions & 10 deletions tests/integ/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@ def test_connect(client, connect):
assert client.triggers['CLIENT_CONNECT'] == 1


def test_ping_pong(client, server, connect, waiter):
mark, wait = waiter()

@client.on("PING")
def handle(**kwargs):
assert kwargs == {"message": "ping-message"}
client.send("PONG")
mark()

def test_ping_pong(client, server, connect, waiter, flush):
connect()
server.write("PING :ping-message")
wait()
client.send("PONG")

# Even though the server writes the message immediately, the client
# doesn't receive the PING until a flush.
assert not client.triggers["PING"]
assert not server.received

# After the first flush the client has received the full PING. However,
# the PONG `send` requires another flush. The first flush here will
# only kick off the async _send, not clear the buffer to the server.
flush()
assert client.triggers["PING"] == 1
assert not server.received

# The client clears the outgoing buffer, and the PONG finally makes it to
# the server's protocol and is unpacked.
flush()
assert client.triggers["PING"] == 1
assert server.received == ["PONG"]
42 changes: 33 additions & 9 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ def test_send_unknown_command(active_client, protocol):
active_client.send("Unknown Command")


def test_send_before_connected(client):
def test_send_before_connected(client, flush, transport):
""" Sending before connected raises """
with pytest.raises(RuntimeError):
client.send("PONG")
client.send("PONG")
flush()
assert not transport.written


def test_disconnect_before_connected(client, schedule):
Expand All @@ -29,13 +30,18 @@ def test_disconnect_before_connected(client, schedule):
assert client.protocol is None


def test_send_after_disconnected(client, transport, schedule):
def test_send_after_disconnected(client, transport, schedule, flush):
""" Sending after disconnect does not invoke writer """
schedule(client.connect())
# Written while connected
client.send("PONG")
flush()

schedule(client.disconnect())
with pytest.raises(RuntimeError):
client.send("QUIT")
# Written while disconnected - not sent
client.send("QUIT")
flush()

assert transport.written == [b"PONG\r\n"]


Expand Down Expand Up @@ -64,7 +70,6 @@ def test_unpack_triggers_client(active_client, protocol, flush):

@active_client.on("PRIVMSG")
async def receive(nick, user, host, target, message):
print("success")
received.extend([nick, user, host, target, message])

protocol.data_received(
Expand Down Expand Up @@ -148,9 +153,9 @@ def method(self, arg, kw_default="default"):
flush()


def test_callback_ordering(client, flush, loop):
def test_callback_ordering(client, flush):
""" Callbacks for a second event don't queue behind the first event """
second_complete = asyncio.Event(loop=loop)
second_complete = asyncio.Event(loop=client.loop)
call_order = []
complete_order = []

Expand All @@ -171,3 +176,22 @@ async def second():
flush()
assert call_order == ["first", "second"]
assert complete_order == ["second", "first"]


def test_wait_ordering(client, flush):
""" Handlers are enqueued before trigger waits """
invoked = []

@client.on("some.trigger")
def handle(**kwargs):
invoked.append("handler")

async def waiter():
await client.wait("some.trigger")
invoked.append("waiter")

client.loop.create_task(waiter())
flush()
client.trigger("some.trigger")
flush()
assert invoked == ["handler", "waiter"]

0 comments on commit c9e4a13

Please sign in to comment.