
Avoid merging the entire read buffer for IOStream.read_until.

Among other things, this dramatically speeds up downloads of large
chunked files over a fast network with SimpleHTTPClient.

Fixes #425.
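For background: SimpleHTTPClient reads each chunk-length line of a chunked response with IOStream.read_until("\r\n", ...), and the old _read_from_buffer merged the entire read buffer into a single string before every search. With N bytes buffered, that is O(N) copying per call and roughly O(N**2) over the whole download. A standalone illustration of the old pattern's cost (not Tornado's code; names and sizes are made up):

import collections

# A read buffer like the benchmark below produces: the chunk-size line
# sits at the front, followed by ~2 MB of buffered body.
buf = collections.deque(['800\r\n'] + ['A' * 2048] * 1000)

# Old approach, repeated on every read_until call: collapse everything,
# then search.
merged = ''.join(buf)        # copies the entire ~2 MB buffer...
loc = merged.find('\r\n')    # ...to find a delimiter 3 bytes in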
1 parent 0b31d8d · commit 7f5d4de759e0c5050e506b4c3f55a46e5df1a102 · @bdarnell committed Dec 31, 2011
Showing with 77 additions and 2 deletions.
  1. +47 −0 demos/benchmark/chunk_benchmark.py
  2. +30 −2 tornado/iostream.py
demos/benchmark/chunk_benchmark.py

@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Downloads a large file in chunked encoding with both curl and simple clients
+
+import logging
+from tornado.curl_httpclient import CurlAsyncHTTPClient
+from tornado.simple_httpclient import SimpleAsyncHTTPClient
+from tornado.ioloop import IOLoop
+from tornado.options import define, options, parse_command_line
+from tornado.web import RequestHandler, Application
+
+define('port', default=8888)
+define('num_chunks', default=1000)
+define('chunk_size', default=2048)
+
+class ChunkHandler(RequestHandler):
+    def get(self):
+        for i in xrange(options.num_chunks):
+            self.write('A' * options.chunk_size)
+            self.flush()
+        self.finish()
+
+def main():
+    parse_command_line()
+    app = Application([('/', ChunkHandler)])
+    app.listen(options.port, address='127.0.0.1')
+    def callback(response):
+        response.rethrow()
+        assert len(response.body) == (options.num_chunks * options.chunk_size)
+        logging.warning("fetch completed in %s seconds", response.request_time)
+        IOLoop.instance().stop()
+
+    logging.warning("Starting fetch with curl client")
+    curl_client = CurlAsyncHTTPClient()
+    curl_client.fetch('http://localhost:%d/' % options.port,
+                      callback=callback)
+    IOLoop.instance().start()
+
+    logging.warning("Starting fetch with simple client")
+    simple_client = SimpleAsyncHTTPClient()
+    simple_client.fetch('http://localhost:%d/' % options.port,
+                        callback=callback)
+    IOLoop.instance().start()
+
+
+if __name__ == '__main__':
+    main()
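To reproduce the numbers (a usage note, not part of the commit; assumes pycurl is installed for the curl client, and the flags come from the define() calls above):

    python demos/benchmark/chunk_benchmark.py --num_chunks=1000 --chunk_size=2048

The script serves the chunked response to itself on 127.0.0.1, times one ~2 MB download with each client, and logs the elapsed time for comparison.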
tornado/iostream.py

@@ -412,8 +412,25 @@ def _read_from_buffer(self):
                 self._run_callback(callback, self._consume(num_bytes))
                 return True
         elif self._read_delimiter is not None:
-            _merge_prefix(self._read_buffer, sys.maxint)
-            loc = self._read_buffer[0].find(self._read_delimiter)
+            # Multi-byte delimiters (e.g. '\r\n') may straddle two
+            # chunks in the read buffer, so we can't easily find them
+            # without collapsing the buffer.  However, since protocols
+            # using delimited reads (as opposed to reads of a known
+            # length) tend to be "line" oriented, the delimiter is likely
+            # to be in the first few chunks.  Merge the buffer gradually
+            # since large merges are relatively expensive and get undone in
+            # consume().
+            loc = -1
+            if self._read_buffer:
+                loc = self._read_buffer[0].find(self._read_delimiter)
+            while loc == -1 and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                loc = self._read_buffer[0].find(self._read_delimiter)
             if loc != -1:
                 callback = self._read_callback
                 delimiter_len = len(self._read_delimiter)
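To make the growth rule above concrete (chunk lengths here are made up): with a buffer holding chunks of lengths 4, 4096, 4096 and no delimiter in the first chunk, the first pass merges to new_len = max(4 * 2, 4 + 4096) = 4100, absorbing the whole second chunk instead of splitting it; the next pass merges to max(4100 * 2, 4100 + 4096) = 8200. Because the searched prefix at least doubles on every pass, the total copying is proportional to how far in the delimiter is found, rather than to the whole buffer on every call.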
@@ -424,6 +441,17 @@ def _read_from_buffer(self):
                                    self._consume(loc + delimiter_len))
                 return True
         elif self._read_regex is not None:
+            m = None
+            if self._read_buffer:
+                m = self._read_regex.search(self._read_buffer[0])
+            while m is None and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                m = self._read_regex.search(self._read_buffer[0])
             _merge_prefix(self._read_buffer, sys.maxint)
             m = self._read_regex.search(self._read_buffer[0])
             if m:

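Note that in the second hunk the pre-existing _merge_prefix(self._read_buffer, sys.maxint) and search lines remain below the new loop as unchanged context, so the regex path still collapses the entire buffer after the gradual search; only the delimiter path fully benefits in this commit.

To see the technique in isolation, here is a minimal standalone sketch (my simplification, not Tornado's code: merge_prefix below never splits a chunk, whereas tornado.iostream._merge_prefix trims the prefix to exactly the requested size):

import collections

def merge_prefix(buf, size):
    # Concatenate chunks from the left until the first chunk holds at
    # least `size` bytes (or the deque is down to one chunk).
    while len(buf) > 1 and len(buf[0]) < size:
        buf.appendleft(buf.popleft() + buf.popleft())

def find_delimiter(buf, delimiter):
    # Search only the first chunk; merge geometrically more of the
    # buffer while the delimiter is still missing.
    loc = buf[0].find(delimiter) if buf else -1
    while loc == -1 and len(buf) > 1:
        new_len = max(len(buf[0]) * 2, len(buf[0]) + len(buf[1]))
        merge_prefix(buf, new_len)
        loc = buf[0].find(delimiter)
    return loc

buf = collections.deque(['chu', 'nked\r', '\nbody'])
assert find_delimiter(buf, '\r\n') == 7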