Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow async responses from RequestTraversalAgent #115

Merged
merged 3 commits into from Oct 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 82 additions & 3 deletions treq/test/test_testing.py
Expand Up @@ -42,6 +42,18 @@ def render(self, request):
return NOT_DONE_YET


class _EventuallyResponsiveTestResource(Resource):
"""
Resource that returns NOT_DONE_YET and stores the request so that something
else can finish the response later.
"""
isLeaf = True

def render(self, request):
self.stored_request = request
return NOT_DONE_YET


class StubbingTests(TestCase):
"""
Tests for :class:`StubTreq`.
Expand Down Expand Up @@ -89,7 +101,7 @@ def test_providing_resource_to_stub_treq(self):
urls = (
'http://supports-http.com',
'https://supports-https.com',
'http://this/has/a/path/and/invalid/domain/name'
'http://this/has/a/path/and/invalid/domain/name',
'https://supports-https.com:8080',
'http://supports-http.com:8080',
)
Expand Down Expand Up @@ -155,16 +167,83 @@ def test_passing_in_strange_data_is_rejected(self):
self.successResultOf(
stub.request('method', 'http://url', data=text_type("")))

def test_handles_asynchronous_requests(self):
def test_handles_failing_asynchronous_requests(self):
"""
Handle a resource returning NOT_DONE_YET.
Handle a resource returning NOT_DONE_YET and then canceling the
request.
"""
stub = StubTreq(_NonResponsiveTestResource())
d = stub.request('method', 'http://url', data="1234")
self.assertNoResult(d)
d.cancel()
self.failureResultOf(d, ResponseFailed)

def test_handles_successful_asynchronous_requests(self):
"""
Handle a resource returning NOT_DONE_YET and then later finishing the
response.
"""
rsrc = _EventuallyResponsiveTestResource()
stub = StubTreq(rsrc)
d = stub.request('method', 'http://example.com/', data="1234")
self.assertNoResult(d)
rsrc.stored_request.finish()
stub.flush()
resp = self.successResultOf(d)
self.assertEqual(resp.code, 200)

def test_handles_successful_asynchronous_requests_with_response_data(self):
"""
Handle a resource returning NOT_DONE_YET and then sending some data in
the response.
"""
rsrc = _EventuallyResponsiveTestResource()
stub = StubTreq(rsrc)
d = stub.request('method', 'http://example.com/', data="1234")
self.assertNoResult(d)

chunks = []
rsrc.stored_request.write('spam ')
rsrc.stored_request.write('eggs')
stub.flush()
resp = self.successResultOf(d)
d = stub.collect(resp, chunks.append)
self.assertNoResult(d)
self.assertEqual(''.join(chunks), 'spam eggs')

rsrc.stored_request.finish()
stub.flush()
self.successResultOf(d)

def test_handles_successful_asynchronous_requests_with_streaming(self):
"""
Handle a resource returning NOT_DONE_YET and then streaming data back
gradually over time.
"""
rsrc = _EventuallyResponsiveTestResource()
stub = StubTreq(rsrc)
d = stub.request('method', 'http://example.com/', data="1234")
self.assertNoResult(d)

chunks = []
rsrc.stored_request.write('spam ')
rsrc.stored_request.write('eggs')
stub.flush()
resp = self.successResultOf(d)
d = stub.collect(resp, chunks.append)
self.assertNoResult(d)
self.assertEqual(''.join(chunks), 'spam eggs')

del chunks[:]
rsrc.stored_request.write('eggs\r\nspam\r\n')
stub.flush()
self.assertNoResult(d)
self.assertEqual(''.join(chunks), 'eggs\r\nspam\r\n')

rsrc.stored_request.finish()
stub.flush()
self.successResultOf(d)


class HasHeadersTests(TestCase):
"""
Expand Down
105 changes: 47 additions & 58 deletions treq/testing.py
Expand Up @@ -6,10 +6,10 @@

from six import string_types

from twisted.test.proto_helpers import StringTransport, MemoryReactor
from twisted.test.proto_helpers import MemoryReactor
from twisted.test import iosim

from twisted.internet.address import IPv4Address
from twisted.internet.error import ConnectionDone
from twisted.internet.defer import succeed
from twisted.internet.interfaces import ISSLTransport

Expand All @@ -20,26 +20,12 @@
from twisted.web.server import Site
from twisted.web.iweb import IAgent, IBodyProducer

from twisted.python.failure import Failure

from zope.interface import directlyProvides, implementer

import treq
from treq.client import HTTPClient


class AbortableStringTransport(StringTransport):
"""
A :obj:`StringTransport` that supports ``abortConnection``.
"""
def abortConnection(self):
"""
Since all connection cessation is immediate in this in-memory
transport, just call ``loseConnection``.
"""
self.loseConnection()


@implementer(IAgent)
class RequestTraversalAgent(object):
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this is getting rid of public API I think, but maybe we can afford to be a little more fast and loose than Twisted itself. (We should really make all implementation modules private by default, and explicitly populate public APIs …)

Expand All @@ -55,6 +41,7 @@ def __init__(self, rootResource):
self._memoryReactor = MemoryReactor()
self._realAgent = Agent(reactor=self._memoryReactor)
self._rootResource = rootResource
self._pumps = set()

def request(self, method, uri, headers=None, bodyProducer=None):
"""
Expand Down Expand Up @@ -89,54 +76,54 @@ def check_already_called(r):
host, port, factory, timeout, bindAddress = (
self._memoryReactor.tcpClients[-1])

# Then we need to convince that factory it's connected to something and
# it will give us a protocol for that connection.
protocol = factory.buildProtocol(None)

# We want to capture the output of that connection so we'll make an
# in-memory transport.
clientTransport = AbortableStringTransport()
if scheme == "https":
directlyProvides(clientTransport, ISSLTransport)

# When the protocol is connected to a transport, it ought to send the
# whole request because callers of this should not use an asynchronous
# bodyProducer.
protocol.makeConnection(clientTransport)

# Get the data from the request.
requestData = clientTransport.io.getvalue()
serverAddress = IPv4Address('TCP', '127.0.0.1', port)
clientAddress = IPv4Address('TCP', '127.0.0.1', 31337)

# Create the protocol and fake transport for the client and server,
# using the factory that was passed to the MemoryReactor for the
# client, and a Site around our rootResource for the server.
serverProtocol = Site(self._rootResource).buildProtocol(None)
serverTransport = iosim.FakeTransport(
serverProtocol, isServer=True,
hostAddress=serverAddress, peerAddress=clientAddress)
clientProtocol = factory.buildProtocol(None)
clientTransport = iosim.FakeTransport(
clientProtocol, isServer=False,
hostAddress=clientAddress, peerAddress=serverAddress)

# Twisted 13.2 compatibility.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only for 13.2, shouldn't we avoid overwriting that method if it's already set?

serverTransport.abortConnection = serverTransport.loseConnection
clientTransport.abortConnection = clientTransport.loseConnection

# Now time for the server to do its job. Ask it to build an HTTP
# channel.
channel = Site(self._rootResource).buildProtocol(None)

# Connect the channel to another in-memory transport so we can collect
# the response.
serverTransport = AbortableStringTransport()
if scheme == "https":
# Provide ISSLTransport on both transports, so everyone knows that
# this is HTTPS.
directlyProvides(serverTransport, ISSLTransport)
serverTransport.hostAddr = IPv4Address('TCP', '127.0.0.1', port)
channel.makeConnection(serverTransport)
directlyProvides(clientTransport, ISSLTransport)

# Feed it the data that the Agent synthesized.
channel.dataReceived(requestData)
# Make a pump for wiring the client and server together.
pump = iosim.connect(
serverProtocol, serverTransport, clientProtocol, clientTransport)
self._pumps.add(pump)

# Now we have the response data, let's give it back to the Agent.
protocol.dataReceived(serverTransport.io.getvalue())
return response

def finish(r):
# By now the Agent should have all it needs to parse a response.
protocol.connectionLost(Failure(ConnectionDone()))
# Tell it that the connection is now complete so it can clean up.
channel.connectionLost(Failure(ConnectionDone()))
# Propogate the response.
return r
def flush(self):
"""
Flush all data between pending client/server pairs.

# Return the response in the accepted format (Deferred firing
# IResponse). This should be synchronously fired, and if not, it's the
# system under test's problem.
return response.addBoth(finish)
This is only necessary if a :obj:`Resource` under test returns
:obj:`NOT_DONE_YET` from its ``render`` method, making a response
asynchronous. In that case, after each write from the server,
:meth:`pump` must be called so the client can see it.
"""
old_pumps = self._pumps
new_pumps = self._pumps = set()
for p in old_pumps:
p.flush()
if p.clientIO.disconnected and p.serverIO.disconnected:
continue
new_pumps.add(p)


@implementer(IBodyProducer)
Expand Down Expand Up @@ -197,7 +184,8 @@ def __init__(self, resource):
Construct a client, and pass through client methods and/or
treq.content functions.
"""
_client = HTTPClient(agent=RequestTraversalAgent(resource),
_agent = RequestTraversalAgent(resource)
_client = HTTPClient(agent=_agent,
data_to_body_producer=_SynchronousProducer)
for function_name in treq.__all__:
function = getattr(_client, function_name, None)
Expand All @@ -207,6 +195,7 @@ def __init__(self, resource):
function = _reject_files(function)

setattr(self, function_name, function)
self.flush = _agent.flush


class StringStubbingResource(Resource):
Expand Down