explicit chunk control #1340

Merged
merged 1 commit into vibe-d:master from tamediadigital/feature-explicit-chunk-control on Dec 17, 2015

Conversation

@yannick
Contributor

yannick commented Nov 29, 2015

For the implementation of an S3 library we needed more granular control over the HTTP chunks
(see https://github.com/tamediadigital/vibe-aws/tree/feature-s3).

This PR should not break any existing API; it just adds methods to manipulate the chunk headers.

@mfrischknecht


Contributor

mfrischknecht commented Nov 30, 2015

This PR should not break any existing API

It may be worth noting that I changed an implicit behavior of the finalize() method: it can now be called multiple times without triggering an exception. The reason is that vibe.d's HTTP server always calls it internally, while I also used the method to transmit the final (0-sized) chunk.
This is mainly because S3 requires chunk signatures for every chunk (including the last one), and I had to prevent the stream from sending a second 0-sized chunk when finalize() is called by the server.
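
(Editor's note: a minimal, self-contained sketch of the idempotent finalize() guard described above, for illustration only. The real ChunkedOutputStream does considerably more, e.g. flushing buffered data and writing chunk extensions; the writeChunk() helper below is purely hypothetical.)

class SketchChunkedStream
{
    private bool m_finalized;

    void finalize()
    {
        if (m_finalized) return;   // repeated calls (e.g. from the HTTP server) become no-ops
        m_finalized = true;
        writeChunk(null);          // emit the terminating 0-sized chunk exactly once
    }

    private void writeChunk(const(ubyte)[] payload)
    {
        // hypothetical placeholder for the actual "size CRLF payload CRLF" output
    }
}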

@s-ludwig


Member

s-ludwig commented Dec 2, 2015

I'm just a little worried about the API changes and wonder if a slightly more general solution would work. What about defining a callback string delegate(const(ubyte)[] data) chunkExtensionCallback that is called for every chunk? That would mean that the ordinary stream API continues to work in that scenario, too.
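
(Editor's sketch, not part of the original discussion: roughly how such a per-chunk callback could be used for the S3 case, following the signature proposed above. signChunk() is only a hypothetical stand-in; a real implementation would compute the AWS SigV4 chunk signature, which also chains the previous signature.)

import std.digest.sha : sha256Of;
import std.format : format;

// Hypothetical stand-in for the real SigV4 chunk-signing step
string signChunk(const(ubyte)[] data)
{
    auto digest = sha256Of(data);          // ubyte[32]
    return format("%(%02x%)", digest[]);   // hex-encode the digest
}

void example()
{
    // The proposed callback: invoked once per chunk with that chunk's payload,
    // returning the chunk-extension string written after the chunk size.
    string delegate(const(ubyte)[] data) chunkExtensionCallback =
        delegate string(const(ubyte)[] data) {
            return "chunk-signature=" ~ signChunk(data);
        };
}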

@mfrischknecht


Contributor

mfrischknecht commented Dec 2, 2015

I think that approach should work for the S3 use case. I'll have a go at the implementation and update the branch accordingly.

@mfrischknecht


Contributor

mfrischknecht commented Dec 9, 2015

Okay, I reverted the first couple of commits and added the delegate property instead. There are a couple of points worth looking at, though.
To get rid of the reverts, I could squash the commits together into one if you are satisfied with the solution as is.

@@ -426,11 +426,13 @@ final class ChunkedInputStream : InputStream {
Outputs data to an output stream in HTTP chunked format.
*/
final class ChunkedOutputStream : OutputStream {
alias ChunkExtensionCallback = string delegate(in ubyte[] data);


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Note that I changed the delegate's signature to take an argument of in ubyte[] instead of const(ubyte)[]. This should prove especially useful for a VibeManualMemoryManagement version later on (and, in fact, already is now, since one write() overload also takes in ubyte[]).


@s-ludwig

s-ludwig Dec 9, 2015

Member

Sounds good.

source/vibe/http/common.d (outdated diff)
if (buffer is null || buffer.length < size)
buffer = new ubyte[](size);
return buffer[0..size];
};


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

This could easily be done using an appropriate allocator from memutils.


@s-ludwig

s-ludwig Dec 9, 2015

Member

Why not just reuse m_buffer here?


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Mainly because I wanted to keep the buffer management in one place. But if I take the flushing out, I should be able to move that into its own method.


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Actually, I'm not sure if that would still be reasonable. I thought about writing a method that requests N bytes in the buffer and returns the corresponding range (so one could stream.read(appendBytes(N))), but the Appender struct does not seem to be made for this use case.
To conveniently append data to the buffer, one apparently always needs a range of items that is already allocated.

This could work, but I'm not sure whether it is still efficient:

private ubyte[] appendBytes(ulong maxSize)
{
    // clamp the request to the remaining buffer capacity
    if (m_maxBufferSize > 0 && m_maxBufferSize < m_buffer.data.length + maxSize)
        maxSize = m_maxBufferSize - min(m_buffer.data.length, m_maxBufferSize);
    auto pos = m_buffer.data.length;
    // grow the appender by maxSize zero bytes and hand out that slice
    for (ulong i = 0; i < maxSize; ++i)
        m_buffer.put(ubyte(0));
    return m_buffer.data[pos..pos+maxSize];
}

Then I could basically do:

auto bytes = appendBytes(sz);
if (bytes.length > 0)
    data.read(bytes);
else
    flush();

And for the write(in ubyte[]) method it would be:

while (bytes.length > 0) {
    auto dst = appendBytes(bytes.length);
    if (dst.length > 0)
    {
        // appendBytes() may return fewer bytes than requested
        auto sz = dst.length;
        dst[0..sz] = bytes[0..sz];
        bytes = bytes[sz..$];
    }
    else
        flush();
}

Do you perhaps have a better idea? Another variant could be to initialize the appender with the appropriate block size and use it like a normal array as long as m_maxBufferSize > 0 (basically only using put() for unbounded chunk sizes).


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Actually, I have been looking at the wrong thing all along; I guess I didn't read the type thoroughly enough and just assumed a std.array.Appender. Yeah, if we could have a new method for this use case, that would be a nice solution.


@s-ludwig

s-ludwig Dec 9, 2015

Member

I think AllocAppender should definitely get extended, either by adding an append method similar to the one you mentioned or by using the scheme that I mentioned in my reply to the wrong message thread:

Flushing can indeed be removed here; it was just necessary to cleanly switch between buffered and stream modes. I just realized now that m_buffer is an AllocAppender. We'd need something like AllocAppender.put(scope size_t delegate(scope T[] dst) del), where the delegate is called with m_remaining and the return value is the number of bytes written to that buffer.

The advantage of the callback-based version would be that it could be made to work with immutable(T)[], but using a method that just returns a buffer is of course much nicer to look at.
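
(Editor's sketch, not the actual vibe.d code: a minimal, self-contained illustration of the delegate-based append idea. The real AllocAppender extension added in commit 6ba887e may differ in names and details; SketchAppender below is purely illustrative.)

import std.algorithm.comparison : min;

struct SketchAppender
{
    private ubyte[] m_buffer;  // full allocation
    private size_t  m_filled;  // bytes committed so far

    this(size_t capacity) { m_buffer = new ubyte[](capacity); }

    // Hand the unused tail of the buffer to `del`; `del` returns how many
    // bytes it actually wrote, and only those become part of the data.
    void append(scope size_t delegate(scope ubyte[] dst) del)
    {
        const n = del(m_buffer[m_filled .. $]);
        assert(n <= m_buffer.length - m_filled);
        m_filled += n;
    }

    @property const(ubyte)[] data() const { return m_buffer[0 .. m_filled]; }
}

unittest
{
    auto app = SketchAppender(16);
    const(ubyte)[] src = [1, 2, 3, 4, 5];
    // copy from a slice here; with an InputStream one would read() into dst instead
    app.append(delegate size_t(scope ubyte[] dst) {
        size_t n = min(dst.length, src.length);
        dst[0 .. n] = src[0 .. n];
        return n;
    });
    assert(app.data.length == 5 && app.data[0] == 1);
}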

source/vibe/http/common.d (outdated diff)
m_out.flush();
auto bytes = getBytes(sz);
data.read(bytes);
write(bytes);


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

This change will result in the stream always using chunks of size maxBufferSize, except for the last one. This is an implicit requirement for the S3 use case (since chunks are not allowed to be smaller than a certain size).
With the new extension mechanism it is also the easiest solution, since all data written to the stream has to be put into a buffer in this class anyway (in order to be able to call the delegate). It will, however, differ from the current chunking behavior. Is this a problem?
Edit: Actually, one can also argue that this change makes the stream's behavior more deterministic. This is especially useful if one still needs to calculate the amount of data to be transmitted even though it will be chunked (this is also currently done in our vibe-aws branch). Maybe this calculation would also make a useful method?
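
(Editor's sketch, not from the PR: with a fixed block size, the on-wire length of the chunked body becomes easy to predict. The helper below assumes standard chunked framing, "size-hex<extension>CRLF payload CRLF", with every chunk, including the terminating 0-sized one, carrying an extension of extensionLength bytes, leading semicolon included, as in the S3 case.)

// Computes the encoded size of a chunked body where every chunk except the
// last carries exactly blockSize payload bytes. extensionLength is the length
// of the per-chunk extension including its leading ';' (0 for no extension).
ulong chunkedTransferLength(ulong payloadLength, ulong blockSize, ulong extensionLength)
{
    import std.format : format;

    assert(blockSize > 0);
    ulong total = 0;
    ulong remaining = payloadLength;
    while (remaining > 0)
    {
        const sz = remaining > blockSize ? blockSize : remaining;
        // "<size in hex><extension>\r\n" + payload + "\r\n"
        total += format("%x", sz).length + extensionLength + 2 + sz + 2;
        remaining -= sz;
    }
    // terminating 0-sized chunk: "0<extension>\r\n" + "\r\n"
    total += 1 + extensionLength + 2 + 2;
    return total;
}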


@s-ludwig

s-ludwig Dec 9, 2015

Member

I think that should be fine. If specific smaller chunk sizes are required, it's always possible to do something like vibe.stream.operations.pipeRealtime.


@s-ludwig

s-ludwig Dec 9, 2015

Member

Flushing can indeed be removed here; it was just necessary to cleanly switch between buffered and stream modes. I just realized now that m_buffer is an AllocAppender. We'd need something like AllocAppender.put(scope size_t delegate(scope T[] dst) del), where the delegate is called with m_remaining and the return value is the number of bytes written to that buffer.


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

As noted above, you can ignore the comment about appending bytes then (my bad). Should I have a look at that too and post another PR (would take some time though)?


@s-ludwig

s-ludwig Dec 9, 2015

Member

I'll have a quick look at that and commit it to master.


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Thank you. Actually, once this put() overload is implemented, one could even build something like AllocAppender.put(InputStream data, ulong size) on top of it - this seems (at least to me) to be a much more obvious use case from an outside perspective.


@s-ludwig

s-ludwig Dec 9, 2015

Member

Done now (6ba887e) and Travis looks like it will pass.

@s-ludwig


Member

s-ludwig commented Dec 9, 2015

To get rid of the reverts, I could squash the commits together into one if you are satisfied with the solution as is.

That would be good; it's a nicely isolated change that fits well into a single commit.

s-ludwig added a commit that referenced this pull request Dec 9, 2015

Added a delegate property that can be used to specify chunk extensions on a per-chunk basis.

Also, the `write(InputStream,ulong)` method now will always transfer chunks of the defined block size.
/// ditto
@property void chunkExtensionCallback(ChunkExtensionCallback cb) { m_chunkExtensionCallback = cb; }
private void append(scope void delegate(scope ubyte[] dst) del, ulong nbytes)


@mfrischknecht

mfrischknecht Dec 9, 2015

Contributor

Okay, thanks again for the quick responses and extension of AllocAppender!
I integrated the AllocAppender.append() calls and squashed everything into a single commit. To homogenize the buffer handling in both write() overloads I implemented a new (private) append() method.

Do you think there's further need for modifications (any obvious gotchas)?
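
(Editor's sketch, not the merged code: one plausible way both write() overloads can share a private append() helper with the signature shown in the diff above. The chunk-extension callback, flushing details, and the real buffering of ChunkedOutputStream are omitted; SketchOutputStream and flushChunk() are purely illustrative.)

class SketchOutputStream
{
    private ubyte[] m_buffer;   // one block worth of buffered payload
    private size_t  m_filled;

    this(size_t blockSize) { m_buffer = new ubyte[](blockSize); }

    void write(in ubyte[] bytes_)
    {
        const(ubyte)[] bytes = bytes_;
        append(delegate void(scope ubyte[] dst) {
            dst[] = bytes[0 .. dst.length];   // dst never exceeds the remaining input
            bytes = bytes[dst.length .. $];
        }, bytes_.length);
    }

    // del is called repeatedly with the free buffer space until nbytes have
    // been consumed; a full buffer is flushed as one chunk in between.
    private void append(scope void delegate(scope ubyte[] dst) del, ulong nbytes)
    {
        while (nbytes > 0)
        {
            auto free = m_buffer[m_filled .. $];
            const sz = nbytes < free.length ? cast(size_t) nbytes : free.length;
            del(free[0 .. sz]);
            m_filled += sz;
            nbytes -= sz;
            if (m_filled == m_buffer.length)
                flushChunk();
        }
    }

    private void flushChunk()
    {
        // hypothetical: write "size[;extension]CRLF payload CRLF" and reset the buffer
        m_filled = 0;
    }
}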

@s-ludwig


Member

s-ludwig commented Dec 17, 2015

Looks good! Thanks for doing the modifications!

s-ludwig added a commit that referenced this pull request Dec 17, 2015

Merge pull request #1340 from tamediadigital/feature-explicit-chunk-control

Add support for writing chunk extensions in ChunkedOutputStream.

@s-ludwig s-ludwig merged commit 2eca873 into vibe-d:master Dec 17, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed