Keep-alives and connect_timeout bug #328

Open
wants to merge 4 commits

5 participants

NickNeedsAName, Ben Darnell, Ben Hodgson, Birk Nilson, Alek Storm
NickNeedsAName

Added support for keep-alive in SimpleAsyncClient; also fixed a bug where async connections would be closed by the connect timeout even after the connection had been established.

Decided not to go with the suggested implementation of keeping a pool of _HTTPConnections, as it seemed cumbersome to maintain all the state of an object that would (potentially) be overwritten every time. Decided instead to keep a queue of streams (essentially sockets) that are reused as soon as they become available.

Streams are keyed in the stream_map with a (scheme, host, port) tuple (or some permutation; I forget).

The client defaults to keep_alive; dead sockets are skipped and discarded as the queue is cycled; and when a stream is no longer in use it drops its reference to the current _HTTPConnection and readies itself for the next one.

Any suggestions/problems/fixes let me know.
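A minimal sketch of the idea, with hypothetical names (checkout/checkin and the module-level stream_map here are illustrative, not the patch's actual code):

    import Queue  # Python 2 name; renamed to `queue` in Python 3

    stream_map = {}  # (scheme, host, port) -> Queue of idle IOStreams

    def checkout(key):
        # Return a live pooled stream for `key`, or None to open a new one.
        pool = stream_map.get(key)
        while pool is not None and not pool.empty():
            stream = pool.get_nowait()
            if not stream.closed():
                return stream  # reuse an existing keep-alive connection
        return None

    def checkin(key, stream):
        # Park a finished stream so the next request to `key` can reuse it.
        stream_map.setdefault(key, Queue.Queue()).put_nowait(stream)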

NickNeedsAName added some commits:

  • Added support for keep-alive in SimpleAsyncClient, fixed bug where Async connections would close due to connection timeout even after the connection had been established (7c05e04)
  • Made it more correct: if explicitly set to not handle keep-alives, ignore all the keep-alive logic (d005d29)
Ben Darnell
Owner

I tried merging this and all the unit tests failed. Please make sure that the tests pass (run "python -m tornado.test.runtests"), and also add some new tests to ensure that connections are getting reused.

NickNeedsAName

Jawesome

NickNeedsAName

I just pulled all the new stuff and everything seems to be working (all the tests passed...). Could you share which tests were failing and how? I'm going to add new tests before I commit. Thanks for the feedback.

Ben Darnell
Owner

Here's the log from the tests:

................F[I 110828 00:06:21 testing:289] RUNNING TEST: test_chunked (tornado.test.httpclient_test.HTTPClientCommonTestCase)
[I 110828 00:06:21 web:1390] 200 GET /chunk (127.0.0.1) 0.62ms
[I 110828 00:06:21 web:1390] 200 GET /chunk (127.0.0.1) 0.32ms
[W 110828 00:06:21 simple_httpclient:320] uncaught exception
    Traceback (most recent call last):
      File "tornado/simple_httpclient.py", line 318, in cleanup
        yield
      File "tornado/stack_context.py", line 183, in wrapped
        callback(*args, **kwargs)
      File "tornado/simple_httpclient.py", line 335, in _on_headers
        assert match
    AssertionError
..F[I 110828 00:06:21 testing:289] RUNNING TEST: test_follow_redirect (tornado.test.httpclient_test.HTTPClientCommonTestCase)
[I 110828 00:06:21 web:1390] 302 GET /countdown/2 (127.0.0.1) 0.51ms
[I 110828 00:06:21 web:1390] 302 GET /countdown/2 (127.0.0.1) 0.24ms
[W 110828 00:06:21 iostream:356] Read error on 9: [Errno 54] Connection reset by peer
[E 110828 00:06:21 web:1028] Uncaught exception GET /countdown/1 (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:10006', method='GET', uri='/countdown/1', version='HTTP/1.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:10006', 'Accept-Encoding': 'gzip'})
    Traceback (most recent call last):
      File "tornado/web.py", line 985, in _execute
        getattr(self, self.request.method.lower())(*args, **kwargs)
      File "tornado/test/httpclient_test.py", line 41, in get
        self.redirect(self.reverse_url("countdown", count - 1))
      File "tornado/web.py", line 449, in redirect
        self.finish()
      File "tornado/web.py", line 672, in finish
        self.request.finish()
      File "tornado/httpserver.py", line 533, in finish
        self.connection.finish()
      File "tornado/httpserver.py", line 310, in finish
        self._finish_request()
      File "tornado/httpserver.py", line 337, in _finish_request
        self.stream.read_until(b("\r\n\r\n"), self._header_callback)
      File "tornado/iostream.py", line 153, in read_until
        if self._read_to_buffer() == 0:
      File "tornado/iostream.py", line 352, in _read_to_buffer
        chunk = self._read_from_socket()
      File "tornado/iostream.py", line 333, in _read_from_socket
        chunk = self.socket.recv(self.read_chunk_size)
    error: [Errno 54] Connection reset by peer
[E 110828 00:06:21 web:688] Cannot send error response after headers written
...................................F[I 110828 00:06:22 testing:289] RUNNING TEST: test_chunked (tornado.test.simple_httpclient_test.SimpleHTTPClientCommonTestCase)
[I 110828 00:06:22 web:1390] 200 GET /chunk (127.0.0.1) 0.43ms
[I 110828 00:06:22 web:1390] 200 GET /chunk (127.0.0.1) 0.31ms
[W 110828 00:06:22 simple_httpclient:320] uncaught exception
    Traceback (most recent call last):
      File "tornado/simple_httpclient.py", line 318, in cleanup
        yield
      File "tornado/stack_context.py", line 183, in wrapped
        callback(*args, **kwargs)
      File "tornado/simple_httpclient.py", line 335, in _on_headers
        assert match
    AssertionError
..F[I 110828 00:06:22 testing:289] RUNNING TEST: test_follow_redirect (tornado.test.simple_httpclient_test.SimpleHTTPClientCommonTestCase)
[I 110828 00:06:22 web:1390] 302 GET /countdown/2 (127.0.0.1) 0.35ms
[I 110828 00:06:22 web:1390] 302 GET /countdown/2 (127.0.0.1) 0.25ms
[W 110828 00:06:22 iostream:356] Read error on 10: [Errno 54] Connection reset by peer
[E 110828 00:06:22 web:1028] Uncaught exception GET /countdown/1 (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:10029', method='GET', uri='/countdown/1', version='HTTP/1.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:10029', 'Accept-Encoding': 'gzip'})
    Traceback (most recent call last):
      File "tornado/web.py", line 985, in _execute
        getattr(self, self.request.method.lower())(*args, **kwargs)
      File "tornado/test/httpclient_test.py", line 41, in get
        self.redirect(self.reverse_url("countdown", count - 1))
      File "tornado/web.py", line 449, in redirect
        self.finish()
      File "tornado/web.py", line 672, in finish
        self.request.finish()
      File "tornado/httpserver.py", line 533, in finish
        self.connection.finish()
      File "tornado/httpserver.py", line 310, in finish
        self._finish_request()
      File "tornado/httpserver.py", line 337, in _finish_request
        self.stream.read_until(b("\r\n\r\n"), self._header_callback)
      File "tornado/iostream.py", line 153, in read_until
        if self._read_to_buffer() == 0:
      File "tornado/iostream.py", line 352, in _read_to_buffer
        chunk = self._read_from_socket()
      File "tornado/iostream.py", line 333, in _read_from_socket
        chunk = self.socket.recv(self.read_chunk_size)
    error: [Errno 54] Connection reset by peer
[E 110828 00:06:22 web:688] Cannot send error response after headers written
....F[I 110828 00:06:22 testing:289] RUNNING TEST: test_connect_timeout (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
....F[I 110828 00:06:27 testing:289] RUNNING TEST: test_max_redirects (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
[I 110828 00:06:27 web:1390] 302 GET /countdown/5 (127.0.0.1) 0.29ms
[W 110828 00:06:27 iostream:356] Read error on 10: [Errno 54] Connection reset by peer
[E 110828 00:06:27 web:1028] Uncaught exception GET /countdown/4 (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:10040', method='GET', uri='/countdown/4', version='HTTP/1.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:10040', 'Accept-Encoding': 'gzip'})
    Traceback (most recent call last):
      File "tornado/web.py", line 985, in _execute
        getattr(self, self.request.method.lower())(*args, **kwargs)
      File "tornado/test/httpclient_test.py", line 41, in get
        self.redirect(self.reverse_url("countdown", count - 1))
      File "tornado/web.py", line 449, in redirect
        self.finish()
      File "tornado/web.py", line 672, in finish
        self.request.finish()
      File "tornado/httpserver.py", line 533, in finish
        self.connection.finish()
      File "tornado/httpserver.py", line 310, in finish
        self._finish_request()
      File "tornado/httpserver.py", line 337, in _finish_request
        self.stream.read_until(b("\r\n\r\n"), self._header_callback)
      File "tornado/iostream.py", line 153, in read_until
        if self._read_to_buffer() == 0:
      File "tornado/iostream.py", line 352, in _read_to_buffer
        chunk = self._read_from_socket()
      File "tornado/iostream.py", line 333, in _read_from_socket
        chunk = self.socket.recv(self.read_chunk_size)
    error: [Errno 54] Connection reset by peer
[E 110828 00:06:27 web:688] Cannot send error response after headers written
.F[I 110828 00:06:27 testing:289] RUNNING TEST: test_redirect_connection_limit (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
[I 110828 00:06:27 web:1390] 302 GET /countdown/3 (127.0.0.1) 0.27ms
[W 110828 00:06:27 iostream:356] Read error on 10: [Errno 54] Connection reset by peer
[E 110828 00:06:27 web:1028] Uncaught exception GET /countdown/2 (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:10042', method='GET', uri='/countdown/2', version='HTTP/1.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:10042', 'Accept-Encoding': 'gzip'})
    Traceback (most recent call last):
      File "tornado/web.py", line 985, in _execute
        getattr(self, self.request.method.lower())(*args, **kwargs)
      File "tornado/test/httpclient_test.py", line 41, in get
        self.redirect(self.reverse_url("countdown", count - 1))
      File "tornado/web.py", line 449, in redirect
        self.finish()
      File "tornado/web.py", line 672, in finish
        self.request.finish()
      File "tornado/httpserver.py", line 533, in finish
        self.connection.finish()
      File "tornado/httpserver.py", line 310, in finish
        self._finish_request()
      File "tornado/httpserver.py", line 337, in _finish_request
        self.stream.read_until(b("\r\n\r\n"), self._header_callback)
      File "tornado/iostream.py", line 153, in read_until
        if self._read_to_buffer() == 0:
      File "tornado/iostream.py", line 352, in _read_to_buffer
        chunk = self._read_from_socket()
      File "tornado/iostream.py", line 333, in _read_from_socket
        chunk = self.socket.recv(self.read_chunk_size)
    error: [Errno 54] Connection reset by peer
[E 110828 00:06:27 web:688] Cannot send error response after headers written
.....................................................
======================================================================
FAIL: test_chunked (tornado.test.httpclient_test.HTTPClientCommonTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/httpclient_test.py", line 104, in test_chunked
    self.assertEqual(chunks, [b("asdf"), b("qwer")])
AssertionError: Lists differ: [] != ['asdf', 'qwer']

Second list contains 2 additional elements.
First extra element 0:
asdf

- []
+ ['asdf', 'qwer']

======================================================================
FAIL: test_follow_redirect (tornado.test.httpclient_test.HTTPClientCommonTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/httpclient_test.py", line 144, in test_follow_redirect
    response = self.fetch("/countdown/2")
  File "tornado/testing.py", line 233, in fetch
    return self.wait()
  File "tornado/testing.py", line 119, in _stack_context
    yield
  File "tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "tornado/httpserver.py", line 365, in _on_headers
    self.request_callback(self._request)
  File "tornado/web.py", line 1359, in __call__
    handler._execute(transforms, *args, **kwargs)
  File "tornado/web.py", line 989, in _execute
    self._handle_request_exception(e)
  File "tornado/web.py", line 1029, in _handle_request_exception
    self.send_error(500, exc_info=sys.exc_info())
  File "tornado/web.py", line 690, in send_error
    self.finish()
  File "tornado/web.py", line 672, in finish
    self.request.finish()
  File "tornado/httpserver.py", line 533, in finish
    self.connection.finish()
  File "tornado/httpserver.py", line 307, in finish
    assert self._request, "Request closed"
AssertionError: Request closed

======================================================================
FAIL: test_chunked (tornado.test.simple_httpclient_test.SimpleHTTPClientCommonTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/httpclient_test.py", line 104, in test_chunked
    self.assertEqual(chunks, [b("asdf"), b("qwer")])
AssertionError: Lists differ: [] != ['asdf', 'qwer']

Second list contains 2 additional elements.
First extra element 0:
asdf

- []
+ ['asdf', 'qwer']

======================================================================
FAIL: test_follow_redirect (tornado.test.simple_httpclient_test.SimpleHTTPClientCommonTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/httpclient_test.py", line 144, in test_follow_redirect
    response = self.fetch("/countdown/2")
  File "tornado/testing.py", line 233, in fetch
    return self.wait()
  File "tornado/testing.py", line 119, in _stack_context
    yield
  File "tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "tornado/httpserver.py", line 365, in _on_headers
    self.request_callback(self._request)
  File "tornado/web.py", line 1359, in __call__
    handler._execute(transforms, *args, **kwargs)
  File "tornado/web.py", line 989, in _execute
    self._handle_request_exception(e)
  File "tornado/web.py", line 1029, in _handle_request_exception
    self.send_error(500, exc_info=sys.exc_info())
  File "tornado/web.py", line 690, in send_error
    self.finish()
  File "tornado/web.py", line 672, in finish
    self.request.finish()
  File "tornado/httpserver.py", line 533, in finish
    self.connection.finish()
  File "tornado/httpserver.py", line 307, in finish
    assert self._request, "Request closed"
AssertionError: Request closed

======================================================================
FAIL: test_connect_timeout (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/simple_httpclient_test.py", line 151, in test_connect_timeout
    response = self.wait()
  File "tornado/testing.py", line 156, in timeout_func
    timeout)
AssertionError: Async operation timed out after 5 seconds

======================================================================
FAIL: test_max_redirects (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/simple_httpclient_test.py", line 130, in test_max_redirects
    response = self.fetch("/countdown/5", max_redirects=3)
  File "tornado/testing.py", line 233, in fetch
    return self.wait()
  File "tornado/testing.py", line 119, in _stack_context
    yield
  File "tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "tornado/httpserver.py", line 365, in _on_headers
    self.request_callback(self._request)
  File "tornado/web.py", line 1359, in __call__
    handler._execute(transforms, *args, **kwargs)
  File "tornado/web.py", line 989, in _execute
    self._handle_request_exception(e)
  File "tornado/web.py", line 1029, in _handle_request_exception
    self.send_error(500, exc_info=sys.exc_info())
  File "tornado/web.py", line 690, in send_error
    self.finish()
  File "tornado/web.py", line 672, in finish
    self.request.finish()
  File "tornado/httpserver.py", line 533, in finish
    self.connection.finish()
  File "tornado/httpserver.py", line 307, in finish
    assert self._request, "Request closed"
AssertionError: Request closed

======================================================================
FAIL: test_redirect_connection_limit (tornado.test.simple_httpclient_test.SimpleHTTPClientTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "tornado/test/simple_httpclient_test.py", line 109, in test_redirect_connection_limit
    response = self.wait()
  File "tornado/testing.py", line 119, in _stack_context
    yield
  File "tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "tornado/httpserver.py", line 365, in _on_headers
    self.request_callback(self._request)
  File "tornado/web.py", line 1359, in __call__
    handler._execute(transforms, *args, **kwargs)
  File "tornado/web.py", line 989, in _execute
    self._handle_request_exception(e)
  File "tornado/web.py", line 1029, in _handle_request_exception
    self.send_error(500, exc_info=sys.exc_info())
  File "tornado/web.py", line 690, in send_error
    self.finish()
  File "tornado/web.py", line 672, in finish
    self.request.finish()
  File "tornado/httpserver.py", line 533, in finish
    self.connection.finish()
  File "tornado/httpserver.py", line 307, in finish
    assert self._request, "Request closed"
AssertionError: Request closed

----------------------------------------------------------------------
Ran 124 tests in 5.475s

FAILED (failures=7)
[E 110828 00:06:27 testing:353] FAIL
NickNeedsAName

Perfect, thanks

Ben Hodgson

How’s progress, NickNeedsAName? This looks like a great feature and you’ve clearly put a lot of effort in. Would be a shame to see this die.

NickNeedsAName

Just an update -- I pulled the most recent code and now the tests pass? O.o

NickNeedsAName

So, I figured out what the issue with the redirects was: in _on_body I was immediately re-allocating the stream if we were keeping things alive; now I wait to do that until after the redirect logic. All the tests are passing, but what's the best way to test that keep-alives are successfully implemented? I'm currently logging the time the requests take, but that's somewhat spotty and not always accurate. I'm also logging whether we're using a new stream or the same stream, but then I'd have to add a whole bunch of other logic to pass back whether or not the client is using a new stream. Probably not the best way.

Here's the output I'm getting (pinging the '/hello' url in the test cases)

FIRST
0.00143885612488
Subsequent
Using old stream!
0.000962972640991
Using old stream!
0.000978946685791
Using old stream!
0.000932216644287
Using old stream!
0.00092601776123
Using old stream!
0.000967025756836
Using old stream!
0.000934839248657
Using old stream!
0.00092601776123
Using old stream!

Which I'd say is pretty satisfactory for showing that keep-alives are actually working, but again, what should a test actually assert?
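One timing-free way to check this, as a hedged sketch (the X-Conn-Reused header and the _seen_before attribute are made up for illustration): have a test handler tag each server-side connection the first time it serves a request and report whether it had seen that connection before.

    from tornado import web

    class ReuseReportingHandler(web.RequestHandler):
        def get(self):
            conn = self.request.connection
            reused = getattr(conn, "_seen_before", False)
            conn._seen_before = True  # mark it for the next request
            self.set_header("X-Conn-Reused", str(reused))
            self.write("hello")

A test could then fetch twice and assert that the second response carries X-Conn-Reused: True, with no timing heuristics involved.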

Birk Nilson

I decided to extend the existing code in the hope of adding the pieces needed to reach a release candidate of this patch. All tests in the existing suite pass.
https://github.com/birknilson/tornado/tree/simple-async-client-keep-alive

However, I do not consider this mature enough yet. Some things are still missing, and I feel they are best discussed with those willing to participate before proceeding with the patch.

On top of the existing patch I have:

  • Ensured keep-alive is used by default unless either the client or the server explicitly disables it by setting the Connection token to close.
  • Reduced the complexity of _HTTPConnection.__init__ by introducing _HTTPConnection._init_stream.
  • Avoided a KeyError in _on_body by guaranteeing the presence of a stream Queue when requested.
  • Called task_done() on the corresponding stream queue item when a stream is finished.
  • Added a safe alternative to stream.close which prevents premature termination of streams intended to persist across requests.
  • Garbage-collected stream-map entries once their entire queue of streams has been closed.
  • Ensured the process test package does not use keep-alive.

What I am considering now, and hope to receive feedback on, is the best way of testing this functionality. I would need to verify that the connection count on the client matches the one on the server, where the count differs depending on the number of requests and whether keep-alive is used. To my knowledge there is currently no easy way to retrieve a connection count from either the server or the client, but one could be added. Otherwise I could check this against the IOLoop directly. Any thoughts on this?
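A hedged sketch of the counting idea that needs no new hooks in either the server or the client: the handler records id(self.request.connection) in a module-level set, and a test compares the set's size with the number of requests issued (seen_connections is an assumption for illustration).

    from tornado import web

    seen_connections = set()

    class CountingHandler(web.RequestHandler):
        def get(self):
            seen_connections.add(id(self.request.connection))
            self.write(str(len(seen_connections)))

With keep-alive working, N sequential fetches should leave the set at size 1; with Connection: close it should grow to N.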

The second thing is the close callback on client connections. Currently it is set to None when keep-alive is used, and I am unsure whether that is a good idea. With keep-alive I would suggest moving it to the termination of the entire queue, and otherwise leaving it as is, i.e. per termination of a single stream.
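A minimal sketch of that suggestion, assuming a hypothetical StreamPool wrapper: each pooled stream reports its closure to the pool, and the user-visible close callback fires only once the whole queue for the endpoint has been torn down.

    import collections

    class StreamPool(object):
        def __init__(self, close_callback=None):
            self.streams = collections.deque()
            self.close_callback = close_callback  # fires when the last stream dies

        def stream_closed(self, stream):
            # Installed as each pooled stream's close callback.
            try:
                self.streams.remove(stream)
            except ValueError:
                pass  # stream was checked out, not in the idle queue
            if not self.streams and self.close_callback is not None:
                callback, self.close_callback = self.close_callback, None
                callback()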

Also general feedback on the extension of this patch is more than welcome.

Alek Storm

Haven't read the patch in detail yet, but in the SPDY fork, I've added a force_connection attribute/parameter to HTTPRequest, indicating that the request must use a new connection. You may want to add something similar here.
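A hedged sketch of how that could plug into this patch (force_connection is the SPDY fork's attribute, not part of tornado's HTTPRequest, and _reusable_stream is a made-up helper): flagged requests simply bypass the pool lookup.

    def _reusable_stream(self, key, request):
        # Requests that demand a fresh connection skip the pool entirely.
        if getattr(request, "force_connection", False):
            return None
        pool = self.stream_map.get(key)
        if pool is not None and not pool.empty():
            return pool.get_nowait()
        return None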

NickNeedsAName

Excited that other people see this as worthwhile!!!!

I too am at a loss (have been at a loss, but honestly haven't put much time into thinking about it) regarding how to test that this does what it claims to do. Your suggestions make sense.

Another interesting thing is that this actually has the potential for massive memory problems:

Assume there exists a sort of steady state between the client and a server where the client is constantly using 5 connections to communicate with the server. Then something happens and the new steady state is 1000 connections (the remote server becomes slow/sluggish/unresponsive). When things go back to normal there are still 1000 connections in the queue, and they'll never be closed, because you'll probably cycle through all 1000 over the course of a few seconds, keeping them open, and so on.

This is something I'm currently thinking about.

Birk Nilson

@alekstorm: I can definitely add support for force_connection. If you have the chance to read through the patch entirely it would be great to get some feedback/input.

I hope to spend a day on this during the weekend to, hopefully, close this pull request. I will focus on writing an exhaustive suite of tests covering this behavior. I have also thought about the issues you mention, @NickNeedsAName.

I believe the best solution is to improve the handling of the scenario where all existing streams are performing I/O. Currently we then initialize an entirely new connection/stream and add it to the stream queue. Instead we could wait, by re-queuing the request, with a timeout bounding how long this is allowed before we raise an exception, along with some sanity precautions ensuring only a few attempts/connections are set up against each endpoint (scheme, host, port).

But there is definitely work to be done here to get this running smoothly. Hopefully I can shrink the list of remaining items this weekend so that we can reach a solution quickly.
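A rough sketch of the wait-and-retry behavior described above, with hypothetical names throughout (_idle_streams, _start_request, and the 50 ms retry interval are all assumptions): when every pooled stream is busy, the request is re-queued on the IOLoop instead of opening a new connection, and gives up at a deadline.

    import time

    def _checkout_or_wait(self, key, request, deadline):
        for stream in self._idle_streams(key):
            if not (stream.reading() or stream.writing() or stream.closed()):
                return stream
        if time.time() > deadline:
            raise Exception("timed out waiting for a free stream to %r" % (key,))
        # Try again shortly; the single-threaded IOLoop makes this race-free.
        self.io_loop.add_timeout(time.time() + 0.05,
                                 lambda: self._start_request(key, request, deadline))
        return None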

NickNeedsAName

@birknilson What I'm currently implementing/trying: instead of a FIFO queue, use a LIFO queue. Then on every request we can check whether this stream_key has a garbage-collection timeout set; if it does, do nothing; if it doesn't, add one. Every time we use a stream I update its "last_used" time, so on garbage collection we pull everything out of the queue, find the streams that haven't been used in a while, and put the rest back in the right order. Since tornado is single-threaded there shouldn't be issues with pulling things from the queue that aren't there, and anything not in the queue because it's in use is also fine, because it's in use! The LIFO queue fixes the 5 -> 1000 -> 5 steady-state issue, because only the front 5 streams in the queue will ever be used.
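A sketch of that LIFO-plus-GC scheme (last_used and the idle limit are illustrative): hot streams stay at the top of the stack, and a periodic collection pass closes anything that has gone stale.

    import time

    class LifoStreamPool(object):
        IDLE_LIMIT = 30.0  # seconds; hypothetical tuning knob

        def __init__(self):
            self.streams = []  # plain list used as a LIFO stack

        def put(self, stream):
            stream.last_used = time.time()
            self.streams.append(stream)

        def get(self):
            while self.streams:
                stream = self.streams.pop()  # most recently used first
                if not stream.closed():
                    return stream
            return None

        def collect(self):
            # Close anything idle longer than IDLE_LIMIT; keep the rest in order.
            now = time.time()
            fresh = []
            for stream in self.streams:
                if now - stream.last_used <= self.IDLE_LIMIT and not stream.closed():
                    fresh.append(stream)
                else:
                    stream.close()
            self.streams = fresh

After a 5 -> 1000 -> 5 swing, only the five most recently used streams keep their last_used fresh, so the stale 995 age out and are closed.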

Birk Nilson

@NickNeedsAName Ok, great. Since you are already working on a patch for that behavior, I can focus entirely on writing the necessary tests for this pull request, along with integrating the force_connection support @alekstorm suggested. There are also some minor changes I propose:

  • Support importing Queue on Python 2.6+ and Python 3, where the module has been renamed to queue (see the sketch below)
  • Remove _get_promised_stream_queue and use setdefault instead (feedback from @alekstorm)

I can fork your repository and submit my existing changes as a pull request to it, and do the same later with the tests and the minor changes mentioned above.
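The version-compatible import from the first bullet would look something like this:

    try:
        import Queue as queue  # Python 2.x name
    except ImportError:
        import queue           # renamed in Python 3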

Alek Storm

Since I can't add comments inline outside of a pull request or specific commit, I've collected some here:

  • You check _connect_timeout in _on_connect, but you never set it to anything but None. Did you mean to set it on line 263?
  • Queue.Queue is meant for delegating tasks to worker threads, which doesn't apply here. Use collections.deque for a generic queue (see the sketch below).
  • What's the difference between the keep_alive and keep_alive_agreement fields?
  • Calling _close_stream in cleanup could mean that future requests pick up in the middle of the previous one, which would confuse the server. Streams should be closed unconditionally if there's an error.
  • Using (host, port, scheme) isn't enough - streams need to be keyed on everything referenced in _init_stream. In my SPDY fork, I use (scheme, netloc, port, validate_cert, ca_certs, allow_ipv6, client_key, client_cert).

I'll look at the stream logic more closely tomorrow.
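For the Queue.Queue point, the deque replacement is a near one-for-one swap (a small sketch with placeholder contents):

    import collections

    pool = collections.deque()
    pool.append("stream-1")   # replaces Queue.put_nowait(...)
    pool.append("stream-2")
    oldest = pool.popleft()   # FIFO reuse; pool.pop() would make it LIFO

deque avoids Queue's locking overhead, which is pure waste on a single-threaded IOLoop, and raises IndexError rather than blocking when empty.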

Ben Darnell (bdarnell) added the httpclient label
Showing 38 additions and 2 deletions.

tornado/simple_httpclient.py (+38 −2)
@@ -16,6 +16,7 @@
 import functools
 import logging
 import os.path
+import Queue
 import re
 import socket
 import time
@@ -89,6 +90,7 @@ def initialize(self, io_loop=None, max_clients=10,
         self.active = {}
         self.hostname_mapping = hostname_mapping
         self.max_buffer_size = max_buffer_size
+        self.stream_map = {}

     def fetch(self, request, callback, **kwargs):
         if not isinstance(request, HTTPRequest):
@@ -136,6 +138,11 @@ def __init__(self, io_loop, client, request, callback, max_buffer_size):
         self._decompressor = None
         # Timeout handle returned by IOLoop.add_timeout
         self._timeout = None
+        self._connect_timeout = None
+        if self.request.headers.get('Connection') == 'close':
+            self.keep_alive = False
+        else:
+            self.keep_alive = True
         with stack_context.StackContext(self.cleanup):
             parsed = urlparse.urlsplit(_unicode(self.request.url))
             if ssl is None and parsed.scheme == "https":
@@ -168,6 +175,24 @@ def __init__(self, io_loop, client, request, callback, max_buffer_size):
                 # We only try the first IP we get from getaddrinfo,
                 # so restrict to ipv4 by default.
                 af = socket.AF_INET
+            # Ignore keep_alive logic if explicitly requesting non-persistent connections
+            if self.keep_alive:
+                self.stream_key = (host, port, parsed.scheme)
+                if self.client.stream_map.has_key(self.stream_key):
+                    while not self.client.stream_map[self.stream_key].empty():
+                        self.stream = self.client.stream_map[self.stream_key].get_nowait()
+                        # Ditch closed streams and get a new one
+                        if self.stream.closed():
+                            continue
+                        # Double-check the stream isn't in use; don't put it back in
+                        # the queue, because if it's in use whoever's using it will,
+                        # and if it's closed it shouldn't be there.
+                        if not (self.stream.reading() or self.stream.writing()):
+                            self.stream.set_close_callback(self._on_close)
+                            self._on_connect(parsed)
+                            return
+                else:
+                    self.client.stream_map[self.stream_key] = Queue.Queue()
             addrinfo = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM,
                                           0, 0)
@@ -209,8 +234,11 @@ def _on_timeout(self):
             self.stream.close()

     def _on_connect(self, parsed):
+        if self._connect_timeout is not None:
+            self.io_loop.remove_timeout(self._connect_timeout)
+            self._connect_timeout = None
         if self._timeout is not None:
-            self.io_loop.remove_callback(self._timeout)
+            self.io_loop.remove_timeout(self._timeout)
             self._timeout = None
         if self.request.request_timeout:
             self._timeout = self.io_loop.add_timeout(
@@ -295,6 +323,10 @@ def _on_headers(self, data):
         assert match
         self.code = int(match.group(1))
         self.headers = HTTPHeaders.parse(header_data)
+        if self.headers.get('Connection') == 'keep-alive':
+            self.keep_alive = True
+        elif self.headers.get('Connection') == 'close':
+            self.keep_alive = False
         if self.request.header_callback is not None:
             for k, v in self.headers.get_all():
                 self.request.header_callback("%s: %s\r\n" % (k, v))
@@ -322,6 +354,9 @@ def _on_headers(self, data):
             self.stream.read_until_close(self._on_body)

     def _on_body(self, data):
+        if self.keep_alive:
+            self.stream._close_callback = None
+            self.client.stream_map[self.stream_key].put_nowait(self.stream)
         if self._timeout is not None:
             self.io_loop.remove_timeout(self._timeout)
             self._timeout = None
@@ -355,7 +390,8 @@ def _on_body(self, data):
                                 buffer=buffer,
                                 effective_url=self.request.url)
         self._run_callback(response)
-        self.stream.close()
+        if not self.keep_alive:
+            self.stream.close()

     def _on_chunk_length(self, data):
         # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1