Skip to content

Commit

Permalink
Merge pull request #697 from 38elements/stream
Browse files Browse the repository at this point in the history
Add Request.stream
  • Loading branch information
seemethere committed May 24, 2017
2 parents c6d6800 + a50d842 commit 48de321
Show file tree
Hide file tree
Showing 13 changed files with 733 additions and 28 deletions.
76 changes: 75 additions & 1 deletion docs/sanic/streaming.md
@@ -1,5 +1,79 @@
# Streaming

## Request Streaming

Sanic allows you to get request data by stream, as below. When the request ends, `request.stream.get()` returns `None`. Only post, put and patch decorator have stream argument.

```python
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text

bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')


class SimpleView(HTTPMethodView):

@stream_decorator
async def post(self, request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)


@app.post('/stream', stream=True)
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
response.write(body)
return stream(streaming)


@bp.put('/bp_stream', stream=True)
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)


async def post_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)

app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')


if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)
```

## Response Streaming

Sanic allows you to stream content to the client with the `stream` method. This method accepts a coroutine callback which is passed a `StreamingHTTPResponse` object that is written to. A simple example is like follows:

```python
Expand Down Expand Up @@ -29,4 +103,4 @@ async def index(request):
response.write(record[0])

return stream(stream_from_db)
```
```
10 changes: 10 additions & 0 deletions examples/request_stream/client.py
@@ -0,0 +1,10 @@
import requests

# Warning: This is a heavy process.

data = ""
for i in range(1, 250000):
data += str(i)

r = requests.post('http://127.0.0.1:8000/stream', data=data)
print(r.text)
65 changes: 65 additions & 0 deletions examples/request_stream/server.py
@@ -0,0 +1,65 @@
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text

bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')


class SimpleView(HTTPMethodView):

@stream_decorator
async def post(self, request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)


@app.post('/stream', stream=True)
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.get()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
response.write(body)
return stream(streaming)


@bp.put('/bp_stream', stream=True)
async def bp_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)


async def post_handler(request):
result = ''
while True:
body = await request.stream.get()
if body is None:
break
result += body.decode('utf-8')
return text(result)

app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')


if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)
36 changes: 27 additions & 9 deletions sanic/app.py
Expand Up @@ -60,6 +60,7 @@ def __init__(self, name=None, router=None, error_handler=None,
self.sock = None
self.listeners = defaultdict(list)
self.is_running = False
self.is_request_stream = False
self.websocket_enabled = False
self.websocket_tasks = []

Expand Down Expand Up @@ -110,12 +111,14 @@ def decorator(listener):

# Decorator
def route(self, uri, methods=frozenset({'GET'}), host=None,
strict_slashes=False):
strict_slashes=False, stream=False):
"""Decorate a function to be registered as a route
:param uri: path of the URL
:param methods: list or tuple of methods allowed
:param host:
:param strict_slashes:
:param stream:
:return: decorated function
"""

Expand All @@ -124,7 +127,12 @@ def route(self, uri, methods=frozenset({'GET'}), host=None,
if not uri.startswith('/'):
uri = '/' + uri

if stream:
self.is_request_stream = True

def response(handler):
if stream:
handler.is_stream = stream
self.router.add(uri=uri, methods=methods, handler=handler,
host=host, strict_slashes=strict_slashes)
return handler
Expand All @@ -136,13 +144,13 @@ def get(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=frozenset({"GET"}), host=host,
strict_slashes=strict_slashes)

def post(self, uri, host=None, strict_slashes=False):
def post(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=frozenset({"POST"}), host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def put(self, uri, host=None, strict_slashes=False):
def put(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=frozenset({"PUT"}), host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def head(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=frozenset({"HEAD"}), host=host,
Expand All @@ -152,9 +160,9 @@ def options(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=frozenset({"OPTIONS"}), host=host,
strict_slashes=strict_slashes)

def patch(self, uri, host=None, strict_slashes=False):
def patch(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=frozenset({"PATCH"}), host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def delete(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=frozenset({"DELETE"}), host=host,
Expand All @@ -173,20 +181,28 @@ def add_route(self, handler, uri, methods=frozenset({'GET'}), host=None,
:param host:
:return: function or class instance
"""
stream = False
# Handle HTTPMethodView differently
if hasattr(handler, 'view_class'):
methods = set()

for method in HTTP_METHODS:
if getattr(handler.view_class, method.lower(), None):
_handler = getattr(handler.view_class, method.lower(), None)
if _handler:
methods.add(method)
if hasattr(_handler, 'is_stream'):
stream = True

# handle composition view differently
if isinstance(handler, CompositionView):
methods = handler.handlers.keys()
for _handler in handler.handlers.values():
if hasattr(_handler, 'is_stream'):
stream = True
break

self.route(uri=uri, methods=methods, host=host,
strict_slashes=strict_slashes)(handler)
strict_slashes=strict_slashes, stream=stream)(handler)
return handler

# Decorator
Expand Down Expand Up @@ -664,6 +680,8 @@ def _helper(self, host=None, port=None, debug=False,
server_settings = {
'protocol': protocol,
'request_class': self.request_class,
'is_request_stream': self.is_request_stream,
'router': self.router,
'host': host,
'port': port,
'sock': sock,
Expand Down
24 changes: 13 additions & 11 deletions sanic/blueprints.py
Expand Up @@ -5,7 +5,7 @@

FutureRoute = namedtuple('Route',
['handler', 'uri', 'methods',
'host', 'strict_slashes'])
'host', 'strict_slashes', 'stream'])
FutureListener = namedtuple('Listener', ['handler', 'uri', 'methods', 'host'])
FutureMiddleware = namedtuple('Route', ['middleware', 'args', 'kwargs'])
FutureException = namedtuple('Route', ['handler', 'args', 'kwargs'])
Expand Down Expand Up @@ -47,7 +47,8 @@ def register(self, app, options):
uri=uri[1:] if uri.startswith('//') else uri,
methods=future.methods,
host=future.host or self.host,
strict_slashes=future.strict_slashes
strict_slashes=future.strict_slashes,
stream=future.stream
)(future.handler)

for future in self.websocket_routes:
Expand Down Expand Up @@ -88,14 +89,15 @@ def register(self, app, options):
app.listener(event)(listener)

def route(self, uri, methods=frozenset({'GET'}), host=None,
strict_slashes=False):
strict_slashes=False, stream=False):
"""Create a blueprint route from a decorated function.
:param uri: endpoint at which the route will be accessible.
:param methods: list of acceptable HTTP methods.
"""
def decorator(handler):
route = FutureRoute(handler, uri, methods, host, strict_slashes)
route = FutureRoute(
handler, uri, methods, host, strict_slashes, stream)
self.routes.append(route)
return handler
return decorator
Expand Down Expand Up @@ -132,7 +134,7 @@ def websocket(self, uri, host=None, strict_slashes=False):
:param uri: endpoint at which the route will be accessible.
"""
def decorator(handler):
route = FutureRoute(handler, uri, [], host, strict_slashes)
route = FutureRoute(handler, uri, [], host, strict_slashes, False)
self.websocket_routes.append(route)
return handler
return decorator
Expand Down Expand Up @@ -195,13 +197,13 @@ def get(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=["GET"], host=host,
strict_slashes=strict_slashes)

def post(self, uri, host=None, strict_slashes=False):
def post(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=["POST"], host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def put(self, uri, host=None, strict_slashes=False):
def put(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=["PUT"], host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def head(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=["HEAD"], host=host,
Expand All @@ -211,9 +213,9 @@ def options(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=["OPTIONS"], host=host,
strict_slashes=strict_slashes)

def patch(self, uri, host=None, strict_slashes=False):
def patch(self, uri, host=None, strict_slashes=False, stream=False):
return self.route(uri, methods=["PATCH"], host=host,
strict_slashes=strict_slashes)
strict_slashes=strict_slashes, stream=stream)

def delete(self, uri, host=None, strict_slashes=False):
return self.route(uri, methods=["DELETE"], host=host,
Expand Down
3 changes: 2 additions & 1 deletion sanic/request.py
Expand Up @@ -45,7 +45,7 @@ class Request(dict):
__slots__ = (
'app', 'headers', 'version', 'method', '_cookies', 'transport',
'body', 'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
'_ip', '_parsed_url', 'uri_template'
'_ip', '_parsed_url', 'uri_template', 'stream'
)

def __init__(self, url_bytes, headers, version, method, transport):
Expand All @@ -66,6 +66,7 @@ def __init__(self, url_bytes, headers, version, method, transport):
self.parsed_args = None
self.uri_template = None
self._cookies = None
self.stream = None

@property
def json(self):
Expand Down
11 changes: 11 additions & 0 deletions sanic/router.py
Expand Up @@ -345,3 +345,14 @@ def _get(self, url, method, host):
if hasattr(route_handler, 'handlers'):
route_handler = route_handler.handlers[method]
return route_handler, [], kwargs, route.uri

def is_stream_handler(self, request):
""" Handler for request is stream or not.
:param request: Request object
:return: bool
"""
handler = self.get(request)[0]
if (hasattr(handler, 'view_class') and
hasattr(handler.view_class, request.method.lower())):
handler = getattr(handler.view_class, request.method.lower())
return hasattr(handler, 'is_stream')

0 comments on commit 48de321

Please sign in to comment.