diff --git a/treq/test/test_testing.py b/treq/test/test_testing.py index c915b2e0..699024fa 100644 --- a/treq/test/test_testing.py +++ b/treq/test/test_testing.py @@ -42,6 +42,18 @@ def render(self, request): return NOT_DONE_YET +class _EventuallyResponsiveTestResource(Resource): + """ + Resource that returns NOT_DONE_YET and stores the request so that something + else can finish the response later. + """ + isLeaf = True + + def render(self, request): + self.stored_request = request + return NOT_DONE_YET + + class StubbingTests(TestCase): """ Tests for :class:`StubTreq`. @@ -89,7 +101,7 @@ def test_providing_resource_to_stub_treq(self): urls = ( 'http://supports-http.com', 'https://supports-https.com', - 'http://this/has/a/path/and/invalid/domain/name' + 'http://this/has/a/path/and/invalid/domain/name', 'https://supports-https.com:8080', 'http://supports-http.com:8080', ) @@ -155,9 +167,10 @@ def test_passing_in_strange_data_is_rejected(self): self.successResultOf( stub.request('method', 'http://url', data=text_type(""))) - def test_handles_asynchronous_requests(self): + def test_handles_failing_asynchronous_requests(self): """ - Handle a resource returning NOT_DONE_YET. + Handle a resource returning NOT_DONE_YET and then canceling the + request. """ stub = StubTreq(_NonResponsiveTestResource()) d = stub.request('method', 'http://url', data="1234") @@ -165,6 +178,72 @@ def test_handles_asynchronous_requests(self): d.cancel() self.failureResultOf(d, ResponseFailed) + def test_handles_successful_asynchronous_requests(self): + """ + Handle a resource returning NOT_DONE_YET and then later finishing the + response. + """ + rsrc = _EventuallyResponsiveTestResource() + stub = StubTreq(rsrc) + d = stub.request('method', 'http://example.com/', data="1234") + self.assertNoResult(d) + rsrc.stored_request.finish() + stub.flush() + resp = self.successResultOf(d) + self.assertEqual(resp.code, 200) + + def test_handles_successful_asynchronous_requests_with_response_data(self): + """ + Handle a resource returning NOT_DONE_YET and then sending some data in + the response. + """ + rsrc = _EventuallyResponsiveTestResource() + stub = StubTreq(rsrc) + d = stub.request('method', 'http://example.com/', data="1234") + self.assertNoResult(d) + + chunks = [] + rsrc.stored_request.write('spam ') + rsrc.stored_request.write('eggs') + stub.flush() + resp = self.successResultOf(d) + d = stub.collect(resp, chunks.append) + self.assertNoResult(d) + self.assertEqual(''.join(chunks), 'spam eggs') + + rsrc.stored_request.finish() + stub.flush() + self.successResultOf(d) + + def test_handles_successful_asynchronous_requests_with_streaming(self): + """ + Handle a resource returning NOT_DONE_YET and then streaming data back + gradually over time. + """ + rsrc = _EventuallyResponsiveTestResource() + stub = StubTreq(rsrc) + d = stub.request('method', 'http://example.com/', data="1234") + self.assertNoResult(d) + + chunks = [] + rsrc.stored_request.write('spam ') + rsrc.stored_request.write('eggs') + stub.flush() + resp = self.successResultOf(d) + d = stub.collect(resp, chunks.append) + self.assertNoResult(d) + self.assertEqual(''.join(chunks), 'spam eggs') + + del chunks[:] + rsrc.stored_request.write('eggs\r\nspam\r\n') + stub.flush() + self.assertNoResult(d) + self.assertEqual(''.join(chunks), 'eggs\r\nspam\r\n') + + rsrc.stored_request.finish() + stub.flush() + self.successResultOf(d) + class HasHeadersTests(TestCase): """ diff --git a/treq/testing.py b/treq/testing.py index 6177896d..3e0aba70 100644 --- a/treq/testing.py +++ b/treq/testing.py @@ -6,10 +6,10 @@ from six import string_types -from twisted.test.proto_helpers import StringTransport, MemoryReactor +from twisted.test.proto_helpers import MemoryReactor +from twisted.test import iosim from twisted.internet.address import IPv4Address -from twisted.internet.error import ConnectionDone from twisted.internet.defer import succeed from twisted.internet.interfaces import ISSLTransport @@ -20,26 +20,12 @@ from twisted.web.server import Site from twisted.web.iweb import IAgent, IBodyProducer -from twisted.python.failure import Failure - from zope.interface import directlyProvides, implementer import treq from treq.client import HTTPClient -class AbortableStringTransport(StringTransport): - """ - A :obj:`StringTransport` that supports ``abortConnection``. - """ - def abortConnection(self): - """ - Since all connection cessation is immediate in this in-memory - transport, just call ``loseConnection``. - """ - self.loseConnection() - - @implementer(IAgent) class RequestTraversalAgent(object): """ @@ -55,6 +41,7 @@ def __init__(self, rootResource): self._memoryReactor = MemoryReactor() self._realAgent = Agent(reactor=self._memoryReactor) self._rootResource = rootResource + self._pumps = set() def request(self, method, uri, headers=None, bodyProducer=None): """ @@ -89,54 +76,54 @@ def check_already_called(r): host, port, factory, timeout, bindAddress = ( self._memoryReactor.tcpClients[-1]) - # Then we need to convince that factory it's connected to something and - # it will give us a protocol for that connection. - protocol = factory.buildProtocol(None) - - # We want to capture the output of that connection so we'll make an - # in-memory transport. - clientTransport = AbortableStringTransport() - if scheme == "https": - directlyProvides(clientTransport, ISSLTransport) - - # When the protocol is connected to a transport, it ought to send the - # whole request because callers of this should not use an asynchronous - # bodyProducer. - protocol.makeConnection(clientTransport) - - # Get the data from the request. - requestData = clientTransport.io.getvalue() + serverAddress = IPv4Address('TCP', '127.0.0.1', port) + clientAddress = IPv4Address('TCP', '127.0.0.1', 31337) + + # Create the protocol and fake transport for the client and server, + # using the factory that was passed to the MemoryReactor for the + # client, and a Site around our rootResource for the server. + serverProtocol = Site(self._rootResource).buildProtocol(None) + serverTransport = iosim.FakeTransport( + serverProtocol, isServer=True, + hostAddress=serverAddress, peerAddress=clientAddress) + clientProtocol = factory.buildProtocol(None) + clientTransport = iosim.FakeTransport( + clientProtocol, isServer=False, + hostAddress=clientAddress, peerAddress=serverAddress) + + # Twisted 13.2 compatibility. + serverTransport.abortConnection = serverTransport.loseConnection + clientTransport.abortConnection = clientTransport.loseConnection - # Now time for the server to do its job. Ask it to build an HTTP - # channel. - channel = Site(self._rootResource).buildProtocol(None) - - # Connect the channel to another in-memory transport so we can collect - # the response. - serverTransport = AbortableStringTransport() if scheme == "https": + # Provide ISSLTransport on both transports, so everyone knows that + # this is HTTPS. directlyProvides(serverTransport, ISSLTransport) - serverTransport.hostAddr = IPv4Address('TCP', '127.0.0.1', port) - channel.makeConnection(serverTransport) + directlyProvides(clientTransport, ISSLTransport) - # Feed it the data that the Agent synthesized. - channel.dataReceived(requestData) + # Make a pump for wiring the client and server together. + pump = iosim.connect( + serverProtocol, serverTransport, clientProtocol, clientTransport) + self._pumps.add(pump) - # Now we have the response data, let's give it back to the Agent. - protocol.dataReceived(serverTransport.io.getvalue()) + return response - def finish(r): - # By now the Agent should have all it needs to parse a response. - protocol.connectionLost(Failure(ConnectionDone())) - # Tell it that the connection is now complete so it can clean up. - channel.connectionLost(Failure(ConnectionDone())) - # Propogate the response. - return r + def flush(self): + """ + Flush all data between pending client/server pairs. - # Return the response in the accepted format (Deferred firing - # IResponse). This should be synchronously fired, and if not, it's the - # system under test's problem. - return response.addBoth(finish) + This is only necessary if a :obj:`Resource` under test returns + :obj:`NOT_DONE_YET` from its ``render`` method, making a response + asynchronous. In that case, after each write from the server, + :meth:`pump` must be called so the client can see it. + """ + old_pumps = self._pumps + new_pumps = self._pumps = set() + for p in old_pumps: + p.flush() + if p.clientIO.disconnected and p.serverIO.disconnected: + continue + new_pumps.add(p) @implementer(IBodyProducer) @@ -197,7 +184,8 @@ def __init__(self, resource): Construct a client, and pass through client methods and/or treq.content functions. """ - _client = HTTPClient(agent=RequestTraversalAgent(resource), + _agent = RequestTraversalAgent(resource) + _client = HTTPClient(agent=_agent, data_to_body_producer=_SynchronousProducer) for function_name in treq.__all__: function = getattr(_client, function_name, None) @@ -207,6 +195,7 @@ def __init__(self, resource): function = _reject_files(function) setattr(self, function_name, function) + self.flush = _agent.flush class StringStubbingResource(Resource):