
Inversion of control for a streaming upload provider #48

Closed
samuelfekete opened this issue Oct 10, 2016 · 14 comments

@samuelfekete
Contributor

I need a way to proxy streaming uploads, that is, to forward bytes from the client as soon as they are received, without temp files. To do that, I need a file-like object to be given to the provider for reading, as opposed to the provider giving the request server a file-like object for writing.

I am thinking of something along the lines of checking for a flag in doPUT and, if the provider prefers, just giving it the reference to environ["wsgi.input"] for reading.

Any thoughts?
@mar10 @tomviner
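
To make the idea concrete, here is a minimal sketch of the "provider pulls" direction. The function name `handle_put_stream` and the fake environ are illustrative stand-ins, not part of the WsgiDAV API:

```python
import io

def handle_put_stream(environ, forward):
    """Read the raw request body and forward each block as soon as it
    arrives, with no temp file in between."""
    stream = environ["wsgi.input"]
    while True:
        block = stream.read(8192)
        if not block:
            break
        forward(block)

# Stand-in for a real PUT request:
environ = {"wsgi.input": io.BytesIO(b"payload bytes")}
sent = []
handle_put_stream(environ, sent.append)
print(b"".join(sent))  # b'payload bytes'
```

Here `forward` could just as well hand each block to a networking library that streams it on to another server.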

@mar10
Owner

mar10 commented Oct 11, 2016

I see the point.
The PUT implementation would be trivial if there weren't hotfixes for broken WebDAV clients and support for chunked transfer. Any change should preserve these capabilities.

I will look into it over the next few days (don't hesitate to ping me if I don't ;-)).

@mar10
Owner

mar10 commented Oct 11, 2016

BTW: res.beginWrite() could return a stream (or anything that implements .write() and .close()), so you could pass a buffering wrapper for your proxy?

@samuelfekete
Contributor Author

samuelfekete commented Oct 11, 2016

The problem is that HTTP libraries (e.g. requests) require an object that implements .read() for streamed requests; they do not provide anything that implements .write().

@tomviner
Contributor

I've got an idea for solving this and some related requirements we have.

For each HTTP method dealt with in request_server.py, we give the resource a chance to completely handle the WSGI response. For example, for PUT:

class RequestServer(object):
    # ...
    def doPUT(self, environ, start_response):
        # ...
        res = provider.getResourceInst(path, environ)
        try:
            return res.overridePUT(environ, start_response)
        except NotImplementedError:
            pass
        # continue as usual

This allows completely short-circuiting the default functionality when required.

We'd add overridePUT and friends to the _DAVResource class and have each raise NotImplementedError.

This would allow:

  • full access to environ["wsgi.input"] as mentioned in this issue, to stream user uploads to another server
  • doing redirects on GET for when resources live on another server
  • providing functionality for POST requests

@mar10
Owner

mar10 commented Oct 12, 2016

I was thinking about a similar approach:

    def doPUT(self, environ, start_response):
        res = provider.getResourceInst(path, environ)
        return res.handlePUT(environ, start_response)

with the default implementation in DAVNonCollection.

Having this for all other HTTP commands as well is a good idea; I hadn't thought of those use cases.
This would offer very general and flexible extension points.

For Samuel's use case however (which might be quite common), I would like to try to find a more elegant solution than implementing a slightly modified copy of the ~150 lines of doPUT (which handle fixes for broken clients and chunked transfers).

I wonder if it would be possible to have a buffer, and pass a BufferedWriter() for it to the PUT handler via beginWrite(). Then use a BufferedReader() on the same buffer as input to requests.post(..., stream=True).
Does that make sense? This is only a sketch of what I have in mind; I did not test anything, and there may be more efficient or obvious approaches.
It also does not yet solve the problem of chunked transfer.
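
One way to realize that writer/reader pairing is an OS pipe: the PUT handler writes into one end while a worker thread reads from the other (e.g. to feed requests.post(..., data=reader, stream=True)). A minimal illustration, not tested against WsgiDAV:

```python
import os
import threading

# Create a pipe and wrap both ends as binary file objects.
r_fd, w_fd = os.pipe()
reader = os.fdopen(r_fd, "rb")
writer = os.fdopen(w_fd, "wb")

received = []

def consume():
    # Stand-in for the HTTP library reading the streamed body.
    while True:
        chunk = reader.read(4)
        if not chunk:
            break
        received.append(chunk)

t = threading.Thread(target=consume)
t.start()

# Stand-in for the PUT handler writing blocks as they arrive.
for block in (b"hello ", b"world"):
    writer.write(block)
writer.close()  # flushes and signals EOF to the reader
t.join()

print(b"".join(received))  # b'hello world'
```

Closing the write end is what lets the reader see EOF, which maps nicely onto the existing beginWrite()/endWrite() pairing.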

@tomviner
Contributor

Looking at the code for the chunked transfer as well as the non-chunked upload, it seems we could pull these out into a generator that accepts the environ and yields either one chunk at a time or one DEFAULT_BLOCK_SIZE block at a time.

The default handlePUT could then loop over this and call fileobj.write as it does now.

If the request is not chunked, we'd still prefer to pass environ["wsgi.input"] directly to our networking library, but if it is chunked, we can use the generator to form a readable object to pass on.
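
A rough sketch of that generator for the non-chunked case (the chunked branch would parse chunk-size lines the same way doPUT already does); `read_request_body` and `BLOCK_SIZE` are illustrative names, not the real wsgidav identifiers:

```python
import io

BLOCK_SIZE = 8192  # stand-in for wsgidav's DEFAULT_BLOCK_SIZE

def read_request_body(environ, block_size=BLOCK_SIZE):
    """Yield the request body one block at a time (non-chunked case)."""
    remaining = int(environ.get("CONTENT_LENGTH") or 0)
    stream = environ["wsgi.input"]
    while remaining > 0:
        chunk = stream.read(min(block_size, remaining))
        if not chunk:
            break  # client closed early
        remaining -= len(chunk)
        yield chunk

# Example with a fake WSGI environ:
environ = {"wsgi.input": io.BytesIO(b"x" * 10000), "CONTENT_LENGTH": "10000"}
blocks = list(read_request_body(environ))
print(len(blocks), sum(len(b) for b in blocks))  # 2 10000
```

The default handlePUT would then just loop over this generator and call fileobj.write, while a proxying provider could wrap the same generator in a readable object.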

@mar10
Owner

mar10 commented Oct 14, 2016

Thanks @samuelfekete, #51 looks good to me; I think we can merge soon.
I would like to explore a 'lightweight' approach as well, like using a pipe object, @tomviner's approach, or some kind of callback/handler from within the PUT handler...
Since you have a real use case, do you think you could share your current custom handler and/or test other approaches with and without chunked transfer?

@samuelfekete
Contributor Author

samuelfekete commented Oct 14, 2016

@mar10, I have made another change to use generators, and I have isolated the code that writes to the file object in the small method processData. This can be overridden, for example, to just hand over the stream to the file object, so that the provider can read from it or forward it to a networking library. ebb0e99

@mar10
Owner

mar10 commented Oct 23, 2016

I merged #51, thank you all.

I also added a branch that explores another approach:

    def beginWrite(self, contentType=None):
        queue = FileLikeQueue(maxsize=1)
        requests.post(..., data=queue)
        return queue

It also includes some tests and adds requests to the dependencies.

Would this work in your use case as well?
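
For readers following along, the FileLikeQueue idea can be sketched roughly like this (a simplified stand-in, not the actual class from the branch): write() puts chunks on a thread-safe queue, read() takes them off, and close() enqueues an EOF sentinel.

```python
import queue  # `Queue` on Python 2

class MiniFileLikeQueue:
    """Simplified sketch of a queue-backed file-like object."""

    def __init__(self, maxsize=0):
        self.q = queue.Queue(maxsize)
        self._eof = False

    def write(self, chunk):
        self.q.put(chunk)

    def close(self):
        self.q.put(None)  # sentinel marking end of stream

    def read(self):
        # Note: no sized reads yet -- each call returns one whole chunk.
        if self._eof:
            return b""
        chunk = self.q.get()
        if chunk is None:
            self._eof = True
            return b""
        return chunk

f = MiniFileLikeQueue()
f.write(b"chunk1")
f.write(b"chunk2")
f.close()
print(f.read(), f.read(), f.read())  # b'chunk1' b'chunk2' b''
```

With maxsize=1 as in the snippet above, write() would block until the reader catches up, which is exactly the decoupling/backpressure between the PUT handler thread and the requests.post consumer.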

@samuelfekete
Contributor Author

samuelfekete commented Oct 23, 2016

Thank you.

Having quickly glanced at your branch, I can tell you that callers (at least Boto3) assume the end of file was reached if read() returns fewer bytes than requested.

The way I'm using it now, writelines() creates a file-like wrapper around the iterator, which looks like this:

class StreamingFile(object):
    """A file object wrapped around an iterator / data stream."""

    def __init__(self, data_stream):
        """Initialise the object with the data stream."""
        self.data_stream = data_stream
        self.buffer = b''

    def read(self, size=None):
        """Read bytes from an iterator."""
        while size is None or len(self.buffer) < size:
            try:
                self.buffer += next(self.data_stream)
            except StopIteration:
                break

        sized_chunk = self.buffer[:size]
        if size is None:
            self.buffer = b''
        else:
            self.buffer = self.buffer[size:]
        return sized_chunk
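
To make the behavior concrete, here is a self-contained restatement of the wrapper with a tiny demo (using the next() builtin so it runs on Python 2 and 3; the sample generator is made up):

```python
class StreamingFile(object):
    """A file object wrapped around an iterator / data stream."""

    def __init__(self, data_stream):
        self.data_stream = data_stream
        self.buffer = b""

    def read(self, size=None):
        # Pull chunks until we have `size` bytes, or the stream is exhausted.
        while size is None or len(self.buffer) < size:
            try:
                self.buffer += next(self.data_stream)
            except StopIteration:
                break
        sized_chunk = self.buffer[:size]
        self.buffer = b"" if size is None else self.buffer[size:]
        return sized_chunk

# Demo: wrap a generator of byte chunks, as a networking library would read it.
def chunks():
    yield b"hello "
    yield b"world"

f = StreamingFile(chunks())
print(f.read(5))  # b'hello'
print(f.read())   # b' world'
```

Note that once the stream is exhausted, read() returns an empty bytes object, which is the EOF signal callers like Boto3 expect.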

@samuelfekete
Contributor Author

@mar10, I think a queued file object would work for our use case as well. Though, what would be the advantage of that over simply reading from the iterator?

@mar10
Owner

mar10 commented Oct 26, 2016

Your StreamingFile looks elegant; I hadn't thought about this approach.

I played with Queue because it is thread-safe (though I wouldn't bet that my implementation still is).
Another benefit may be a kind of decoupling and buffering of chunks if maxsize > 1.
As you already mentioned, the sized reading is still missing.
A possible extension would be a version that supports next() and __iter__(), which, if I understood correctly, would result in different behavior (i.e. chunked transfer encoding) when passed to requests.post(..., data=x).
Anyway, this was just an experiment; I don't even have a real use case currently.
We may add both to addons.stream_tools, so others may benefit, and close this issue?
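
A sketch of that iterable variant (illustrative only; requests treats an iterator body as chunked transfer encoding, whereas a file-like body with read() is streamed with a known length):

```python
import queue

class IterableQueue:
    """Sketch: like the queued file object, but iterable, so a consumer
    pulls chunks via the iterator protocol instead of read()."""

    def __init__(self, maxsize=0):
        self.q = queue.Queue(maxsize)

    def put(self, chunk):
        self.q.put(chunk)

    def close(self):
        self.q.put(None)  # end-of-stream sentinel

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self.q.get()
        if chunk is None:
            raise StopIteration
        return chunk

    next = __next__  # Python 2 spelling

s = IterableQueue()
s.put(b"a")
s.put(b"b")
s.close()
print(list(s))  # [b'a', b'b']
```

Passed as requests.post(..., data=s), such an iterator would (per the requests streaming-upload behavior) be sent with Transfer-Encoding: chunked.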

mar10 added a commit that referenced this issue Oct 26, 2016
* Add FileLikeQueue class and tests

* Move FileLikeQueue to filelike_queue module

* Rename filelike_queue to stream_tools

Update #48
@samuelfekete
Contributor Author

Thanks @mar10. I'm fine with adding them to stream_tools and closing this issue.

@mar10
Owner

mar10 commented Oct 29, 2016

@samuelfekete, could you add some lines to the docstring of StreamingFile, maybe a small example of how to use it?
