From 5f5e0bb3be4e522039fbbe3768109ea64cc99fe2 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 15 Nov 2010 16:53:44 -0800 Subject: [PATCH] Implement max_clients limitation and queueing for SimpleAsyncHTTPClient --- tornado/simple_httpclient.py | 30 +++++++++++++++--- tornado/test/simple_httpclient_test.py | 42 ++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index 07d655afb3..c1b340627d 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -8,6 +8,7 @@ from tornado.iostream import IOStream, SSLIOStream from tornado import stack_context +import collections import contextlib import errno import functools @@ -47,11 +48,13 @@ class SimpleAsyncHTTPClient(object): _ASYNC_CLIENTS = weakref.WeakKeyDictionary() def __new__(cls, io_loop=None, max_clients=10, - max_simultaneous_connections=None): + max_simultaneous_connections=None, + force_instance=False): """Creates a SimpleAsyncHTTPClient. Only a single SimpleAsyncHTTPClient instance exists per IOLoop in order to provide limitations on the number of pending connections. + force_instance=True may be used to suppress this behavior. max_clients is the number of concurrent requests that can be in progress. max_simultaneous_connections has no effect and is accepted @@ -60,13 +63,16 @@ def __new__(cls, io_loop=None, max_clients=10, and will be ignored when an existing client is reused. """ io_loop = io_loop or IOLoop.instance() - if io_loop in cls._ASYNC_CLIENTS: + if io_loop in cls._ASYNC_CLIENTS and not force_instance: return cls._ASYNC_CLIENTS[io_loop] else: instance = super(SimpleAsyncHTTPClient, cls).__new__(cls) instance.io_loop = io_loop instance.max_clients = max_clients - cls._ASYNC_CLIENTS[io_loop] = instance + instance.queue = collections.deque() + instance.active = {} + if not force_instance: + cls._ASYNC_CLIENTS[io_loop] = instance return instance def close(self): @@ -78,7 +84,23 @@ def fetch(self, request, callback, **kwargs): if not isinstance(request.headers, HTTPHeaders): request.headers = HTTPHeaders(request.headers) callback = stack_context.wrap(callback) - _HTTPConnection(self.io_loop, request, callback) + self.queue.append((request, callback)) + self._process_queue() + + def _process_queue(self): + with stack_context.NullContext(): + while self.queue and len(self.active) < self.max_clients: + request, callback = self.queue.popleft() + key = object() + self.active[key] = (request, callback) + _HTTPConnection(self.io_loop, request, + functools.partial(self._on_fetch_complete, + key, callback)) + + def _on_fetch_complete(self, key, callback, response): + del self.active[key] + callback(response) + self._process_queue() diff --git a/tornado/test/simple_httpclient_test.py b/tornado/test/simple_httpclient_test.py index 24b67b024f..e8c35277a0 100644 --- a/tornado/test/simple_httpclient_test.py +++ b/tornado/test/simple_httpclient_test.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import collections import gzip import logging import socket @@ -36,14 +37,29 @@ class HangHandler(RequestHandler): def get(self): pass +class TriggerHandler(RequestHandler): + def initialize(self, queue, wake_callback): + self.queue = queue + self.wake_callback = wake_callback + + @asynchronous + def get(self): + logging.info("queuing trigger") + self.queue.append(self.finish) + self.wake_callback() + class SimpleHTTPClientTestCase(AsyncHTTPTestCase, LogTrapTestCase): def get_app(self): + # callable objects to finish pending /trigger requests + self.triggers = collections.deque() return Application([ ("/hello", HelloWorldHandler), ("/post", PostHandler), ("/chunk", ChunkHandler), ("/auth", AuthHandler), ("/hang", HangHandler), + ("/trigger", TriggerHandler, dict(queue=self.triggers, + wake_callback=self.stop)), ], gzip=True) def setUp(self): @@ -128,7 +144,33 @@ def test_singleton(self): # Class "constructor" reuses objects on the same IOLoop self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is SimpleAsyncHTTPClient(self.io_loop)) + # unless force_instance is used + self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is not + SimpleAsyncHTTPClient(self.io_loop, + force_instance=True)) # different IOLoops use different objects io_loop2 = IOLoop() self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is not SimpleAsyncHTTPClient(io_loop2)) + + def test_connection_limit(self): + client = SimpleAsyncHTTPClient(self.io_loop, max_clients=2, + force_instance=True) + self.assertEqual(client.max_clients, 2) + seen = [] + # Send 4 requests. Two can be sent immediately, while the others + # will be queued + for i in range(4): + client.fetch(self.get_url("/trigger"), + lambda response, i=i: (seen.append(i), self.stop())) + self.wait(condition=lambda: len(self.triggers) == 2) + self.assertEqual(len(client.queue), 2) + + # Finish the first two requests and let the next two through + self.triggers.popleft()() + self.triggers.popleft()() + self.wait(condition=lambda: (len(self.triggers) == 2 and + len(seen) == 2)) + self.assertEqual(seen, [0, 1]) + self.assertEqual(len(client.queue), 0) +