Skip to content

Commit

Permalink
Merge pull request #3231 from bdarnell/wsgi-executor
Browse files Browse the repository at this point in the history
wsgi: Support ThreadPoolExecutor
  • Loading branch information
bdarnell committed Feb 16, 2023
2 parents 9ea5cdd + c34c0f3 commit 1f3f28a
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 30 deletions.
108 changes: 102 additions & 6 deletions tornado/test/wsgi_test.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,116 @@
import asyncio
import concurrent.futures
import threading

from wsgiref.validate import validator

from tornado.testing import AsyncHTTPTestCase
from tornado.routing import RuleRouter
from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.wsgi import WSGIContainer


class WSGIContainerTest(AsyncHTTPTestCase):
class WSGIAppMixin:
# TODO: Now that WSGIAdapter is gone, this is a pretty weak test.
def wsgi_app(self, environ, start_response):
def get_executor(self):
raise NotImplementedError()

def get_app(self):
executor = self.get_executor()
# The barrier test in DummyExecutorTest will always wait the full
# value of this timeout, so we don't want it to be too high.
self.barrier = threading.Barrier(2, timeout=0.3)

def make_container(app):
return WSGIContainer(validator(app), executor=executor)

return RuleRouter(
[
("/simple", make_container(self.simple_wsgi_app)),
("/barrier", make_container(self.barrier_wsgi_app)),
("/streaming_barrier", make_container(self.streaming_barrier_wsgi_app)),
]
)

def respond_plain(self, start_response):
status = "200 OK"
response_headers = [("Content-Type", "text/plain")]
start_response(status, response_headers)

def simple_wsgi_app(self, environ, start_response):
self.respond_plain(start_response)
return [b"Hello world!"]

def get_app(self):
return WSGIContainer(validator(self.wsgi_app))
def barrier_wsgi_app(self, environ, start_response):
self.respond_plain(start_response)
try:
n = self.barrier.wait()
except threading.BrokenBarrierError:
return [b"broken barrier"]
else:
return [b"ok %d" % n]

def streaming_barrier_wsgi_app(self, environ, start_response):
self.respond_plain(start_response)
yield b"ok "
try:
n = self.barrier.wait()
except threading.BrokenBarrierError:
yield b"broken barrier"
else:
yield b"%d" % n


class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase):
def get_executor(self):
return None

def test_simple(self):
response = self.fetch("/simple")
self.assertEqual(response.body, b"Hello world!")

@gen_test
async def test_concurrent_barrier(self):
self.barrier.reset()
resps = await asyncio.gather(
self.http_client.fetch(self.get_url("/barrier")),
self.http_client.fetch(self.get_url("/barrier")),
)
for resp in resps:
self.assertEqual(resp.body, b"broken barrier")

@gen_test
async def test_concurrent_streaming_barrier(self):
self.barrier.reset()
resps = await asyncio.gather(
self.http_client.fetch(self.get_url("/streaming_barrier")),
self.http_client.fetch(self.get_url("/streaming_barrier")),
)
for resp in resps:
self.assertEqual(resp.body, b"ok broken barrier")


class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase):
def get_executor(self):
return concurrent.futures.ThreadPoolExecutor()

def test_simple(self):
response = self.fetch("/")
response = self.fetch("/simple")
self.assertEqual(response.body, b"Hello world!")

@gen_test
async def test_concurrent_barrier(self):
self.barrier.reset()
resps = await asyncio.gather(
self.http_client.fetch(self.get_url("/barrier")),
self.http_client.fetch(self.get_url("/barrier")),
)
self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))

@gen_test
async def test_concurrent_streaming_barrier(self):
self.barrier.reset()
resps = await asyncio.gather(
self.http_client.fetch(self.get_url("/streaming_barrier")),
self.http_client.fetch(self.get_url("/streaming_barrier")),
)
self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))
108 changes: 84 additions & 24 deletions tornado/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
"""

import sys
import concurrent.futures
from io import BytesIO
import tornado
import sys

from tornado.concurrent import dummy_executor
from tornado import escape
from tornado import httputil
from tornado.ioloop import IOLoop
from tornado.log import access_log

from typing import List, Tuple, Optional, Callable, Any, Dict, Text
Expand All @@ -54,20 +57,28 @@ def to_wsgi_str(s: bytes) -> str:


class WSGIContainer(object):
r"""Makes a WSGI-compatible function runnable on Tornado's HTTP server.
r"""Makes a WSGI-compatible application runnable on Tornado's HTTP server.
.. warning::
WSGI is a *synchronous* interface, while Tornado's concurrency model
is based on single-threaded asynchronous execution. This means that
running a WSGI app with Tornado's `WSGIContainer` is *less scalable*
than running the same app in a multi-threaded WSGI server like
``gunicorn`` or ``uwsgi``. Use `WSGIContainer` only when there are
benefits to combining Tornado and WSGI in the same process that
outweigh the reduced scalability.
is based on single-threaded *asynchronous* execution. Many of Tornado's
distinguishing features are not available in WSGI mode, including efficient
long-polling and websockets. The primary purpose of `WSGIContainer` is
to support both WSGI applications and native Tornado ``RequestHandlers`` in
a single process. WSGI-only applications are likely to be better off
with a dedicated WSGI server such as ``gunicorn`` or ``uwsgi``.
Wrap a WSGI function in a `WSGIContainer` and pass it to `.HTTPServer` to
run it. For example::
Wrap a WSGI application in a `WSGIContainer` to make it implement the Tornado
`.HTTPServer` ``request_callback`` interface. The `WSGIContainer` object can
then be passed to classes from the `tornado.routing` module,
`tornado.web.FallbackHandler`, or to `.HTTPServer` directly.
This class is intended to let other frameworks (Django, Flask, etc)
run on the Tornado HTTP server and I/O loop.
Realistic usage will be more complicated, but the simplest possible example uses a
hand-written WSGI application with `.HTTPServer`::
def simple_app(environ, start_response):
status = "200 OK"
Expand All @@ -83,18 +94,46 @@ async def main():
asyncio.run(main())
This class is intended to let other frameworks (Django, web.py, etc)
run on the Tornado HTTP server and I/O loop.
The recommended pattern is to use the `tornado.routing` module to set up routing
rules between your WSGI application and, typically, a `tornado.web.Application`.
Alternatively, `tornado.web.Application` can be used as the top-level router
and `tornado.web.FallbackHandler` can embed a `WSGIContainer` within it.
If the ``executor`` argument is provided, the WSGI application will be executed
on that executor. This must be an instance of `concurrent.futures.Executor`,
typically a ``ThreadPoolExecutor`` (``ProcessPoolExecutor`` is not supported).
If no ``executor`` is given, the application will run on the event loop thread in
Tornado 6.3; this will change to use an internal thread pool by default in
Tornado 7.0.
The `tornado.web.FallbackHandler` class is often useful for mixing
Tornado and WSGI apps in the same server. See
https://github.com/bdarnell/django-tornado-demo for a complete example.
.. warning::
By default, the WSGI application is executed on the event loop's thread. This
limits the server to one request at a time (per process), making it less scalable
than most other WSGI servers. It is therefore highly recommended that you pass
a ``ThreadPoolExecutor`` when constructing the `WSGIContainer`, after verifying
that your application is thread-safe. The default will change to use a
``ThreadPoolExecutor`` in Tornado 7.0.
.. versionadded:: 6.3
The ``executor`` parameter.
.. deprecated:: 6.3
The default behavior of running the WSGI application on the event loop thread
is deprecated and will change in Tornado 7.0 to use a thread pool by default.
"""

def __init__(self, wsgi_application: "WSGIAppType") -> None:
def __init__(
self,
wsgi_application: "WSGIAppType",
executor: Optional[concurrent.futures.Executor] = None,
) -> None:
self.wsgi_application = wsgi_application
self.executor = dummy_executor if executor is None else executor

def __call__(self, request: httputil.HTTPServerRequest) -> None:
IOLoop.current().spawn_callback(self.handle_request, request)

async def handle_request(self, request: httputil.HTTPServerRequest) -> None:
data = {} # type: Dict[str, Any]
response = [] # type: List[bytes]

Expand All @@ -113,15 +152,33 @@ def start_response(
data["headers"] = headers
return response.append

app_response = self.wsgi_application(
WSGIContainer.environ(request), start_response
loop = IOLoop.current()
app_response = await loop.run_in_executor(
self.executor,
self.wsgi_application,
self.environ(request),
start_response,
)
try:
response.extend(app_response)
body = b"".join(response)
app_response_iter = iter(app_response)

def next_chunk() -> Optional[bytes]:
try:
return next(app_response_iter)
except StopIteration:
# StopIteration is special and is not allowed to pass through
# coroutines normally.
return None

while True:
chunk = await loop.run_in_executor(self.executor, next_chunk)
if chunk is None:
break
response.append(chunk)
finally:
if hasattr(app_response, "close"):
app_response.close() # type: ignore
body = b"".join(response)
if not data:
raise Exception("WSGI app did not call start_response")

Expand All @@ -147,9 +204,12 @@ def start_response(
request.connection.finish()
self._log(status_code, request)

@staticmethod
def environ(request: httputil.HTTPServerRequest) -> Dict[Text, Any]:
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment."""
def environ(self, request: httputil.HTTPServerRequest) -> Dict[Text, Any]:
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
.. versionchanged:: 6.3
No longer a static method.
"""
hostport = request.host.split(":")
if len(hostport) == 2:
host = hostport[0]
Expand All @@ -172,7 +232,7 @@ def environ(request: httputil.HTTPServerRequest) -> Dict[Text, Any]:
"wsgi.url_scheme": request.protocol,
"wsgi.input": BytesIO(escape.utf8(request.body)),
"wsgi.errors": sys.stderr,
"wsgi.multithread": False,
"wsgi.multithread": self.executor is not dummy_executor,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
Expand Down

0 comments on commit 1f3f28a

Please sign in to comment.