diff --git a/tornado/httpserver.py b/tornado/httpserver.py
index 2a765a53fd..9f0dfee109 100644
--- a/tornado/httpserver.py
+++ b/tornado/httpserver.py
@@ -23,6 +23,7 @@
 import ioloop
 import iostream
 import logging
+import os
 import socket
 import time
 import urlparse
@@ -82,17 +83,57 @@ def handle_request(request):
            "keyfile": os.path.join(data_dir, "mydomain.key"),
        })
 
+    By default, listen() runs in a single thread in a single process. You
+    can utilize all available CPUs on this machine by calling bind() and
+    start() instead of listen():
+
+        http_server = httpserver.HTTPServer(handle_request)
+        http_server.bind(8888)
+        http_server.start() # Forks multiple sub-processes
+        ioloop.IOLoop.instance().start()
+
+    start() detects the number of CPUs on this machine and "pre-forks" that
+    number of child processes so that we have one Tornado process per CPU,
+    all with their own IOLoop. You can also pass in the specific number of
+    child processes you want to run with if you want to override this
+    auto-detection.
     """
     def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
                  xheaders=False, ssl_options=None):
+        """Initializes the server with the given request callback.
+
+        If you use pre-forking/start() instead of the listen() method to
+        start your server, you should not pass an IOLoop instance to this
+        constructor. Each pre-forked child process will create its own
+        IOLoop instance after the forking process.
+        """
         self.request_callback = request_callback
         self.no_keep_alive = no_keep_alive
-        self.io_loop = io_loop or ioloop.IOLoop.instance()
+        self.io_loop = io_loop
         self.xheaders = xheaders
         self.ssl_options = ssl_options
         self._socket = None
+        self._started = False
 
     def listen(self, port, address=""):
+        """Binds to the given port and starts the server in a single process.
+
+        This method is a shortcut for:
+
+            server.bind(port, address)
+            server.start(1)
+
+        """
+        self.bind(port, address)
+        self.start(1)
+
+    def bind(self, port, address=""):
+        """Binds this server to the given port on the given IP address.
+
+        To start the server, call start(). If you want to run this server
+        in a single process, you can call listen() as a shortcut to the
+        sequence of bind() and start() calls.
+        """
         assert not self._socket
         self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
         flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
@@ -102,8 +143,48 @@ def listen(self, port, address=""):
         self._socket.setblocking(0)
         self._socket.bind((address, port))
         self._socket.listen(128)
-        self.io_loop.add_handler(self._socket.fileno(), self._handle_events,
-                                 self.io_loop.READ)
+
+    def start(self, num_processes=None):
+        """Starts this server in the IOLoop.
+
+        By default, we detect the number of cores available on this machine
+        and fork that number of child processes. If num_processes is given, we
+        fork that specific number of sub-processes.
+
+        If num_processes is 1 or we detect only 1 CPU core, we run the server
+        in this process and do not fork any additional child process.
+
+        Since we use processes and not threads, there is no shared memory
+        between any server code.
+        """
+        assert not self._started
+        self._started = True
+        if num_processes is None:
+            # Use sysconf to detect the number of CPUs (cores)
+            try:
+                num_processes = os.sysconf("SC_NPROCESSORS_CONF")
+            except ValueError:
+                logging.error("Could not get num processors from sysconf; "
+                              "running with one process")
+                num_processes = 1
+        if num_processes > 1 and ioloop.IOLoop.initialized():
+            logging.error("Cannot run in multiple processes: IOLoop instance "
+                          "has already been initialized. You cannot call "
+                          "IOLoop.instance() before calling start()")
+            num_processes = 1
+        if num_processes > 1:
+            logging.info("Pre-forking %d server processes", num_processes)
+            for i in range(num_processes):
+                if os.fork() == 0:
+                    ioloop.IOLoop.instance().add_handler(
+                        self._socket.fileno(), self._handle_events,
+                        ioloop.IOLoop.READ)
+                    return
+            os.waitpid(-1, 0)
+        else:
+            io_loop = self.io_loop or ioloop.IOLoop.instance()
+            io_loop.add_handler(self._socket.fileno(), self._handle_events,
+                                ioloop.IOLoop.READ)
 
     def _handle_events(self, fd, events):
         while True:
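The new bind()/start() API above can be exercised as in the sketch below. This is an illustrative example rather than part of the patch; it assumes the flat module layout used here (import httpserver, import ioloop) and a stand-in handle_request callback. The point it shows is the required ordering: IOLoop.instance() must only be touched after start() has forked, so that each child gets its own IOLoop.

    import httpserver
    import ioloop

    def handle_request(request):
        message = "You requested %s\n" % request.uri
        request.write("HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % (
                      len(message), message))
        request.finish()

    server = httpserver.HTTPServer(handle_request)
    server.bind(8888)
    server.start(4)  # fork four children; omit the argument to fork one per CPU
    # Each forked child returns from start() and runs its own IOLoop below;
    # the parent blocks in os.waitpid() inside start().
    ioloop.IOLoop.instance().start()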
diff --git a/tornado/ioloop.py b/tornado/ioloop.py
index ad43ca756e..11ea4469ef 100644
--- a/tornado/ioloop.py
+++ b/tornado/ioloop.py
@@ -116,6 +116,10 @@ def __init__(self, io_loop=None):
             cls._instance = cls()
         return cls._instance
 
+    @classmethod
+    def initialized(cls):
+        return hasattr(cls, "_instance")
+
     def add_handler(self, fd, handler, events):
         """Registers the given handler to receive the given events for fd."""
         self._handlers[fd] = handler
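IOLoop.initialized() only reports whether the lazily created singleton already exists; it is the condition HTTPServer.start() checks before pre-forking. A small sketch of that behavior (illustrative, not part of the patch):

    import ioloop

    # Nothing has asked for the singleton yet, so pre-forking is still safe.
    assert not ioloop.IOLoop.initialized()

    loop = ioloop.IOLoop.instance()   # lazily creates and caches the instance

    # From this point on, HTTPServer.start() would log an error and fall back
    # to a single process rather than fork children sharing this IOLoop.
    assert ioloop.IOLoop.initialized()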
diff --git a/tornado/web.py b/tornado/web.py
index 27fc8f94c7..dccb3297a5 100644
--- a/tornado/web.py
+++ b/tornado/web.py
@@ -47,10 +47,12 @@ def get(self):
 import binascii
 import calendar
 import Cookie
+import cStringIO
 import datetime
 import email.utils
 import escape
 import functools
+import gzip
 import hashlib
 import hmac
 import httplib
@@ -402,10 +404,18 @@ def flush(self, include_footers=False):
         """Flushes the current output buffer to the nextwork."""
         if self.application._wsgi:
             raise Exception("WSGI applications do not support flush()")
+
+        chunk = "".join(self._write_buffer)
+        self._write_buffer = []
         if not self._headers_written:
             self._headers_written = True
+            for transform in self._transforms:
+                self._headers, chunk = transform.transform_first_chunk(
+                    self._headers, chunk, include_footers)
             headers = self._generate_headers()
         else:
+            for transform in self._transforms:
+                chunk = transform.transform_chunk(chunk, include_footers)
             headers = ""
 
         # Ignore the chunk and only write the headers for HEAD requests
@@ -413,22 +423,6 @@ def flush(self, include_footers=False):
             if headers:
                 self.request.write(headers)
             return
-        if self._write_buffer:
-            chunk = "".join(self._write_buffer)
-            self._write_buffer = []
-            if chunk:
-                # Don't write out empty chunks because that means
-                # END-OF-STREAM with chunked encoding
-                for transform in self._transforms:
-                    chunk = transform.transform_chunk(chunk)
-        else:
-            chunk = ""
-        if include_footers:
-            footers = []
-            for transform in self._transforms:
-                footer = transform.footer()
-                if footer: chunk += footer
-
         if headers or chunk:
             self.request.write(headers + chunk)
 
@@ -694,12 +688,9 @@ def _execute(self, transforms, *args, **kwargs):
             self._handle_request_exception(e)
 
     def _generate_headers(self):
-        headers = self._headers
-        for transform in self._transforms:
-            headers = transform.transform_headers(headers)
         lines = [self.request.version + " " + str(self._status_code) + " " +
                  httplib.responses[self._status_code]]
-        lines.extend(["%s: %s" % (n, v) for n, v in headers.iteritems()])
+        lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()])
         for cookie_dict in getattr(self, "_new_cookies", []):
             for cookie in cookie_dict.values():
                 lines.append("Set-Cookie: " + cookie.OutputString(None))
@@ -859,7 +850,10 @@ class Application(object):
     def __init__(self, handlers=None, default_host="", transforms=None,
                  wsgi=False, **settings):
         if transforms is None:
-            self.transforms = [ChunkedTransferEncoding]
+            self.transforms = []
+            if settings.get("gzip"):
+                self.transforms.append(GZipContentEncoding)
+            self.transforms.append(ChunkedTransferEncoding)
         else:
             self.transforms = transforms
         self.handlers = []
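With the Application change above, the gzip transform is opt-in via a setting. A minimal way to enable it (illustrative sketch; MainHandler stands in for any handler class):

    application = web.Application([
        (r"/", MainHandler),
    ], gzip=True)   # prepends GZipContentEncoding ahead of ChunkedTransferEncoding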
@@ -1113,31 +1107,64 @@ def prepare(self):
 class OutputTransform(object):
     """A transform modifies the result of an HTTP request (e.g., GZip encoding)
 
-    A new transform instance is created for every request. The sequence of
-    calls is:
-
-        t = Transform(request) # Constructor
-        # Request processing
-        headers = t.transform_headers(headers)
-        # Write headers
-        for block in result:
-            write(t.transform_chunk(block)
-        write(t.footer())
-
-    See the ChunkedTransferEncoding example below if you want to implement a
+    A new transform instance is created for every request. See the
+    ChunkedTransferEncoding example below if you want to implement a
     new Transform.
     """
     def __init__(self, request):
         pass
 
-    def transform_headers(self, headers):
-        return headers
+    def transform_first_chunk(self, headers, chunk, finishing):
+        return headers, chunk
 
-    def transform_chunk(self, block):
-        return block
+    def transform_chunk(self, chunk, finishing):
+        return chunk
 
-    def footer(self):
-        return None
+
+class GZipContentEncoding(OutputTransform):
+    """Applies the gzip content encoding to the response.
+
+    See http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.11
+    """
+    CONTENT_TYPES = set([
+        "text/plain", "text/html", "text/css", "text/xml",
+        "application/x-javascript", "application/xml", "application/atom+xml",
+        "text/javascript", "application/json", "application/xhtml+xml"])
+    MIN_LENGTH = 5
+
+    def __init__(self, request):
+        self._gzipping = request.supports_http_1_1() and \
+            "gzip" in request.headers.get("Accept-Encoding", "")
+
+    def transform_first_chunk(self, headers, chunk, finishing):
+        if self._gzipping:
+            ctype = headers.get("Content-Type", "").split(";")[0]
+            self._gzipping = (ctype in self.CONTENT_TYPES) and \
+                (not finishing or len(chunk) >= self.MIN_LENGTH) and \
+                (finishing or "Content-Length" not in headers) and \
+                ("Content-Encoding" not in headers)
+        if self._gzipping:
+            headers["Content-Encoding"] = "gzip"
+            self._gzip_value = cStringIO.StringIO()
+            self._gzip_file = gzip.GzipFile(mode="w", fileobj=self._gzip_value)
+            self._gzip_pos = 0
+            chunk = self.transform_chunk(chunk, finishing)
+            if "Content-Length" in headers:
+                headers["Content-Length"] = str(len(chunk))
+        return headers, chunk
+
+    def transform_chunk(self, chunk, finishing):
+        if self._gzipping:
+            self._gzip_file.write(chunk)
+            if finishing:
+                self._gzip_file.close()
+            else:
+                self._gzip_file.flush()
+            chunk = self._gzip_value.getvalue()
+            if self._gzip_pos > 0:
+                chunk = chunk[self._gzip_pos:]
+            self._gzip_pos += len(chunk)
+        return chunk
 
 
 class ChunkedTransferEncoding(OutputTransform):
@@ -1148,26 +1175,25 @@ class ChunkedTransferEncoding(OutputTransform):
     def __init__(self, request):
         self._chunking = request.supports_http_1_1()
 
-    def transform_headers(self, headers):
+    def transform_first_chunk(self, headers, chunk, finishing):
         if self._chunking:
             # No need to chunk the output if a Content-Length is specified
             if "Content-Length" in headers or "Transfer-Encoding" in headers:
                 self._chunking = False
             else:
                 headers["Transfer-Encoding"] = "chunked"
-        return headers
+                chunk = self.transform_chunk(chunk, finishing)
+        return headers, chunk
 
-    def transform_chunk(self, block):
+    def transform_chunk(self, block, finishing):
         if self._chunking:
-            return ("%x" % len(block)) + "\r\n" + block + "\r\n"
-        else:
-            return block
-
-    def footer(self):
-        if self._chunking:
-            return "0\r\n\r\n"
-        else:
-            return None
+            # Don't write out empty chunks because that means END-OF-STREAM
+            # with chunked encoding
+            if block:
+                block = ("%x" % len(block)) + "\r\n" + block + "\r\n"
+            if finishing:
+                block += "0\r\n\r\n"
+        return block
 
 
 def authenticated(method):
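For transform authors, the old transform_headers()/transform_chunk()/footer() trio is replaced by transform_first_chunk() and transform_chunk(), both of which receive a finishing flag. Below is a sketch of a custom transform written against the new interface; the class name, header name, and the handlers variable are hypothetical, and headers is the plain dict that RequestHandler builds:

    class ServerStampTransform(web.OutputTransform):
        """Stamps a header on the first chunk and passes everything through."""
        def __init__(self, request):
            pass

        def transform_first_chunk(self, headers, chunk, finishing):
            headers.setdefault("X-Served-By", "tornado")
            return headers, chunk

        def transform_chunk(self, chunk, finishing):
            return chunk

    # Transforms are passed as classes and instantiated once per request.
    application = web.Application(handlers, transforms=[
        ServerStampTransform, web.GZipContentEncoding, web.ChunkedTransferEncoding])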