Browse files

Increase performance of IOStream.read_until and read_until_regex.

_handle_read and _try_inline_read now only call _read_from_buffer
once, after calling _read_to_buffer as many times as they can.  This
allows the progressive _double_prefix calls in _read_from_buffer to
work as efficiently as possible.

In testing with a 4MB read, performance improved by a factor of 32 (8
seconds to 0.25 seconds).
  • Loading branch information...
1 parent c86b8ce commit 41463a9851ab0bdfda2f21fdf3847673a223e1c4 @bdarnell bdarnell committed Mar 26, 2012
Showing with 66 additions and 29 deletions.
  1. +49 −29 tornado/iostream.py
  2. +17 −0 tornado/test/iostream_test.py
View
78 tornado/iostream.py
@@ -217,12 +217,16 @@ def close(self):
self._state = None
self.socket.close()
self.socket = None
- if self._close_callback and self._pending_callbacks == 0:
- # if there are pending callbacks, don't run the close callback
- # until they're done (see _maybe_add_error_handler)
- cb = self._close_callback
- self._close_callback = None
- self._run_callback(cb)
+ self._maybe_run_close_callback()
+
+ def _maybe_run_close_callback(self):
+ if (self.socket is None and self._close_callback and
+ self._pending_callbacks == 0):
+ # if there are pending callbacks, don't run the close callback
+ # until they're done (see _maybe_add_error_listener)
+ cb = self._close_callback
+ self._close_callback = None
+ self._run_callback(cb)
def reading(self):
"""Returns true if we are currently reading from the stream."""
@@ -310,22 +314,38 @@ def wrapper():
self.io_loop.add_callback(wrapper)
def _handle_read(self):
- while True:
+ try:
try:
- # Read from the socket until we get EWOULDBLOCK or equivalent.
- # SSL sockets do some internal buffering, and if the data is
- # sitting in the SSL object's buffer select() and friends
- # can't see it; the only way to find out if it's there is to
- # try to read it.
- result = self._read_to_buffer()
- except Exception:
- self.close()
- return
- if result == 0:
- break
- else:
- if self._read_from_buffer():
- return
+ # Pretend to have a pending callback so that an EOF in
+ # _read_to_buffer doesn't trigger an immediate close
+ # callback. At the end of this method we'll either
+ # establish a real pending callback via
+ # _read_from_buffer or run the close callback.
+ #
+ # We need two try statements here so that
+ # pending_callbacks is decremented before the `except`
+ # clause below (which calls `close` and does need to
+ # trigger the callback)
+ self._pending_callbacks += 1
+ while True:
+ # Read from the socket until we get EWOULDBLOCK or equivalent.
+ # SSL sockets do some internal buffering, and if the data is
+ # sitting in the SSL object's buffer select() and friends
+ # can't see it; the only way to find out if it's there is to
+ # try to read it.
+ if self._read_to_buffer() == 0:
+ break
+ finally:
+ self._pending_callbacks -= 1
+ except Exception:
+ logging.warning("error on read", exc_info=True)
+ self.close()
+ return
+ if self._read_from_buffer():
+ return
+ else:
+ self._maybe_run_close_callback()
+
def _set_read_callback(self, callback):
assert not self._read_callback, "Already reading"
@@ -338,13 +358,16 @@ def _try_inline_read(self):
read callback on the next IOLoop iteration; otherwise starts
listening for reads on the socket.
"""
+ # See if we've already got the data from a previous read
+ if self._read_from_buffer():
+ return
+ self._check_closed()
while True:
- # See if we've already got the data from a previous read
- if self._read_from_buffer():
- return
- self._check_closed()
if self._read_to_buffer() == 0:
break
+ self._check_closed()
+ if self._read_from_buffer():
+ return
self._add_io_state(self.io_loop.READ)
def _read_from_socket(self):
@@ -520,10 +543,7 @@ def _check_closed(self):
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
if self.socket is None:
- cb = self._close_callback
- if cb is not None:
- self._close_callback = None
- self._run_callback(cb)
+ self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
View
17 tornado/test/iostream_test.py
@@ -216,3 +216,20 @@ def test_close_buffered_data(self):
finally:
server.close()
client.close()
+
+ def test_large_read_until(self):
+ # Performance test: read_until used to have a quadratic component
+ # so a read_until of 4MB would take 8 seconds; now it takes 0.25
+ # seconds.
+ server, client = self.make_iostream_pair()
+ try:
+ NUM_KB = 4096
+ for i in xrange(NUM_KB):
+ client.write(b("A") * 1024)
+ client.write(b("\r\n"))
+ server.read_until(b("\r\n"), self.stop)
+ data = self.wait()
+ self.assertEqual(len(data), NUM_KB * 1024 + 2)
+ finally:
+ server.close()
+ client.close()

0 comments on commit 41463a9

Please sign in to comment.