Skip to content

Commit

Permalink
Merge agent-persistent-connections-3: Support for persistent HTTP/1.1…
Browse files Browse the repository at this point in the history
… connections.

Author: itamar, yasusii
Review: exarkun, jknight, therve, glyph
Fixes: #3420

Add support for persistent connections to Agent and ProxyAgent.


git-svn-id: svn://svn.twistedmatrix.com/svn/Twisted/trunk@33546 bbbe8e31-12d6-0310-92fd-ac37d47ddeeb
  • Loading branch information
itamarst committed Feb 11, 2012
1 parent 6589b0a commit 1f043a3
Show file tree
Hide file tree
Showing 6 changed files with 1,242 additions and 259 deletions.
106 changes: 106 additions & 0 deletions doc/web/howto/client.xhtml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,108 @@ if __name__ == "__main__":
context objects.
</p>

<h3>HTTP Persistent Connection</h3>

<p>
HTTP persistent connections use the same TCP connection to send and
receive multiple HTTP requests/responses. This reduces latency and TCP
connection establishment overhead.
</p>

<p>
The constructor of <code class="API">twisted.web.client.Agent</code>
takes an optional parameter pool, which should be an instance
of <code class="API"
base="twisted.web.client">HTTPConnectionPool</code>, which will be used
to manage the connections. If the pool is created with the
parameter <code>persistent</code> set to <code>True</code> (the
default), it will not close connections when the request is done, and
instead hold them in its cache to be re-used.
</p>

<p>
Here's an example which sends requests over a persistent connection:
</p>

<pre class="python">
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, HTTPConnectionPool

class IgnoreBody(Protocol):
def __init__(self, deferred):
self.deferred = deferred

def dataReceived(self, bytes):
pass

def connectionLost(self, reason):
self.deferred.callback(None)


def cbRequest(response):
print 'Response code:', response.code
finished = Deferred()
response.deliverBody(IgnoreBody(finished))
return finished

pool = HTTPConnectionPool(reactor)
agent = Agent(reactor, pool=pool)

def requestGet(url):
d = agent.request('GET', url)
d.addCallback(cbRequest)
return d

# Two requests to the same host:
d = requestGet('http://localhost:8080/foo').addCallback(
lambda ign: requestGet("http://localhost:8080/bar"))
def cbShutdown(ignored):
reactor.stop()
d.addCallback(cbShutdown)

reactor.run()
</pre>

<p>
Here, the two requests are to the same host, one after the each
other. In most cases, the same connection will be used for the second
request, instead of two different connections when using a
non-persistent pool.
</p>

<h3>Multiple Connections to the Same Server</h3>

<p>
<code class="API">twisted.web.client.HTTPConnectionPool</code> instances
have an attribute
called <code class="python">maxPersistentPerHost</code> which limits the
number of cached persistent connections to the same server. The default
value is 2. This is effective only when the <code class="API"
base="twisted.web.client.HTTPConnectionPool">persistent</code> option is
True. You can change the value like bellow:
</p>

<pre class="python">
from twisted.web.client import HTTPConnectionPool

pool = HTTPConnectionPool(reactor, persistent=True)
pool.maxPersistentPerHost = 1
</pre>

<p>
With the default value of 2, the pool keeps around two connections to
the same host at most. Eventually the cached persistent connections will
be closed, by default after 240 seconds; you can change this timeout
value with the <code class="python">cachedConnectionTimeout</code>
attribute of the pool. To force all connections to close use
the <code class="API"
base="twisted.web.client.HTTPConnectionPool">closeCachedConnections</code>
method.
</p>


<h3>Following redirects</h3>

<p>
Expand Down Expand Up @@ -456,6 +558,10 @@ if __name__ == "__main__":
<li>
How to control the streaming of the response body.
</li>
<li>
How to enable the HTTP persistent connection, and control the
number of connections.
</li>
</ul>
</body>
</html>
87 changes: 69 additions & 18 deletions twisted/web/_newclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,12 +538,16 @@ class Request:
@ivar bodyProducer: C{None} or an L{IBodyProducer} provider which
produces the content body to send to the remote HTTP server.
@ivar persistent: Set to C{True} when you use HTTP persistent connecton.
@type persistent: Boolean
"""
def __init__(self, method, uri, headers, bodyProducer):
def __init__(self, method, uri, headers, bodyProducer, persistent=False):
self.method = method
self.uri = uri
self.headers = headers
self.bodyProducer = bodyProducer
self.persistent = persistent


def _writeHeaders(self, transport, TEorCL):
Expand All @@ -557,7 +561,8 @@ def _writeHeaders(self, transport, TEorCL):
requestLines = []
requestLines.append(
'%s %s HTTP/1.1\r\n' % (self.method, self.uri))
requestLines.append('Connection: close\r\n')
if not self.persistent:
requestLines.append('Connection: close\r\n')
if TEorCL is not None:
requestLines.append(TEorCL)
for name, values in self.headers.getAllRawHeaders():
Expand Down Expand Up @@ -1216,9 +1221,26 @@ class HTTP11ClientProtocol(Protocol):
- CONNECTION_LOST: The connection has been lost.
@ivar _abortDeferreds: A list of C{Deferred} instances that will fire when
the connection is lost.
"""
_state = 'QUIESCENT'
_parser = None
_finishedRequest = None
_currentRequest = None
_transportProxy = None
_responseDeferred = None


def __init__(self, quiescentCallback=lambda c: None):
self._quiescentCallback = quiescentCallback
self._abortDeferreds = []


@property
def state(self):
return self._state


def request(self, request):
"""
Expand Down Expand Up @@ -1258,10 +1280,6 @@ def request(self, request):
def cbRequestWrotten(ignored):
if self._state == 'TRANSMITTING':
self._state = 'WAITING'
# XXX We're stuck in WAITING until we lose the connection now.
# This will be wrong when persistent connections are supported.
# See #3420 for persistent connections.

self._responseDeferred.chainDeferred(self._finishedRequest)

def ebRequestWriting(err):
Expand All @@ -1288,18 +1306,16 @@ def _finishResponse(self, rest):
the L{HTTPClientParser} which were not part of the response it
was parsing.
"""
# XXX this is because Connection: close is hard-coded above, probably
# will want to change that at some point. Either the client or the
# server can control this.

# XXX If the connection isn't being closed at this point, it's
# important to make sure the transport isn't paused (after _giveUp,
# or inside it, or something - after the parser can no longer touch
# the transport)
_finishResponse = makeStatefulDispatcher('finishResponse', _finishResponse)

# For both of the above, see #3420 for persistent connections.

if self._state == 'TRANSMITTING':
def _finishResponse_WAITING(self, rest):
# Currently the rest parameter is ignored. Don't forget to use it if
# we ever add support for pipelining. And maybe check what trailers
# mean.
if self._state == 'WAITING':
self._state = 'QUIESCENT'
else:
# The server sent the entire response before we could send the
# whole request. That sucks. Oh well. Fire the request()
# Deferred with the response. But first, make sure that if the
Expand All @@ -1308,7 +1324,31 @@ def _finishResponse(self, rest):
self._state = 'TRANSMITTING_AFTER_RECEIVING_RESPONSE'
self._responseDeferred.chainDeferred(self._finishedRequest)

self._giveUp(Failure(ConnectionDone("synthetic!")))
# This will happen if we're being called due to connection being lost;
# if so, no need to disconnect parser again, or to call
# _quiescentCallback.
if self._parser is None:
return

reason = ConnectionDone("synthetic!")
connHeaders = self._parser.connHeaders.getRawHeaders('connection', ())
if (('close' in connHeaders) or self._state != "QUIESCENT" or
not self._currentRequest.persistent):
self._giveUp(Failure(reason))
else:
# We call the quiescent callback first, to ensure connection gets
# added back to connection pool before we finish the request.
try:
self._quiescentCallback(self)
except:
# If callback throws exception, just log it and disconnect;
# keeping persistent connections around is an optimisation:
log.err()
self.transport.loseConnection()
self._disconnectParser(reason)


_finishResponse_TRANSMITTING = _finishResponse_WAITING


def _disconnectParser(self, reason):
Expand All @@ -1321,13 +1361,16 @@ def _disconnectParser(self, reason):
if self._parser is not None:
parser = self._parser
self._parser = None
self._currentRequest = None
self._finishedRequest = None
self._responseDeferred = None

# The parser is no longer allowed to do anything to the real
# transport. Stop proxying from the parser's transport to the real
# transport before telling the parser it's done so that it can't do
# anything.
self._transportProxy._stopProxying()

self._transportProxy = None
parser.connectionLost(reason)


Expand Down Expand Up @@ -1417,12 +1460,20 @@ def _connectionLost_ABORTING(self, reason):
"""
self._disconnectParser(Failure(ConnectionAborted()))
self._state = 'CONNECTION_LOST'
for d in self._abortDeferreds:
d.callback(None)
self._abortDeferreds = []


def abort(self):
"""
Close the connection and cause all outstanding L{request} L{Deferred}s
to fire with an error.
"""
if self._state == "CONNECTION_LOST":
return succeed(None)
self.transport.loseConnection()
self._state = 'ABORTING'
d = Deferred()
self._abortDeferreds.append(d)
return d
Loading

0 comments on commit 1f043a3

Please sign in to comment.