Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Don't select for read on character devices in _UnixWritePipeTransport #374

Merged
merged 3 commits into from
Aug 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,10 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
self._pipe = pipe
self._fileno = pipe.fileno()
mode = os.fstat(self._fileno).st_mode
is_char = stat.S_ISCHR(mode)
is_fifo = stat.S_ISFIFO(mode)
is_socket = stat.S_ISSOCK(mode)
if not (is_socket or
stat.S_ISFIFO(mode) or
stat.S_ISCHR(mode)):
if not (is_char or is_fifo or is_socket):
raise ValueError("Pipe transport is only for "
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
Expand All @@ -439,7 +439,7 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
# On AIX, the reader trick (to be notified when the read end of the
# socket is closed) only works for sockets. On other platforms it
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._fileno, self._read_ready)
Expand Down
75 changes: 75 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from unittest import mock
import weakref

if sys.platform != 'win32':
import tty

import asyncio
from asyncio import proactor_events
Expand Down Expand Up @@ -1547,6 +1549,79 @@ def reader(data):
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)

@unittest.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_bidirectional_pty(self):
master, read_slave = os.openpty()
write_slave = os.dup(read_slave)
tty.setraw(read_slave)

slave_read_obj = io.open(read_slave, 'rb', 0)
read_proto = MyReadPipeProto(loop=self.loop)
read_connect = self.loop.connect_read_pipe(lambda: read_proto,
slave_read_obj)
read_transport, p = self.loop.run_until_complete(read_connect)
self.assertIs(p, read_proto)
self.assertIs(read_transport, read_proto.transport)
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
self.assertEqual(0, read_proto.nbytes)


slave_write_obj = io.open(write_slave, 'wb', 0)
write_proto = MyWritePipeProto(loop=self.loop)
write_connect = self.loop.connect_write_pipe(lambda: write_proto,
slave_write_obj)
write_transport, p = self.loop.run_until_complete(write_connect)
self.assertIs(p, write_proto)
self.assertIs(write_transport, write_proto.transport)
self.assertEqual('CONNECTED', write_proto.state)

data = bytearray()
def reader(data):
chunk = os.read(master, 1024)
data += chunk
return len(data)

write_transport.write(b'1')
test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
self.assertEqual(b'1', data)
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
self.assertEqual('CONNECTED', write_proto.state)

os.write(master, b'a')
test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
timeout=10)
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
self.assertEqual(1, read_proto.nbytes)
self.assertEqual('CONNECTED', write_proto.state)

write_transport.write(b'2345')
test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
self.assertEqual(b'12345', data)
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
self.assertEqual('CONNECTED', write_proto.state)

os.write(master, b'bcde')
test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
timeout=10)
self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
self.assertEqual(5, read_proto.nbytes)
self.assertEqual('CONNECTED', write_proto.state)

os.close(master)

read_transport.close()
self.loop.run_until_complete(read_proto.done)
self.assertEqual(
['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)

write_transport.close()
self.loop.run_until_complete(write_proto.done)
self.assertEqual('CLOSED', write_proto.state)

def test_prompt_cancellation(self):
r, w = test_utils.socketpair()
r.setblocking(False)
Expand Down