319 changes: 243 additions & 76 deletions twisted/web/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
from zope.interface import implements

from twisted.python import log
from twisted.python.failure import Failure
from twisted.web import http
from twisted.internet import defer, protocol, task, reactor
from twisted.internet.interfaces import IProtocol
from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint
from twisted.python import failure
from twisted.python.util import InsensitiveDict
from twisted.python.components import proxyForInterface
Expand Down Expand Up @@ -522,6 +524,23 @@ def pageEnd(self):



class _URL(tuple):
    """
    A parsed URL.

    Instances are 4-tuples of C{(scheme, host, port, path)}, so they can be
    unpacked or indexed like a plain tuple, and they additionally expose each
    element as an attribute of the same name.

    At some point this should be replaced with a better URL implementation.

    @ivar scheme: The first element of the tuple.
    @ivar host: The second element of the tuple.
    @ivar port: The third element of the tuple.
    @ivar path: The fourth element of the tuple.
    """
    def __new__(cls, scheme, host, port, path):
        # Allocate the tuple for C{cls} rather than a hard-coded L{_URL}:
        # otherwise calling a subclass would hand back a plain L{_URL}, and
        # because that object is not an instance of the subclass Python
        # would skip C{__init__}, leaving the attributes unset.
        return tuple.__new__(cls, (scheme, host, port, path))


    def __init__(self, scheme, host, port, path):
        # The tuple contents were fixed in __new__; mirror them as named
        # attributes for readable access.
        self.scheme = scheme
        self.host = host
        self.port = port
        self.path = path


def _parse(url, defaultPort=None):
"""
Split the given URL into the scheme, host, port, and path.
Expand Down Expand Up @@ -558,7 +577,7 @@ def _parse(url, defaultPort=None):
if path == '':
path = '/'

return scheme, host, port, path
return _URL(scheme, host, port, path)


def _makeGetterFactory(url, factoryFactory, contextFactory=None,
Expand Down Expand Up @@ -625,10 +644,9 @@ def downloadPage(url, file, contextFactory=None, *args, **kwargs):
# should be significantly better than anything above, though it is not yet
# feature equivalent.

from twisted.internet.protocol import ClientCreator
from twisted.web.error import SchemeNotSupported
from twisted.web._newclient import ResponseDone, Request, HTTP11ClientProtocol
from twisted.web._newclient import Response, ResponseFailed
from twisted.web._newclient import Request, Response, HTTP11ClientProtocol
from twisted.web._newclient import ResponseDone, ResponseFailed

try:
from twisted.internet.ssl import ClientContextFactory
Expand Down Expand Up @@ -791,36 +809,168 @@ def resumeProducing(self):



class _AgentMixin(object):
class _HTTP11ClientFactory(protocol.Factory):
    """
    Factory producing L{HTTP11ClientProtocol} instances on behalf of
    L{HTTPConnectionPool}.

    @ivar _quiescentCallback: The callable handed to every protocol this
        factory builds; the pool uses it to get connections back.

    @since: 11.1
    """
    def __init__(self, quiescentCallback):
        """
        @param quiescentCallback: The callable to pass to each protocol
            created by L{buildProtocol}.
        """
        self._quiescentCallback = quiescentCallback


    def buildProtocol(self, addr):
        """
        Create a protocol wired to this factory's quiescent callback.

        @param addr: The address being connected to; not used.

        @return: A new L{HTTP11ClientProtocol}.
        """
        onQuiescent = self._quiescentCallback
        return HTTP11ClientProtocol(onQuiescent)



class HTTPConnectionPool(object):
"""
A pool of persistent HTTP connections.
Features:
- Cached connections will eventually time out.
- Limits on maximum number of persistent connections.
Connections are stored using keys, which should be chosen such that any
connections stored under a given key can be used interchangeably.
@ivar persistent: Boolean indicating whether connections should be
persistent.
@ivar maxPersistentPerHost: The maximum number of cached persistent
connections for a C{host:port} destination.
@type maxPersistentPerHost: C{int}
@ivar cachedConnectionTimeout: Number of seconds a cached persistent
connection will stay open before disconnecting.
@ivar _factory: The factory class used to build protocols for new
connections; it is instantiated with the quiescent callback.
@ivar _connections: Map (scheme, host, port) to lists of
L{HTTP11ClientProtocol} instances.
@ivar _timeouts: Map L{HTTP11ClientProtocol} instances to a C{IDelayedCall}
instance of their timeout.
@since: 12.1
"""

_factory = _HTTP11ClientFactory
maxPersistentPerHost = 2
cachedConnectionTimeout = 240

def __init__(self, reactor, persistent=True):
    """
    Set up an empty pool.

    @param reactor: The reactor whose C{callLater} is used to schedule the
        expiry of cached connections.
    @param persistent: Boolean indicating whether connections should be
        persistent.
    """
    self._reactor, self.persistent = reactor, persistent
    # Nothing is cached yet: no connections and hence no pending timeouts.
    self._timeouts = {}
    self._connections = {}

def _connectAndRequest(self, method, uri, headers, bodyProducer,
requestPath=None):

def getConnection(self, key, endpoint):
"""
Internal helper to make the request.
Retrieve a connection, either new or cached, to be used for a HTTP
request.
If a cached connection is returned, it will not be used for other
requests until it is put back (which will happen automatically), since
we do not support pipelined requests. If no cached connection is
available, the passed in endpoint is used to create the connection.
If the connection doesn't disconnect at the end of its request, it
will be returned to this pool automatically. As such, only a single
request should be sent using the returned connection.
@param requestPath: If specified, the path to use for the request
instead of the path extracted from C{uri}.
@return: A C{Deferred} that will fire with L{HTTP11ClientProtocol}
that can be used to send a single HTTP request.
"""
scheme, host, port, path = _parse(uri)
if requestPath is None:
requestPath = path
d = self._connect(scheme, host, port)
if headers is None:
headers = Headers()
if not headers.hasHeader('host'):
headers = headers.copy()
headers.addRawHeader(
'host', self._computeHostValue(scheme, host, port))
def cbConnected(proto):
return proto.request(
Request(method, requestPath, headers, bodyProducer))
d.addCallback(cbConnected)
return d
# Try to get cached version:
connections = self._connections.get(key)
while connections:
connection = connections.pop(0)
# Cancel timeout:
self._timeouts[connection].cancel()
del self._timeouts[connection]
if connection.state == "QUIESCENT":
return defer.succeed(connection)

def quiescentCallback(protocol):
self._putConnection(key, protocol)
factory = self._factory(quiescentCallback)
return endpoint.connect(factory)


def _removeConnection(self, key, connection):
    """
    Disconnect a cached connection and drop it from the pool's bookkeeping.

    @param key: The key under which C{connection} is cached.
    @param connection: The cached connection to discard.
    """
    transport = connection.transport
    transport.loseConnection()
    cachedForKey = self._connections[key]
    cachedForKey.remove(connection)
    # Forget the timeout entry as well; it is not cancelled here.
    self._timeouts.pop(connection)


def _putConnection(self, key, connection):
    """
    Hand a persistent connection back to the pool.

    L{HTTP11ClientProtocol} calls this when the connection becomes
    quiescent.  A connection that is not in the C{"QUIESCENT"} state is
    reported as a bug and not cached.  If the list for C{key} already holds
    C{maxPersistentPerHost} connections, the oldest one is disconnected to
    make room.  The stored connection gets a timeout after which it is
    removed again.

    @param key: The key under which to cache the connection.
    @param connection: The connection being returned.
    """
    if connection.state != "QUIESCENT":
        # Raise and immediately catch, so that log.err() has a current
        # exception (with traceback) to report; useful for debugging.
        try:
            raise RuntimeError(
                "BUG: Non-quiescent protocol added to connection pool.")
        except RuntimeError:
            log.err()
        return

    pooled = self._connections.setdefault(key, [])
    if len(pooled) == self.maxPersistentPerHost:
        # Full: evict the connection at the front of the list and retire
        # its pending timeout.
        oldest = pooled.pop(0)
        oldest.transport.loseConnection()
        expiry = self._timeouts[oldest]
        expiry.cancel()
        del self._timeouts[oldest]
    pooled.append(connection)
    self._timeouts[connection] = self._reactor.callLater(
        self.cachedConnectionTimeout, self._removeConnection,
        key, connection)


def closeCachedConnections(self):
    """
    Close all persistent connections and remove them from the pool.

    Every cached protocol is aborted, every pending connection timeout is
    cancelled, and both internal caches are reset to empty.

    @return: L{defer.Deferred} that fires with C{None} when all connections
        have been closed.
    """
    results = []
    # values() rather than the Python-2-only itervalues(): the result is
    # the same, it matches the _timeouts loop below, and it also works on
    # Python 3.
    for protocols in self._connections.values():
        results.extend(p.abort() for p in protocols)
    self._connections = {}
    for dc in self._timeouts.values():
        dc.cancel()
    self._timeouts = {}
    # Hide the list of per-connection results from callers.
    return defer.gatherResults(results).addCallback(lambda ign: None)



class _AgentBase(object):
"""
Base class offering common facilities for L{Agent}-type classes.
@ivar _reactor: The C{IReactorTime} implementation which will be used by
the pool, and perhaps by subclasses as well.
@ivar _pool: The L{HTTPConnectionPool} used to manage HTTP connections.
"""

def __init__(self, reactor, pool):
    """
    @param reactor: The reactor to remember as C{_reactor}; it is also
        given to the pool when one has to be created here.
    @param pool: The L{HTTPConnectionPool} to use, or C{None} to have a
        non-persistent pool created.
    """
    # Resolve the pool first; a missing one is replaced by a pool built
    # with persistent=False.
    chosenPool = pool if pool is not None else HTTPConnectionPool(
        reactor, False)
    self._reactor = reactor
    self._pool = chosenPool


def _computeHostValue(self, scheme, host, port):
Expand All @@ -833,15 +983,38 @@ def _computeHostValue(self, scheme, host, port):
return '%s:%d' % (host, port)


def _requestWithEndpoint(self, key, endpoint, method, parsedURI,
headers, bodyProducer, requestPath):
"""
Issue a new request, given the endpoint and the path sent as part of
the request.
"""
# Create minimal headers, if necessary:
if headers is None:
headers = Headers()
if not headers.hasHeader('host'):
headers = headers.copy()
headers.addRawHeader(
'host', self._computeHostValue(parsedURI.scheme, parsedURI.host,
parsedURI.port))

class Agent(_AgentMixin):
d = self._pool.getConnection(key, endpoint)
def cbConnected(proto):
return proto.request(
Request(method, requestPath, headers, bodyProducer,
persistent=self._pool.persistent))
d.addCallback(cbConnected)
return d



class Agent(_AgentBase):
"""
L{Agent} is a very basic HTTP client. It supports I{HTTP} and I{HTTPS}
scheme URIs (but performs no certificate checking by default). It does not
support persistent connections.
scheme URIs (but performs no certificate checking by default).
@ivar _reactor: The L{IReactorTCP} and L{IReactorSSL} implementation which
will be used to set up connections over which to issue requests.
@param pool: A L{HTTPConnectionPool} instance, or C{None}, in which case a
non-persistent L{HTTPConnectionPool} instance will be created.
@ivar _contextFactory: A web context factory which will be used to create
SSL context objects for any SSL connections the agent needs to make.
Expand All @@ -854,11 +1027,11 @@ class Agent(_AgentMixin):
@since: 9.0
"""
_protocol = HTTP11ClientProtocol

def __init__(self, reactor, contextFactory=WebClientContextFactory(),
connectTimeout=None, bindAddress=None):
self._reactor = reactor
connectTimeout=None, bindAddress=None,
pool=None):
_AgentBase.__init__(self, reactor, pool)
self._contextFactory = contextFactory
self._connectTimeout = connectTimeout
self._bindAddress = bindAddress
Expand All @@ -882,10 +1055,10 @@ def _wrapContextFactory(self, host, port):
return _WebToNormalContextFactory(self._contextFactory, host, port)


def _connect(self, scheme, host, port):
def _getEndpoint(self, scheme, host, port):
"""
Connect to the given host and port, using a transport selected based on
scheme.
Get an endpoint for the given host and port, using a transport
selected based on scheme.
@param scheme: A string like C{'http'} or C{'https'} (the only two
supported values) to use to determine how to establish the
Expand All @@ -897,23 +1070,20 @@ def _connect(self, scheme, host, port):
@param port: An C{int} giving the port number the connection will be
on.
@return: A L{Deferred} which fires with a connected instance of
C{self._protocol}.
@return: An endpoint which can be used to connect to given address.
"""
cc = ClientCreator(self._reactor, self._protocol)
kwargs = {}
if self._connectTimeout is not None:
kwargs['timeout'] = self._connectTimeout
kwargs['bindAddress'] = self._bindAddress
if scheme == 'http':
d = cc.connectTCP(host, port, **kwargs)
return TCP4ClientEndpoint(self._reactor, host, port, **kwargs)
elif scheme == 'https':
d = cc.connectSSL(host, port, self._wrapContextFactory(host, port),
**kwargs)
return SSL4ClientEndpoint(self._reactor, host, port,
self._wrapContextFactory(host, port),
**kwargs)
else:
d = defer.fail(SchemeNotSupported(
"Unsupported scheme: %r" % (scheme,)))
return d
raise SchemeNotSupported("Unsupported scheme: %r" % (scheme,))


def request(self, method, uri, headers=None, bodyProducer=None):
Expand Down Expand Up @@ -941,52 +1111,48 @@ def request(self, method, uri, headers=None, bodyProducer=None):
given URI is not supported.
@rtype: L{Deferred}
"""
return self._connectAndRequest(method, uri, headers, bodyProducer)



class _HTTP11ClientFactory(protocol.ClientFactory):
"""
A simple factory for L{HTTP11ClientProtocol}, used by L{ProxyAgent}.
@since: 11.1
"""
protocol = HTTP11ClientProtocol
parsedURI = _parse(uri)
try:
endpoint = self._getEndpoint(parsedURI.scheme, parsedURI.host,
parsedURI.port)
except SchemeNotSupported:
return defer.fail(Failure())
key = (parsedURI.scheme, parsedURI.host, parsedURI.port)
return self._requestWithEndpoint(key, endpoint, method, parsedURI,
headers, bodyProducer, parsedURI.path)



class ProxyAgent(_AgentMixin):
class ProxyAgent(_AgentBase):
"""
An HTTP agent able to cross HTTP proxies.
@ivar _factory: The factory used to connect to the proxy.
@ivar _proxyEndpoint: The endpoint used to connect to the proxy, passing
the factory.
@ivar _proxyEndpoint: The endpoint used to connect to the proxy.
@since: 11.1
"""

_factory = _HTTP11ClientFactory

def __init__(self, endpoint):
def __init__(self, endpoint, reactor=None, pool=None):
    """
    @param endpoint: The endpoint used to connect to the proxy; stored as
        C{_proxyEndpoint}.
    @param reactor: The reactor passed on to L{_AgentBase.__init__}.  If
        C{None}, C{twisted.internet.reactor} is used.
    @param pool: The connection pool passed on to L{_AgentBase.__init__};
        may be C{None}.
    """
    if reactor is None:
        # No reactor supplied by the caller: fall back to the global one.
        from twisted.internet import reactor
    _AgentBase.__init__(self, reactor, pool)
    self._proxyEndpoint = endpoint


def _connect(self, scheme, host, port):
"""
Ignore the connection to the expected host, and connect to the proxy
instead.
"""
return self._proxyEndpoint.connect(self._factory())


def request(self, method, uri, headers=None, bodyProducer=None):
"""
Issue a new request via the configured proxy.
"""
return self._connectAndRequest(method, uri, headers, bodyProducer,
requestPath=uri)
# Cache *all* connections under the same key, since we are only
# connecting to a single destination, the proxy:
key = ("http-proxy", self._proxyEndpoint)

# To support proxying HTTPS via CONNECT, we will use key
# ("http-proxy-CONNECT", scheme, host, port), and an endpoint that
# wraps _proxyEndpoint with an additional callback to do the CONNECT.
return self._requestWithEndpoint(key, self._proxyEndpoint, method,
_parse(uri), headers, bodyProducer,
uri)



Expand Down Expand Up @@ -1345,4 +1511,5 @@ def _handleResponse(self, response, method, uri, headers, redirectCount):
'PartialDownloadError', 'HTTPPageGetter', 'HTTPPageDownloader',
'HTTPClientFactory', 'HTTPDownloader', 'getPage', 'downloadPage',
'ResponseDone', 'Response', 'ResponseFailed', 'Agent', 'CookieAgent',
'ProxyAgent', 'ContentDecoderAgent', 'GzipDecoder', 'RedirectAgent']
'ProxyAgent', 'ContentDecoderAgent', 'GzipDecoder', 'RedirectAgent',
'HTTPConnectionPool']
244 changes: 239 additions & 5 deletions twisted/web/test/test_newclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ class SlowRequest:
"""
method = 'GET'
stopped = False
persistent = False

def writeTo(self, transport):
self.finished = Deferred()
Expand All @@ -811,6 +812,8 @@ class SimpleRequest:
returns a succeeded L{Deferred}. This vaguely emulates the behavior of a
L{Request} with no body producer.
"""
persistent = False

def writeTo(self, transport):
transport.write('SOME BYTES')
return succeed(None)
Expand Down Expand Up @@ -879,6 +882,7 @@ def test_failedWriteTo(self):
L{RequestGenerationFailed} wrapping the underlying failure.
"""
class BrokenRequest:
persistent = False
def writeTo(self, transport):
return fail(ArbitraryException())

Expand All @@ -901,6 +905,7 @@ def test_synchronousWriteToError(self):
a L{Failure} of L{RequestGenerationFailed} wrapping that exception.
"""
class BrokenRequest:
persistent = False
def writeTo(self, transport):
raise ArbitraryException()

Expand Down Expand Up @@ -991,10 +996,12 @@ def cbRequest(response):
self.assertEqual(response.code, 200)
self.assertEqual(response.headers, Headers())
self.assertTrue(self.transport.disconnecting)
self.assertEqual(self.protocol.state, 'QUIESCENT')
d.addCallback(cbRequest)
self.protocol.dataReceived(
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"Connection: close\r\n"
"\r\n")
return d

Expand Down Expand Up @@ -1023,10 +1030,18 @@ def test_receiveResponseBeforeRequestGenerationDone(self):
If response bytes are delivered to L{HTTP11ClientProtocol} before the
L{Deferred} returned by L{Request.writeTo} fires, those response bytes
are parsed as part of the response.
The connection is also closed, because we're in a confusing state, and
therefore the C{quiescentCallback} isn't called.
"""
quiescentResult = []
transport = StringTransport()
protocol = HTTP11ClientProtocol(quiescentResult.append)
protocol.makeConnection(transport)

request = SlowRequest()
d = self.protocol.request(request)
self.protocol.dataReceived(
d = protocol.request(request)
protocol.dataReceived(
"HTTP/1.1 200 OK\r\n"
"X-Foo: bar\r\n"
"Content-Length: 6\r\n"
Expand All @@ -1036,6 +1051,10 @@ def cbResponse(response):
p = AccumulatingProtocol()
whenFinished = p.closedDeferred = Deferred()
response.deliverBody(p)
self.assertEqual(
protocol.state, 'TRANSMITTING_AFTER_RECEIVING_RESPONSE')
self.assertTrue(transport.disconnecting)
self.assertEqual(quiescentResult, [])
return whenFinished.addCallback(
lambda ign: (response, p.data))
d.addCallback(cbResponse)
Expand Down Expand Up @@ -1225,15 +1244,41 @@ def test_proxyStopped(self):

def test_abortClosesConnection(self):
"""
The transport will be told to close its connection when
L{HTTP11ClientProtocol.abort} is invoked.
L{HTTP11ClientProtocol.abort} will tell the transport to close its
connection when it is invoked, and returns a C{Deferred} that fires
when the connection is lost.
"""
transport = StringTransport()
protocol = HTTP11ClientProtocol()
protocol.makeConnection(transport)
protocol.abort()
r1 = []
r2 = []
protocol.abort().addCallback(r1.append)
protocol.abort().addCallback(r2.append)
self.assertEqual((r1, r2), ([], []))
self.assertTrue(transport.disconnecting)

# Disconnect protocol, the Deferreds will fire:
protocol.connectionLost(Failure(ConnectionDone()))
self.assertEqual(r1, [None])
self.assertEqual(r2, [None])


def test_abortAfterConnectionLost(self):
    """
    Calling L{HTTP11ClientProtocol.abort} once the connection has already
    been lost yields a C{Deferred} that has fired by the time it is
    returned, and the protocol remains in the C{CONNECTION_LOST} state.
    """
    clientProtocol = HTTP11ClientProtocol()
    clientProtocol.makeConnection(StringTransport())
    # Simulate a clean disconnect before abort() is ever called.
    lostReason = Failure(ConnectionDone())
    clientProtocol.connectionLost(lostReason)

    fired = []
    abortDeferred = clientProtocol.abort()
    abortDeferred.addCallback(fired.append)
    self.assertEqual(fired, [None])
    self.assertEqual(clientProtocol._state, "CONNECTION_LOST")


def test_abortBeforeResponseBody(self):
"""
Expand Down Expand Up @@ -1313,6 +1358,181 @@ def checkError(error):
return deferred.addCallback(checkError)


def test_quiescentCallbackCalled(self):
    """
    If after a response is done the L{HTTP11ClientProtocol} stays open and
    returns to QUIESCENT state, all per-request state is reset and the
    C{quiescentCallback} is called with the protocol instance.

    This is useful for implementing a persistent connection pool.

    The C{quiescentCallback} is called *before* the response-receiving
    protocol's C{connectionLost}, so that new requests triggered by end of
    first request can re-use a persistent connection.
    """
    quiescentResult = []
    def callback(p):
        # Verify what the protocol looks like at the moment it reports
        # itself quiescent, then record the call.
        self.assertEqual(p, protocol)
        self.assertEqual(p.state, "QUIESCENT")
        quiescentResult.append(p)

    transport = StringTransport()
    protocol = HTTP11ClientProtocol(callback)
    protocol.makeConnection(transport)

    requestDeferred = protocol.request(
        Request('GET', '/', _boringHeaders, None, persistent=True))
    # Deliver only the response headers; the 3-byte body follows later.
    protocol.dataReceived(
        "HTTP/1.1 200 OK\r\n"
        "Content-length: 3\r\n"
        "\r\n")

    # Headers done, but still no quiescent callback:
    self.assertEqual(quiescentResult, [])

    # This relies on requestDeferred having already fired, so that the
    # callback runs synchronously and result[0] is the response.
    result = []
    requestDeferred.addCallback(result.append)
    response = result[0]

    # When response body is done (i.e. connectionLost is called), note the
    # fact in quiescentResult:
    bodyProtocol = AccumulatingProtocol()
    bodyProtocol.closedDeferred = Deferred()
    bodyProtocol.closedDeferred.addCallback(
        lambda ign: quiescentResult.append("response done"))

    response.deliverBody(bodyProtocol)
    # The three body bytes announced by Content-length complete the
    # response.
    protocol.dataReceived("abc")
    bodyProtocol.closedReason.trap(ResponseDone)
    # Quiescent callback called *before* protocol handling the response
    # body gets its connectionLost called:
    self.assertEqual(quiescentResult, [protocol, "response done"])

    # Make sure everything was cleaned up:
    self.assertEqual(protocol._parser, None)
    self.assertEqual(protocol._finishedRequest, None)
    self.assertEqual(protocol._currentRequest, None)
    self.assertEqual(protocol._transportProxy, None)
    self.assertEqual(protocol._responseDeferred, None)


def test_quiescentCallbackCalledEmptyResponse(self):
    """
    The quiescentCallback is called before the request C{Deferred} fires,
    in cases where the response has no body.
    """
    quiescentResult = []
    def callback(p):
        # Check the protocol's state at the time of the call and record
        # it.
        self.assertEqual(p, protocol)
        self.assertEqual(p.state, "QUIESCENT")
        quiescentResult.append(p)

    transport = StringTransport()
    protocol = HTTP11ClientProtocol(callback)
    protocol.makeConnection(transport)

    requestDeferred = protocol.request(
        Request('GET', '/', _boringHeaders, None, persistent=True))
    # Record the response in the same list as the quiescent callback, so
    # that the relative order of the two events can be checked below.
    requestDeferred.addCallback(quiescentResult.append)
    protocol.dataReceived(
        "HTTP/1.1 200 OK\r\n"
        "Content-length: 0\r\n"
        "\r\n")

    # First the protocol (from the quiescent callback), then the response
    # (from the request Deferred):
    self.assertEqual(len(quiescentResult), 2)
    self.assertIdentical(quiescentResult[0], protocol)
    self.assertIsInstance(quiescentResult[1], Response)


def test_quiescentCallbackNotCalled(self):
    """
    If the response received by the L{HTTP11ClientProtocol} carries a
    C{Connection: close} header, then after the response is done the
    C{quiescentCallback} is not called and the connection is lost.
    """
    quiescentResult = []
    transport = StringTransport()
    protocol = HTTP11ClientProtocol(quiescentResult.append)
    protocol.makeConnection(transport)

    # The request itself is persistent; it is the response below that asks
    # for the connection to be closed.
    requestDeferred = protocol.request(
        Request('GET', '/', _boringHeaders, None, persistent=True))
    protocol.dataReceived(
        "HTTP/1.1 200 OK\r\n"
        "Content-length: 0\r\n"
        "Connection: close\r\n"
        "\r\n")

    # This relies on requestDeferred having already fired.
    result = []
    requestDeferred.addCallback(result.append)
    response = result[0]

    bodyProtocol = AccumulatingProtocol()
    response.deliverBody(bodyProtocol)
    bodyProtocol.closedReason.trap(ResponseDone)
    # The callback was never invoked, and the transport was told to
    # disconnect:
    self.assertEqual(quiescentResult, [])
    self.assertTrue(transport.disconnecting)


def test_quiescentCallbackNotCalledNonPersistentQuery(self):
    """
    If the request was non-persistent (i.e. sent C{Connection: close}),
    the C{quiescentCallback} is not called and the connection is lost.
    """
    quiescentResult = []
    transport = StringTransport()
    protocol = HTTP11ClientProtocol(quiescentResult.append)
    protocol.makeConnection(transport)

    # Here the request is the non-persistent side; the response below has
    # no Connection header of its own.
    requestDeferred = protocol.request(
        Request('GET', '/', _boringHeaders, None, persistent=False))
    protocol.dataReceived(
        "HTTP/1.1 200 OK\r\n"
        "Content-length: 0\r\n"
        "\r\n")

    # This relies on requestDeferred having already fired.
    result = []
    requestDeferred.addCallback(result.append)
    response = result[0]

    bodyProtocol = AccumulatingProtocol()
    response.deliverBody(bodyProtocol)
    bodyProtocol.closedReason.trap(ResponseDone)
    # The callback was never invoked, and the transport was told to
    # disconnect:
    self.assertEqual(quiescentResult, [])
    self.assertTrue(transport.disconnecting)


def test_quiescentCallbackThrows(self):
    """
    If C{quiescentCallback} throws an exception, the error is logged and
    protocol is disconnected.
    """
    def callback(p):
        # A quiescent callback that always fails.
        raise ZeroDivisionError()

    transport = StringTransport()
    protocol = HTTP11ClientProtocol(callback)
    protocol.makeConnection(transport)

    requestDeferred = protocol.request(
        Request('GET', '/', _boringHeaders, None, persistent=True))
    protocol.dataReceived(
        "HTTP/1.1 200 OK\r\n"
        "Content-length: 0\r\n"
        "\r\n")

    # This relies on requestDeferred having already fired.
    result = []
    requestDeferred.addCallback(result.append)
    response = result[0]
    bodyProtocol = AccumulatingProtocol()
    response.deliverBody(bodyProtocol)
    bodyProtocol.closedReason.trap(ResponseDone)

    # Exactly one ZeroDivisionError was logged (flushing it keeps it from
    # failing the test), and the transport was told to disconnect:
    errors = self.flushLoggedErrors(ZeroDivisionError)
    self.assertEqual(len(errors), 1)
    self.assertTrue(transport.disconnecting)



class StringProducer:
"""
Expand Down Expand Up @@ -1367,6 +1587,19 @@ def test_sendSimplestRequest(self):
"\r\n")


def test_sendSimplestPersistentRequest(self):
    """
    A persistent request does not send a C{Connection: close} header.
    """
    persistentRequest = Request(
        'GET', '/', _boringHeaders, None, persistent=True)
    persistentRequest.writeTo(self.transport)
    # Only the request line and the Host header are expected on the wire.
    expectedBytes = (
        "GET / HTTP/1.1\r\n"
        "Host: example.com\r\n"
        "\r\n")
    self.assertEqual(self.transport.value(), expectedBytes)


def test_sendRequestHeaders(self):
"""
L{Request.writeTo} formats header data and writes it to the given
Expand Down Expand Up @@ -1721,6 +1954,7 @@ def test_sendRequestBodyWithError(self):

return self.assertFailure(writeDeferred, ArbitraryException)


def test_hostHeaderRequired(self):
"""
L{Request.writeTo} raises L{BadHeaders} if there is not exactly one
Expand Down
744 changes: 584 additions & 160 deletions twisted/web/test/test_webclient.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions twisted/web/topfiles/3420.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
twisted.web.client.Agent and ProxyAgent now support persistent connections.