Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._buffer = []
self._buffer = bytearray()
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.

Expand Down Expand Up @@ -475,7 +475,7 @@ def __repr__(self):
return '<%s>' % ' '.join(info)

def get_write_buffer_size(self):
return sum(len(data) for data in self._buffer)
return len(self._buffer)

def _read_ready(self):
# Pipe was closed by peer.
Expand Down Expand Up @@ -513,39 +513,37 @@ def write(self, data):
if n == len(data):
return
elif n > 0:
data = data[n:]
data = memoryview(data)[n:]
self._loop.add_writer(self._fileno, self._write_ready)

self._buffer.append(data)
self._buffer += data

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use .extend() seems it is faster

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad knowhow: operator is faster than method, thanks to slot.

In [2]: def pluseq():
   ...:     buf = bytearray()
   ...:     for _ in range(1000):
   ...:         buf += b'foo'
   ...:         

In [3]: def extend():
   ...:     buf = bytearray()
   ...:     for _ in range(1000):
   ...:         buf.extend(b'foo')
   ...:         

In [4]: %timeit pluseq()
10000 loops, best of 3: 80.2 µs per loop

In [5]: %timeit extend()
10000 loops, best of 3: 135 µs per loop

Copy link
Member Author

@methane methane Jul 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And more important: bytearray.extend() accepts sequence of integers [0, 255).

In [6]: buf = bytearray()
In [7]: buf += [1,2,3]
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-d99af3f0cf4a> in <module>()
----> 1 buf += [1,2,3]

TypeError: can't concat list to bytearray
In [8]: buf.extend([1,2,3])
In [9]: buf
Out[9]: bytearray(b'\x01\x02\x03')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In [6]: buf = bytearray()
In [7]: buf += [1,2,3]

This should not be allowed. Supporting this would make life for other loop implementations harder with no obvious benefits.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@1st1 Yes, it raises TypeError, as you can see above.

self._maybe_pause_protocol()

def _write_ready(self):
data = b''.join(self._buffer)
assert data, 'Data should not be empty'
assert self._buffer, 'Data should not be empty'

self._buffer.clear()
try:
n = os.write(self._fileno, data)
n = os.write(self._fileno, self._buffer)
except (BlockingIOError, InterruptedError):
self._buffer.append(data)
pass
except Exception as exc:
self._buffer.clear()
self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it
# because _buffer is empty.
self._loop.remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error on pipe transport')
else:
if n == len(data):
if n == len(self._buffer):
self._buffer.clear()
self._loop.remove_writer(self._fileno)
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer and self._closing:
if self._closing:
self._loop.remove_reader(self._fileno)
self._call_connection_lost(None)
return
elif n > 0:
data = data[n:]

self._buffer.append(data) # Try again later.
del self._buffer[:n]

def can_write_eof(self):
return True
Expand Down
49 changes: 22 additions & 27 deletions tests/test_unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,43 +518,42 @@ def test_write(self, m_write):
tr.write(b'data')
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(), tr._buffer)

@mock.patch('os.write')
def test_write_no_data(self, m_write):
tr = self.write_pipe_transport()
tr.write(b'')
self.assertFalse(m_write.called)
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(b''), tr._buffer)

@mock.patch('os.write')
def test_write_partial(self, m_write):
tr = self.write_pipe_transport()
m_write.return_value = 2
tr.write(b'data')
m_write.assert_called_with(5, b'data')
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'ta'], tr._buffer)
self.assertEqual(bytearray(b'ta'), tr._buffer)

@mock.patch('os.write')
def test_write_buffer(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'previous']
tr._buffer = bytearray(b'previous')
tr.write(b'data')
self.assertFalse(m_write.called)
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'previous', b'data'], tr._buffer)
self.assertEqual(bytearray(b'previousdata'), tr._buffer)

@mock.patch('os.write')
def test_write_again(self, m_write):
tr = self.write_pipe_transport()
m_write.side_effect = BlockingIOError()
tr.write(b'data')
m_write.assert_called_with(5, b'data')
m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
self.assertEqual(bytearray(b'data'), tr._buffer)

@mock.patch('asyncio.unix_events.logger')
@mock.patch('os.write')
Expand All @@ -566,7 +565,7 @@ def test_write_err(self, m_write, m_log):
tr.write(b'data')
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(), tr._buffer)
tr._fatal_error.assert_called_with(
err,
'Fatal write error on pipe transport')
Expand Down Expand Up @@ -606,58 +605,55 @@ def test__read_ready(self):
def test__write_ready(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.return_value = 4
tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(), tr._buffer)

@mock.patch('os.write')
def test__write_ready_partial(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.return_value = 3
tr._write_ready()
m_write.assert_called_with(5, b'data')
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'a'], tr._buffer)
self.assertEqual(bytearray(b'a'), tr._buffer)

@mock.patch('os.write')
def test__write_ready_again(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.side_effect = BlockingIOError()
tr._write_ready()
m_write.assert_called_with(5, b'data')
m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
self.assertEqual(bytearray(b'data'), tr._buffer)

@mock.patch('os.write')
def test__write_ready_empty(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.return_value = 0
tr._write_ready()
m_write.assert_called_with(5, b'data')
m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
self.assertEqual(bytearray(b'data'), tr._buffer)

@mock.patch('asyncio.log.logger.error')
@mock.patch('os.write')
def test__write_ready_err(self, m_write, m_logexc):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.side_effect = err = OSError()
tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
self.assertFalse(self.loop.readers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(), tr._buffer)
self.assertTrue(tr.is_closing())
m_logexc.assert_called_with(
test_utils.MockPattern(
Expand All @@ -673,13 +669,12 @@ def test__write_ready_closing(self, m_write):
tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready)
tr._closing = True
tr._buffer = [b'da', b'ta']
tr._buffer = bytearray(b'data')
m_write.return_value = 4
tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
self.assertFalse(self.loop.readers)
self.assertEqual([], tr._buffer)
self.assertEqual(bytearray(), tr._buffer)
self.protocol.connection_lost.assert_called_with(None)
self.pipe.close.assert_called_with()

Expand Down