New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of an end to end async pattern #71

Open
wants to merge 11 commits into
base: master
from

Conversation

3 participants
@yohanboniface
Member

yohanboniface commented Nov 12, 2018

This is a proof of concept, nothing really operational but a small example.

The goals are:

  • be able to consume a request body only if needed
  • be able to stream a request body (eg. to write it to a file)
  • be able to stream a response

On the read side, here is the pattern:

  • on_headers_complete => we call the handler (the view)
  • on_body => we save the current chunk (the body part of the first chunk of data we received) and we pause the socket flow
  • the flow is started again when request.read() is called
  • to consume a big request data, one would do async for data in request:

On the write side:

  • write is now an async function, so we should be able to iter over an async generator here to consume a stream (not yet in the POC)

We have two points here: impacts on the Roll API and technical implementation.

First questions regard the API (I guess the implementation details will depend on the API we want): the way to consume the request body is async, so we cannot have a @property like request.json, we should certainly switch to methods (so request.json() or something like that).
That breaks the "simple" spirit of Roll as it is now.
Plus, I'm not very happy with the idea that every handler should always call a read()or the like method to be able to access the body (maybe one should have an option on the Roll instance to decide if the body should be read automatically or not?).
Plus, that's one of the things I dislike in Falcon, which also have a stream based body access, and the user never now if the read() has been called or not, and if yes whether the data has been saved somewhere or not. For example, this is specifically annoying when dealing with middlewares.

So let's discuss and challenge all this!

@yohanboniface

This comment has been minimized.

Member

yohanboniface commented Nov 13, 2018

Quick brainstorming with @davidbgk on the API changes.

The philosophy is: keep the default usage simple, and allow more control when needed, while keeping the end-to-end async and only loading the body on RAM on demand.

Three main possible directions at this time:

  • transform each accessor in an async one (what is done in this PR), so instead of doing request.json one would do await request.json, plus of course a way to stream the request on demand (eg. to save a file without loading it in memory)
  • keep the current API as is (eg. request.json), but add an option await request.load() to load the body on RAM and make the accessors available
  • sniff the handler parameter to only pass the needed request elements, do the same with handler decorators and signals (middlewares), so one can consume the headers in a check_perms decorator and consume the body on the handler, if the decorator hasn't failed.

Here is a raw example of what an API could look with that latest option (very raw):

app = Roll()

@app.before_request
async def check_perms(headers: Headers):
    # do something with headers


@app.route('/pouet')
@check_perms
async def myhandler(form:Form, query:Query, response:Response):
    # do something with form and query

# Alternatively

@myotherhandler.before
@myhandler.before
async def check_perms(headers:Headers, url:Url):
    # do something with headers
@trollfot

This comment has been minimized.

Member

trollfot commented Dec 1, 2018

import inspect
from functools import wraps


def preprocessors(func):
    return getattr(func, '__before__', None)


def signature(func):
    if getattr(func, '__signature__', None) is None:
        func.__signature__ = inspect.signature(func, follow_wrapped=True)
    return func.__signature__


def signed(func):
    signature(func)
    return func


def before(*funcs):
    def persist_preprocessors(func):
        if preprocessors(func) is not None:
            raise RuntimeError(
                f'{func} is already annotated with `before` functions.')
        func.__before__ = [signed(f) for f in funcs]
        return signed(func)
    return persist_preprocessors


def catcher(func):
    @wraps(func)
    def pdb_catch(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except:
            import pdb
            pdb.set_trace()
    return pdb_catch


available = {
    'request': 'riki',
}

route_vars = {
    'userid': 123,
    'category': 'Sport'
}


def func_args(s, **vars):
    for name, param in s.parameters.items():
        if param.kind in (param.POSITIONAL_OR_KEYWORD, param.VAR_POSITIONAL):
            if name in available:
                yield name, available[name]
            elif name == 'body':
                available['body'] = 'bibi'
                yield name, available[name]
            elif name in vars:
                yield name, vars.pop(name)
        else:
            for name, value in vars.items():
                yield name, value


def apply(func):
    s = signature(func)
    args = dict(func_args(s, **route_vars))
    bound = s.bind(**args)
    return func(*bound.args, **bound.kwargs)


def process(func):
    preprocs = preprocessors(func)
    if preprocs is not None:
        for preproc in preprocs:
            apply(preproc)
    apply(func)


def preprocess(request):
    print(f'Preprocessing with {request}')


@before(preprocess)
def test(request, body, userid, **vars):
    print(f'Processing with {request}, {body}, {userid}, {vars}')


@before(preprocess)
@catcher
def test2(request, body, userid, **vars):
    print(f'Processing with {request}, {body}, {userid}, {vars}')
    

def faulty(request, john):
    pass
@yohanboniface

This comment has been minimized.

Member

yohanboniface commented on 503a244 Dec 7, 2018

👍

@yohanboniface

This comment has been minimized.

Member

yohanboniface commented on roll/http.py in 503a244 Dec 7, 2018

This should be run even if body is already bytes :)

trollfot added some commits Dec 7, 2018

@trollfot

This comment has been minimized.

Member

trollfot commented Dec 7, 2018

Merged the "preprocessors" + "signature/ondemand handlers arguments" + "streaming response"

@davidbgk

Looks good to me so far, the more I see that annotation thing the more I like it! 👍

@@ -13,11 +16,40 @@
traceback(app)
cheering = os.path.join(os.path.dirname(__file__), 'crowd-cheering.mp3')

This comment has been minimized.

@davidbgk

davidbgk Dec 7, 2018

Contributor

🙌

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

I'm a pathlib fan:

cheering = Path(__file__).parent / 'crowd-cheering.mp3'

cheering = os.path.join(os.path.dirname(__file__), 'crowd-cheering.mp3')
async def file_iterator(path):

This comment has been minimized.

@davidbgk

davidbgk Dec 7, 2018

Contributor

Don't we want that one as an extension somehow?

async def file_iterator(path):
async with AIOFile(path, 'rb') as afp:
reader = Reader(afp, chunk_size=4096)

This comment has been minimized.

@davidbgk

davidbgk Dec 7, 2018

Contributor

If yes, maybe set that magic number as a parameter?

@app.route('/app', methods=['GET'])
async def hello_app(response, app, query, form, body, **rest):
response.body = f"{app.__class__.__name__}\n"
print(rest, query, form, body)

This comment has been minimized.

@davidbgk

davidbgk Dec 7, 2018

Contributor

Do we want to keep that print statement?

strict_parsing=True)
parsed_qs = parse_qs((await self.read()).decode(),
keep_blank_values=True, strict_parsing=True)
print(parsed_qs)

This comment has been minimized.

@davidbgk

davidbgk Dec 7, 2018

Contributor

To be removed.

return func
def before(*funcs):

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

I'd move it to the Roll class, so it's exposed like route:

app.route(…)
app.before(…)
async def myhandler():
    …
return persist_preprocessors
async def process(func, request, response):

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

I'd also move process to Roll, as one of the Roll promise is that it's extendable. So even if we didn't provide a given hook, one can still extends whatever class and make it's own business.

async def process(func, request, response):
async def func_args(s, **route_vars):

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

build_args ?

preprocs = preprocessors(func)
if preprocs is not None:
for preproc in preprocs:
await apply(preproc)

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

Should we act like for hooks, and check for return value, and if True we stop processing ?

elif name == 'response':
yield name, response
elif name in request.__namespace__:
member = getattr(request, name)

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

Downside of this pattern: we don't cache the result, so if various processors or hooks or handlers ask for the same piece, we will compute it each time.

Also, while I'm still thinking about it, I'm looking toward a pattern a bit different: we don't make request.json, request.query… coroutines. Instead, we will await the body before returning any of those if they are requested, and then cache the body (and maybe any of those derived formats).
So if some preprocessor or hook or handler ask for json or files or whatever, this will be an in-memory computed data.

So if one does not want the body to be awaited, they should not ask for anything that read it and only ask for headers or url or raw_body or such.

I'm still balanced about the request param itself: should it still exist? should it be the raw request (body is not awaited)? or should it be the request as it is now? That last version sort of have my preference for now, because:

  • it's retrocompatible (which is not mandatory but makes life easier)
  • the default usage should be kept simple, and at the end I only see two (good but still) patterns where the body should not be loaded by us:
    • one wants to check the headers or url before deciding to load it: this is done by only asking for headers or url, being in a preprocessor or hook or handler.
    • one wants to stream the body (to deal with big files eg.): and this should also be done by asking something specific (as it's not the default), that may be raw_body or something
for name, value in route_vars.items():
yield name, value
async def apply(func):

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

At some point, we'll need to extend this to hooks too.

@app.route('/cheer')
async def cheer_for_streaming(response):
filename = os.path.basename(cheering)
response.body = file_iterator(cheering)

This comment has been minimized.

@yohanboniface

yohanboniface Dec 8, 2018

Member

Alternative pattern suggestion (food for thought):

async for data in file_iterator(cheering):
    await response.write()

That would roughly means:

  • we move the write logic to the Response
  • when the user never call write themself, we do it
  • when the user call write, at first call it also write response headers

I'd love to see the Reponse fulfill the file API, so we could pass it wherever a file is expected to be write, eg.:

reader = csv.reader(response)
for something in somewhere:
    await reader.write(something)
@yohanboniface

This comment has been minimized.

Member

yohanboniface commented Dec 8, 2018

Challenging the challenge, what about something like this instead of naming each neaded property as a different param:

async def myhandler(request: Request['headers', 'body']):
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment