Skip to content

Commit

Permalink
Merge nopipeline-8320-2: Remove parallel HTTP/1.1 pipelining
Browse files Browse the repository at this point in the history
Author: lukasa
Reviewers: glyph, hawkowl
Fixes: #8320
  • Loading branch information
hawkowl committed May 24, 2016
2 parents 77d338b + 3c4972d commit 1d69e04
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 198 deletions.
163 changes: 114 additions & 49 deletions twisted/web/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def _parseHeader(line):
from twisted.internet.interfaces import IProtocol
from twisted.protocols import policies, basic

from twisted.web.iweb import IRequest, IAccessLogFormatter
from twisted.web.iweb import (
IRequest, IAccessLogFormatter, INonQueuedRequestFactory)
from twisted.web.http_headers import Headers

H2_ENABLED = False
Expand Down Expand Up @@ -534,6 +535,10 @@ def rawDataReceived(self, data):
NO_BODY_CODES = (204, 304)


# Sentinel object that detects people explicitly passing `queued` to Request.
_QUEUED_SENTINEL = object()


@implementer(interfaces.IConsumer)
class Request:
"""
Expand Down Expand Up @@ -587,24 +592,24 @@ class Request:
_forceSSL = 0
_disconnected = False

def __init__(self, channel, queued):
def __init__(self, channel, queued=_QUEUED_SENTINEL):
"""
@param channel: the channel we're connected to.
@param queued: are we in the request queue, or can we start writing to
the transport?
@param queued: (deprecated) are we in the request queue, or can we
start writing to the transport?
"""
self.notifications = []
self.channel = channel
self.queued = queued
self.requestHeaders = Headers()
self.received_cookies = {}
self.responseHeaders = Headers()
self.cookies = [] # outgoing cookies
self.transport = self.channel.transport

if queued:
self.transport = StringTransport()
else:
self.transport = self.channel.transport
if queued is _QUEUED_SENTINEL:
queued = False

self.queued = queued


def _cleanup(self):
Expand All @@ -616,12 +621,13 @@ def _cleanup(self):
self.unregisterProducer()
self.channel.requestDone(self)
del self.channel
try:
self.content.close()
except OSError:
# win32 suckiness, no idea why it does this
pass
del self.content
if self.content is not None:
try:
self.content.close()
except OSError:
# win32 suckiness, no idea why it does this
pass
del self.content
for d in self.notifications:
d.callback(None)
self.notifications = []
Expand All @@ -635,25 +641,12 @@ def noLongerQueued(self):
We start writing whatever data we have to the transport, etc.
This method is not intended for users.
"""
if not self.queued:
raise RuntimeError("noLongerQueued() got called unnecessarily.")

self.queued = 0
# set transport to real one and send any buffer data
data = self.transport.getvalue()
self.transport = self.channel.transport
if data:
self.transport.write(data)

# if we have producer, register it with transport
if (self.producer is not None) and not self.finished:
self.transport.registerProducer(self.producer, self.streamingProducer)
In 16.3 this method was changed to become a no-op, as L{Request}
objects are now never queued.
"""
pass

# if we're finished, clean up
if self.finished:
self._cleanup()

def gotLength(self, length):
"""
Expand Down Expand Up @@ -806,19 +799,13 @@ def registerProducer(self, producer, streaming):

self.streamingProducer = streaming
self.producer = producer

if self.queued:
if streaming:
producer.pauseProducing()
else:
self.transport.registerProducer(producer, streaming)
self.transport.registerProducer(producer, streaming)

def unregisterProducer(self):
"""
Unregister the producer.
"""
if not self.queued:
self.transport.unregisterProducer()
self.transport.unregisterProducer()
self.producer = None


Expand Down Expand Up @@ -1264,11 +1251,12 @@ def isSecure(self):
"""
if self._forceSSL:
return True
transport = getattr(getattr(self, 'channel', None), 'transport', None)
transport = getattr(self, 'transport', None)
if interfaces.ISSLTransport(transport, None) is not None:
return True
return False


def _authorize(self):
# Authorization, (mostly) per the RFC
try:
Expand Down Expand Up @@ -1351,6 +1339,10 @@ def connectionLost(self, reason):
"Twisted Names to resolve hostnames")(Request.getClient)


Request.noLongerQueued = deprecated(
Version("Twisted", 16, 3, 0))(Request.noLongerQueued)


class _DataLoss(Exception):
"""
L{_DataLoss} indicates that not all of a message body was received. This
Expand Down Expand Up @@ -1585,6 +1577,34 @@ def noMoreData(self):



@implementer(interfaces.IPushProducer)
class _NoPushProducer(object):
"""
A no-op version of L{interfaces.IPushProducer}, used to abstract over the
possibility that a L{HTTPChannel} transport does not provide
L{IPushProducer}.
"""
def pauseProducing(self):
"""
Pause producing data.
Tells a producer that it has produced too much data to process for
the time being, and to stop until resumeProducing() is called.
"""
pass


def resumeProducing(self):
"""
Resume producing data.
This tells a producer to re-add itself to the main loop and produce
more data for its consumer.
"""
pass



class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
"""
A receiver for HTTP requests.
Expand All @@ -1605,6 +1625,19 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
@ivar _receivedHeaderSize: Bytes received so far for the header.
@type _receivedHeaderSize: C{int}
@ivar _handlingRequest: Whether a request is currently being processed.
@type _handlingRequest: L{bool}
@ivar _dataBuffer: Any data that has been received from the connection
while processing an outstanding request.
@type _dataBuffer: L{list} of L{bytes}
@ivar _producer: Either the transport, if it provides
L{interfaces.IPushProducer}, or a null implementation of
L{interfaces.IPushProducer}. Used to attempt to prevent the transport
from producing excess data when we're responding to a request.
@type _producer: L{interfaces.IPushProducer}
"""

maxHeaders = 500
Expand All @@ -1626,11 +1659,16 @@ class HTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
def __init__(self):
# the request queue
self.requests = []
self._handlingRequest = False
self._dataBuffer = []
self._transferDecoder = None


def connectionMade(self):
self.setTimeout(self.timeOut)
self._producer = interfaces.IPushProducer(
self.transport, _NoPushProducer()
)


def lineReceived(self, line):
Expand All @@ -1645,6 +1683,13 @@ def lineReceived(self, line):
_respondToBadRequestAndDisconnect(self.transport)
return

# If we're currently handling a request, buffer this data. We shouldn't
# have received it (we've paused the transport), but let's be cautious.
if self._handlingRequest:
self._dataBuffer.append(line)
self._dataBuffer.append(b'\r\n')
return

if self.__first_line:
# if this connection is not persistent, drop any data which
# the client (illegally) sent after the last request.
Expand All @@ -1659,7 +1704,10 @@ def lineReceived(self, line):
return

# create a new Request object
request = self.requestFactory(self, len(self.requests))
if INonQueuedRequestFactory.providedBy(self.requestFactory):
request = self.requestFactory(self)
else:
request = self.requestFactory(self, len(self.requests))
self.requests.append(request)

self.__first_line = 0
Expand Down Expand Up @@ -1706,7 +1754,7 @@ def lineReceived(self, line):

def _finishRequestBody(self, data):
self.allContentReceived()
self.setLineMode(data)
self._dataBuffer.append(data)


def headerReceived(self, line):
Expand Down Expand Up @@ -1777,12 +1825,23 @@ def allContentReceived(self):
if self.timeOut:
self._savedTimeOut = self.setTimeout(None)

# Pause the producer if we can. If we can't, that's ok, we'll buffer.
self._producer.pauseProducing()
self._handlingRequest = True

req = self.requests[-1]
req.requestReceived(command, path, version)


def rawDataReceived(self, data):
self.resetTimeout()

# If we're currently handling a request, buffer this data. We shouldn't
# have received it (we've paused the transport), but let's be cautious.
if self._handlingRequest:
self._dataBuffer.append(data)
return

try:
self._transferDecoder.dataReceived(data)
except _MalformedChunkedDataError:
Expand Down Expand Up @@ -1854,19 +1913,25 @@ def requestDone(self, request):
del self.requests[0]

if self.persistent:
# notify next request it can start writing
if self.requests:
self.requests[0].noLongerQueued()
else:
if self._savedTimeOut:
self.setTimeout(self._savedTimeOut)
self._handlingRequest = False
self._producer.resumeProducing()

if self._savedTimeOut:
self.setTimeout(self._savedTimeOut)

# Receive our buffered data, if any.
data = b''.join(self._dataBuffer)
self._dataBuffer = []
self.setLineMode(data)
else:
self.transport.loseConnection()


def timeoutConnection(self):
log.msg("Timing out client: %s" % str(self.transport.getPeer()))
policies.TimeoutMixin.timeoutConnection(self)


def connectionLost(self, reason):
self.setTimeout(None)
for request in self.requests:
Expand Down
19 changes: 19 additions & 0 deletions twisted/web/iweb.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,25 @@ def setHost(host, port, ssl=0):



class INonQueuedRequestFactory(Interface):
"""
A factory of L{IRequest} objects that does not take a ``queued`` parameter.
"""
def __call__(channel):
"""
Create an L{IRequest} that is operating on the given channel. There
must only be one L{IRequest} object processing at any given time on a
channel.
@param channel: A L{twisted.web.http.HTTPChannel} object.
@type channel: L{twisted.web.http.HTTPChannel}
@return: A request object.
@rtype: L{IRequest}
"""



class IAccessLogFormatter(Interface):
"""
An object which can represent an HTTP request as a line of text for
Expand Down
6 changes: 3 additions & 3 deletions twisted/web/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from twisted.internet.protocol import ClientFactory
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.http import HTTPClient, Request, HTTPChannel
from twisted.web.http import HTTPClient, Request, HTTPChannel, _QUEUED_SENTINEL



Expand Down Expand Up @@ -134,7 +134,7 @@ class ProxyRequest(Request):
protocols = {b'http': ProxyClientFactory}
ports = {b'http': 80}

def __init__(self, channel, queued, reactor=reactor):
def __init__(self, channel, queued=_QUEUED_SENTINEL, reactor=reactor):
Request.__init__(self, channel, queued)
self.reactor = reactor

Expand Down Expand Up @@ -195,7 +195,7 @@ class ReverseProxyRequest(Request):

proxyClientFactoryClass = ProxyClientFactory

def __init__(self, channel, queued, reactor=reactor):
def __init__(self, channel, queued=_QUEUED_SENTINEL, reactor=reactor):
Request.__init__(self, channel, queued)
self.reactor = reactor

Expand Down
2 changes: 1 addition & 1 deletion twisted/web/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ class Site(http.HTTPFactory):
A web site: manage log, sessions, and resources.
@ivar counter: increment value used for generating unique sessions ID.
@ivar requestFactory: A factory which is called with (channel, queued)
@ivar requestFactory: A factory which is called with (channel)
and creates L{Request} instances. Default to L{Request}.
@ivar displayTracebacks: if set, Twisted internal errors are displayed on
rendered pages. Default to C{True}.
Expand Down

0 comments on commit 1d69e04

Please sign in to comment.