Support for streaming request body data in POST and PUT requests #685

Closed
wants to merge 18 commits into from

8 participants

@nephics

This pull request contains the streambody branch, which adds support for streaming request body data in POST and PUT requests. See the "demos" folder for example usage.

The code has been running on production servers without problems for the last year.

@bdarnell
Owner

This looks cool, but I just want to say for now that we're trying to wrap up the 3.0 release right now, so I'm going to hold off on reviewing and merging this until that's done.

@D1plo1d

+1 to streaming uploads, this is exactly what I need. Now that 3.0 is released, is there any chance of this getting merged?

@bdarnell
Owner

3.1 is already out the door, but now I want to get this or something like it into 3.2.

@nephics

I've merged Tornado 3.1 into the streambody branch.

@D1plo1d

bump

I've merged the latest facebook master with nephics's streambody if anyone needs a quick git remote for their project: https://github.com/D1plo1d/tornado

(the merge was automatic / trivial)

@kzahel

Perhaps it makes sense to think about this: #862
Chunked encoding is the perfect example of a POST body that NEEDS to be streamed.
The interface might be something like on_{{method}}_headers, on_{{method}}_chunk, on_{{method}}_complete, or similar.
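
To illustrate why a chunked body has to be streamed: with `Transfer-Encoding: chunked` there is no Content-Length, so the total size is only known once the zero-length terminating chunk arrives. Below is a minimal standard-library sketch of incremental decoding. `iter_chunks` is a hypothetical helper written for this note, not part of Tornado or of this pull request; it ignores chunk extensions and skips trailers without parsing them.

```python
import io


def iter_chunks(stream):
    """Yield the data chunks of an HTTP chunked-encoded body one at a time.

    `stream` is any binary file-like object positioned at the start of the
    body. Hypothetical helper for illustration only.
    """
    while True:
        size_line = stream.readline()
        if not size_line:
            raise ValueError("stream ended before the terminating chunk")
        # The size is hexadecimal; anything after ';' is a chunk extension.
        size_text = size_line.split(b";", 1)[0].strip().decode("ascii")
        size = int(size_text, 16)
        if size == 0:
            # Consume optional trailer lines up to the final blank line.
            while stream.readline() not in (b"\r\n", b""):
                pass
            return
        data = stream.read(size)
        if len(data) != size:
            raise ValueError("truncated chunk")
        if stream.read(2) != b"\r\n":
            raise ValueError("missing CRLF after chunk data")
        yield data


# Two data chunks followed by the terminating zero-length chunk.
body = io.BytesIO(b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n")
chunks = list(iter_chunks(body))
```

A handler built on such a decoder can process each chunk as it arrives instead of waiting for a buffer of known length, which is the case the proposed per-chunk callbacks would cover.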

@Blender3D

Any updates? I don't know of any Python web frameworks that support this.

@iamtio

Is there any chance of this getting merged?

@bdarnell
Owner

I'm working on a broader refactoring of the HTTP machinery that will include the ability to handle streaming uploads. I hope to have something to show soon.

@bdarnell
Owner

The big HTTP refactoring has landed in the master branch. See the tornado.web.stream_request_body decorator.
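
For readers arriving from this pull request, the demo's StreamHandler might look roughly like this with the decorator mentioned above. This is a sketch, assuming a Tornado release that includes `tornado.web.stream_request_body` (4.0 or later); the handler name, the `bytes_received` and `md5` attributes, and `make_app` are illustrative and not taken from this thread.

```python
import hashlib

import tornado.web


@tornado.web.stream_request_body
class StreamHandler(tornado.web.RequestHandler):
    def prepare(self):
        # Called once the headers are in, before any body data arrives.
        self.bytes_received = 0
        self.md5 = hashlib.md5()

    def data_received(self, chunk):
        # Called for each piece of the body as it is read.
        self.bytes_received += len(chunk)
        self.md5.update(chunk)

    def put(self):
        # Called after the whole body has been passed to data_received().
        self.write('Stream body handler: received %d bytes\n%s'
                   % (self.bytes_received, self.md5.hexdigest()))


def make_app():
    # Illustrative helper: route /stream to the streaming handler.
    return tornado.web.Application([(r"/stream", StreamHandler)])
```

To try it, call `make_app().listen(8888)` and start the IOLoop. With this decorator `self.request.body` is not populated, so the handler has to keep whatever it needs from each chunk.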

@bdarnell bdarnell closed this
Commits on Jan 3, 2012
  1. @jcbsnd
Commits on Jan 5, 2012
  1. @jcbsnd
Commits on Jan 24, 2012
  1. @jcbsnd
  2. @jcbsnd
Commits on Jan 28, 2012
  1. @jcbsnd
  2. @jcbsnd
  3. @jcbsnd
Commits on Jan 31, 2012
  1. @jcbsnd
Commits on May 3, 2012
  1. @jcbsnd

    Update to v2.2.1 by merge of commit '235cb17a0ae1b23d4eee592f358bf84707d89edc' into streambody

    jcbsnd authored
Commits on Jun 4, 2012
  1. @jcbsnd

    Update to version 2.3 by merging commit '13598908a7872080c6cc20b0dcfbcb0ce89a3447' into streambody

    jcbsnd authored
Commits on Sep 20, 2012
  1. @jcbsnd

    Update to version 2.4

    jcbsnd authored
    Merge commit '648bebf0eeb220ebfaa8200bab3bac41add9d650' into streambody
    
    Conflicts:
    	tornado/httpserver.py
Commits on Dec 19, 2012
  1. @jcbsnd

    Update to the newest commit in the main Tornado repository

    jcbsnd authored
    Merge branch 'master' of git://github.com/facebook/tornado into streambody
    
    Fixed conflicts for:
    	tornado/httpserver.py
    	tornado/web.py
  2. @jcbsnd
Commits on Jun 18, 2013
  1. @jcbsnd
  2. @jcbsnd
Commits on Sep 10, 2013
  1. @JohnVinyard

    Fix bug whereby asynchronous decorator is applied to put and post methods on each class instantiation

    JohnVinyard authored
  2. @jcbsnd

    Merge pull request #1 from JohnVinyard/streambody

    jcbsnd authored
    Fix bug whereby asynchronous decorator is applied to put and post method...
  3. @jcbsnd
54 demos/streambody/client.py
@@ -0,0 +1,54 @@
+'''Demo code of the functionality in the Tornado "streambody" branch,
+providing support for streaming request body data in POST and PUT requests.
+
+The streambody branch is available at:
+https://github.com/nephics/tornado
+
+Run the demo by first starting the server and then the client.
+'''
+
+import os
+import sys
+# use the local version of tornado
+sys.path.insert(0, os.path.abspath('../..'))
+
+import hashlib
+import tornado.httpclient as httpclient
+
+
+def upload(path, body):
+    url = 'http://localhost:8888' + path
+    http_client = httpclient.HTTPClient()
+    request = httpclient.HTTPRequest(url, 'PUT', body=body,
+                                     headers={'Content-Type': 'text/plain'})
+    try:
+        response = http_client.fetch(request)
+        # the server's calculated md5 hash is in the second line of the response
+        msg, md5 = response.body.decode('utf8').split('\n')
+        print(msg)
+        return md5
+    except httpclient.HTTPError as e:
+        print("Error:", e)
+        sys.exit()
+
+def main():
+    # generate 350k of random data, and remember the MD5 hash
+    body = os.urandom(350000)
+    md5_orig = hashlib.md5(body).hexdigest()
+
+    # upload using normal upload handler
+    md5 = upload('/', body)
+    if md5 != md5_orig:
+        print('!! Hash mismatch with default upload handler !!')
+        return
+
+    # upload using streambody handler
+    md5 = upload('/stream', body)
+    if md5 != md5_orig:
+        print('!! Hash mismatch with streambody handler !!')
+        return
+
+    print('Hashes of uploaded data match the original.')
+
+if __name__ == '__main__':
+    main()
67 demos/streambody/server.py
@@ -0,0 +1,67 @@
+'''Demo code of the functionality in the Tornado "streambody" branch,
+providing support for streaming request body data in POST and PUT requests.
+
+The streambody branch is available at:
+https://github.com/nephics/tornado
+
+Run the demo by first starting the server and then the client.
+'''
+
+import os.path
+import sys
+# use the local version of tornado
+sys.path.insert(0, os.path.abspath('../..'))
+
+import hashlib
+import logging
+
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)-6s: %(levelname)s - %(message)s')
+
+
+import tornado.ioloop
+import tornado.web
+
+
+class MainHandler(tornado.web.RequestHandler):
+    def put(self):
+        md5 = hashlib.md5(self.request.body)
+        self.write('Default body handler: received %d bytes\n%s'
+                   % (len(self.request.body), md5.hexdigest()))
+
+
+@tornado.web.stream_body
+class StreamHandler(tornado.web.RequestHandler):
+
+    def put(self):
+        self.read_bytes = 0
+        # create the hash before the first read can deliver any data
+        self.md5 = hashlib.md5()
+        self.request.request_continue()
+        self.read_chunks()
+
+    def read_chunks(self, chunk=''):
+        self.read_bytes += len(chunk)
+        if chunk:
+            logging.info('Received {} bytes'.format(len(chunk)))
+            self.md5.update(chunk)
+        chunk_length = min(100000,
+                           self.request.content_length - self.read_bytes)
+        if chunk_length > 0:
+            self.request.connection.stream.read_bytes(
+                chunk_length, self.read_chunks)
+        else:
+            self.uploaded()
+
+    def uploaded(self):
+        self.write('Stream body handler: received %d bytes\n%s'
+                   % (self.read_bytes, self.md5.hexdigest()))
+        self.finish()
+
+
+if __name__ == "__main__":
+    application = tornado.web.Application([
+        (r"/", MainHandler),
+        (r"/stream", StreamHandler),
+    ])
+    application.listen(8888)
+    tornado.ioloop.IOLoop.instance().start()
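
The chunk-size arithmetic in `read_chunks` can be checked without a server. The following standard-library sketch replays the same `min(100000, content_length - read_bytes)` rule on an in-memory body and confirms that hashing chunk by chunk gives the same MD5 as hashing the whole body. `chunk_lengths` is a hypothetical helper introduced here for illustration; it is not part of the demo.

```python
import hashlib
import os


def chunk_lengths(content_length, max_chunk=100000):
    """Return the read sizes the demo handler would request, in order."""
    lengths = []
    read_bytes = 0
    while read_bytes < content_length:
        # Same rule as the demo: never ask for more than what is left.
        chunk_length = min(max_chunk, content_length - read_bytes)
        lengths.append(chunk_length)
        read_bytes += chunk_length
    return lengths


body = os.urandom(350000)

# Feed the hash one chunk at a time, as the streaming handler does.
md5 = hashlib.md5()
offset = 0
for n in chunk_lengths(len(body)):
    md5.update(body[offset:offset + n])
    offset += n

sizes = chunk_lengths(350000)
hashes_match = md5.hexdigest() == hashlib.md5(body).hexdigest()
```

For the 350000-byte body used by the client, this gives three reads of 100000 bytes and a final read of 50000 bytes.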
31 tornado/httpserver.py
@@ -319,10 +319,7 @@ def _on_headers(self, data):
                 content_length = int(content_length)
                 if content_length > self.stream.max_buffer_size:
                     raise _BadRequestException("Content-Length too long")
-                if headers.get("Expect") == "100-continue":
-                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
-                self.stream.read_bytes(content_length, self._on_request_body)
-                return
+                self._request.content_length = content_length
             self.request_callback(self._request)
         except _BadRequestException as e:
@@ -331,14 +328,6 @@ def _on_headers(self, data):
             self.close()
             return
-    def _on_request_body(self, data):
-        self._request.body = data
-        if self._request.method in ("POST", "PATCH", "PUT"):
-            httputil.parse_body_arguments(
-                self._request.headers.get("Content-Type", ""), data,
-                self._request.arguments, self._request.files)
-        self.request_callback(self._request)
-
 class HTTPRequest(object):
     """A single HTTP request.
@@ -458,6 +447,24 @@ def __init__(self, method, uri, version="HTTP/1.0", headers=None,
         self.path, sep, self.query = uri.partition('?')
         self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
+    def request_continue(self):
+        '''Send a 100-Continue, telling the client to send the request body'''
+        if self.headers.get("Expect") == "100-continue":
+            self.connection.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
+
+    def _read_body(self, exec_req_cb):
+        self.request_continue()
+        self.connection.stream.read_bytes(self.content_length,
+            lambda data: self._on_request_body(data, exec_req_cb))
+
+    def _on_request_body(self, data, exec_req_cb):
+        self.body = data
+        if self.method in ("POST", "PATCH", "PUT"):
+            httputil.parse_body_arguments(
+                self.headers.get("Content-Type", ""), data,
+                self.arguments, self.files)
+        exec_req_cb()
+
     def supports_http_1_1(self):
         """Returns True if this request supports HTTP/1.1 semantics"""
         return self.version == "HTTP/1.1"
63 tornado/web.py
@@ -1123,6 +1123,18 @@ def _execute(self, transforms, *args, **kwargs):
         try:
             if self.request.method not in self.SUPPORTED_METHODS:
                 raise HTTPError(405)
+            # read and parse the request body, if not disabled
+            exec_req_cb = lambda: self._execute_request(*args, **kwargs)
+            if (getattr(self, '_read_body', True) and
+                    hasattr(self.request, 'content_length')):
+                self.request._read_body(exec_req_cb)
+            else:
+                exec_req_cb()
+        except Exception as e:
+            self._handle_request_exception(e)
+
+    def _execute_request(self, *args, **kwargs):
+        try:
             self.path_args = [self.decode_argument(arg) for arg in args]
             self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                     for (k, v) in kwargs.items())
@@ -1252,6 +1264,57 @@ def _clear_headers_for_304(self):
             self.clear_header(h)
+def stream_body(cls):
+    """Wrap RequestHandler classes with this to prevent the
+    request body of PUT and POST request from being read and parsed
+    automatically.
+
+    If this decorator is given, the request body is not read and parsed
+    when a PUT or POST handler method is executed. It is up to the request
+    handler to read the body from the stream in the HTTP connection.
+
+    Using this decorator automatically implies the asynchronous decorator.
+
+    Without this decorator, the request body is automatically read and
+    parsed before a PUT or POST method is executed. ::
+
+        @web.stream_body
+        class StreamHandler(web.RequestHandler):
+
+            def put(self):
+                self.read_bytes = 0
+                self.request.request_continue()
+                self.read_chunks()
+
+            def read_chunks(self, chunk=''):
+                self.read_bytes += len(chunk)
+                chunk_length = min(10000,
+                    self.request.content_length - self.read_bytes)
+                if chunk_length > 0:
+                    self.request.connection.stream.read_bytes(
+                        chunk_length, self.read_chunks)
+                else:
+                    self.uploaded()
+
+            def uploaded(self):
+                self.write('Uploaded %d bytes' % self.read_bytes)
+                self.finish()
+
+    """
+    if hasattr(cls, 'post'):
+        cls.post = asynchronous(cls.post)
+    if hasattr(cls, 'put'):
+        cls.put = asynchronous(cls.put)
+
+    class StreamBody(cls):
+        def __init__(self, *args, **kwargs):
+            if args[0]._wsgi:
+                raise Exception("@stream_body is not supported for WSGI apps")
+            self._read_body = False
+            cls.__init__(self, *args, **kwargs)
+        return StreamBody
+
+
 def asynchronous(method):
     """Wrap request handler methods with this if they are asynchronous.
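
The class-decorator pattern used by `stream_body` can be shown in isolation. The sketch below is standard-library only and makes two assumptions: `mark_async` is a hypothetical stand-in for `tornado.web.asynchronous` that merely counts how often a method has been wrapped, and `Base` stands in for a request handler. It wraps the method once, when the class is decorated, and sets the per-instance `_read_body` flag in `__init__`. The Sep 10, 2013 commit message suggests an earlier revision applied the wrapper on every instantiation instead.

```python
import functools


def mark_async(method):
    """Stand-in for tornado.web.asynchronous: records the wrapping depth."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        return method(self, *args, **kwargs)
    wrapper.wrap_depth = getattr(method, "wrap_depth", 0) + 1
    return wrapper


def stream_body(cls):
    # Wrap once, at decoration time, not inside __init__.
    if hasattr(cls, "put"):
        cls.put = mark_async(cls.put)

    class StreamBody(cls):
        def __init__(self, *args, **kwargs):
            # Per-instance flag telling the framework not to read the body.
            self._read_body = False
            cls.__init__(self, *args, **kwargs)
    return StreamBody


class Base:
    def put(self):
        return "put called"


Handler = stream_body(Base)
first, second = Handler(), Handler()
```

After creating several instances, `Handler.put.wrap_depth` stays at 1, whereas wrapping inside `__init__` would nest one more wrapper per instance.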