ReadAsync doesn't return until output buffer filled or stream ends #32

Closed
LinusKardellInfobric opened this issue Feb 20, 2024 · 11 comments · Fixed by #33

@LinusKardellInfobric
Contributor

I'm trying to use this library to add compression to a network protocol, and my idea is simply to wrap the protocol in a ZSTD-compressed stream. Each time one end sends a message, it would flush out a compressed block. But I'm running into the problem that DecompressionStream.ReadAsync doesn't return the data it has already decompressed, even if no more data is currently available on the inner stream. ReadAsync only returns once output.pos >= output.size, or once innerStream.ReadAsync returns 0 (when the stream has ended). So there seems to be no way to fetch all currently available data from a DecompressionStream when reading from a never-ending stream (short of reading one byte at a time from the DecompressionStream, which would presumably be inefficient), and there seems to be no other way to access streaming decompression in this library without using unsafe code.

I don't see any clear, clean way of having it check if there is any more available data (there is no generic DataAvailable on Stream, and no way to use timeout or cancellation with NetworkStream), so the only solutions I can think of would be to:

  1. Have ReadAsync not await innerStream.ReadAsync if it already has some decompressed data, and instead possibly leave it in progress to be picked up by a subsequent call. Something like:

            private Task<int> readInProgress; // inner read left pending by an earlier call

            // ... in ReadAsync:
            var output = new ZSTD_outBuffer_s { pos = 0, size = (nuint)buffer.Length };
            while (output.pos < output.size)
            {
                if (input.pos >= input.size)
                {
                    // Pick up a read started by a previous call, or start a new one
                    var task = readInProgress ?? innerStream.ReadAsync(inputBuffer, 0, inputBufferSize, cancellationToken);
                    readInProgress = null;

                    if (output.pos > 0 && !task.IsCompleted)
                    {
                        // We already have data for the caller: return it now and
                        // leave the read in progress for the next call to pick up
                        readInProgress = task;
                        break;
                    }

                    // Nothing decompressed yet (so waiting is fine), or the read has already finished
                    int bytesRead = await task.ConfigureAwait(false);
                    if (bytesRead == 0)
                    {
                        if (checkEndOfStream && lastDecompressResult != 0)
                            throw new EndOfStreamException("Premature end of stream");

                        break;
                    }

                    input.size = (nuint)bytesRead;
                    input.pos = 0;
                }

                lastDecompressResult = DecompressStream(ref output, buffer.Span);
            }

            return (int)output.pos;

    That's just something I quickly threw together and haven't tested. Read would also need to be modified for mixed sync/async reads to be safe, and I have no idea how to handle the pending read in Dispose.

  2. Have ReadAsync never call innerStream.ReadAsync when output.pos > 0 (which means it would call innerStream.ReadAsync at most once per call). The drawback is that ReadAsync might not fill the output buffer even when more data is available.

  3. Provide a thin, non-stream-based, safe wrapper around Decompressor.DecompressStream. Something like:

            // Safe wrapper: returns the zstd result code plus how far input and output advanced
            public unsafe (nuint Status, nuint InputPos, nuint OutputPos) DecompressStream(ReadOnlySpan<byte> inputBuffer, Span<byte> outputBuffer)
            {
                fixed (byte* inputBufferPtr = inputBuffer)
                fixed (byte* outputBufferPtr = outputBuffer)
                {
                    var input = new ZSTD_inBuffer_s { size = (nuint)inputBuffer.Length, pos = 0, src = inputBufferPtr };
                    var output = new ZSTD_outBuffer_s { size = (nuint)outputBuffer.Length, pos = 0, dst = outputBufferPtr };
                    var returnValue = DecompressStream(ref input, ref output);
                    return (returnValue, input.pos, output.pos);
                }
            }
@oleg-st
Owner

oleg-st commented Feb 20, 2024

I like the second option; we can increase the inputBuffer size via bufferSize if we want ReadAsync to return more data per call.

@LinusKardellInfobric
Contributor Author

LinusKardellInfobric commented Feb 20, 2024

I think that with this option ReadAsync would also need to start by calling DecompressStream without reading any new input, to check whether any data left over from the previous call (because it didn't fit into output) is still buffered in the decompression context or in inputBuffer, at least if the previous call ended with output.pos == output.size.

@LinusKardellInfobric
Contributor Author

Maybe something like (again, something I just quickly scratched together without testing):

            EnsureNotDisposed();

            var output = new ZSTD_outBuffer_s { pos = 0, size = (nuint)buffer.Length };

            while (true)
            {
                // Check if we already have some data available to decompress
                lastDecompressResult = DecompressStream(ref output, buffer.Span);
                if (output.pos > 0)
                {
                    return (int)output.pos;
                }

                // Otherwise, read some more data
                if (input.pos >= input.size)
                {
                    int bytesRead;
                    if ((bytesRead = await innerStream.ReadAsync(inputBuffer, 0, inputBufferSize, cancellationToken)
                         .ConfigureAwait(false)) == 0)
                    {
                        if (checkEndOfStream && lastDecompressResult != 0)
                        {
                            throw new EndOfStreamException("Premature end of stream");
                        }

                        return 0;
                    }

                    input.size = (nuint)bytesRead;
                    input.pos = 0;
                }
            }

@oleg-st
Owner

oleg-st commented Feb 20, 2024

input.pos >= input.size checks that all input has been consumed and we need to read more.

@LinusKardellInfobric
Contributor Author

LinusKardellInfobric commented Feb 20, 2024

True, but if I understand correctly, it is still possible that the ZSTD_DCtx_s consumed all compressed data from input on the previous call but could not fit all of the decompressed data into output.
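To illustrate, here is a toy model. ToyExpander is entirely hypothetical (it is not zstd and not part of this library's API): each input byte decodes to three output bytes, and, like a real decompression context, it absorbs all of its input into internal state before emitting whatever fits. The returned (InPos, OutPos) pair mirrors input.pos and output.pos.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a streaming decoder such as ZSTD_DCtx_s.
// Each input byte "decodes" to three copies of itself.
public sealed class ToyExpander
{
    // Decoded bytes that have not yet fit into a caller's output buffer.
    private readonly Queue<byte> pending = new Queue<byte>();

    public bool HasPendingOutput => pending.Count > 0;

    // Mirrors the input.pos / output.pos bookkeeping of a streaming decompress call:
    // consumes input from inPos, writes output from outPos, returns both positions.
    public (int InPos, int OutPos) Decode(ReadOnlySpan<byte> input, int inPos, Span<byte> output, int outPos)
    {
        // Absorb all remaining input into internal state.
        for (; inPos < input.Length; inPos++)
        {
            for (int k = 0; k < 3; k++)
                pending.Enqueue(input[inPos]);
        }

        // Emit only as much decoded data as the output buffer can hold.
        while (pending.Count > 0 && outPos < output.Length)
            output[outPos++] = pending.Dequeue();

        return (inPos, outPos);
    }
}
```

Feeding it four input bytes with a five-byte output buffer returns InPos == 4 and OutPos == 5: all input is consumed, yet 7 of the 12 decoded bytes are still pending. Two more calls with empty input return 5 and then 2 bytes. A reader that only checks input.pos >= input.size before going back to the inner stream would block while that data sits in the decoder.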

@oleg-st
Owner

oleg-st commented Feb 20, 2024

We can add this before the innerStream.ReadAsync call:

            // Frame is completely decoded, so flush it to the caller before the next read
            if (lastDecompressResult == 0 && output.pos > 0)
            {
                break;
            }

@LinusKardellInfobric
Contributor Author

I'm afraid I don't see the purpose of checking if a frame has ended there.

@oleg-st
Copy link
Owner

oleg-st commented Feb 20, 2024

A frame ends at the flush/close of the compression stream, so a flush in compression will cause a flush in decompression.

@LinusKardellInfobric
Contributor Author

LinusKardellInfobric commented Feb 20, 2024

Flushing the compressor ends the current block (ZSTD_e_flush) but does not necessarily end the current frame (ZSTD_e_end). It is also possible that the sender starts a new frame immediately after ending one (so the start of the new frame arrives together with the end of the previous frame), or that data remains buffered in the ZSTD_DCtx_s.

@LinusKardellInfobric
Contributor Author

Should I make a pull request with my solution?

@oleg-st
Owner

oleg-st commented Feb 20, 2024

Flush before ReadAsync then?

            if (output.pos > 0)
            {
                break;
            }
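Putting the pieces of this thread together (decompress before touching the inner stream, then return as soon as output.pos > 0 instead of reading again), the read loop might look roughly like the sketch below. It is a self-contained approximation and is not taken from #33: DecodeStep is a hypothetical delegate standing in for Decompressor.DecompressStream, EagerDecodingReader is a hypothetical class rather than DecompressionStream, ScriptedStream is a hypothetical test double that throws instead of blocking once its queued chunks run out, and the checkEndOfStream handling is omitted.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Decompressor.DecompressStream: decode input[inPos..]
// into output[outPos..] and return the advanced positions.
public delegate (int InPos, int OutPos) DecodeStep(
    ReadOnlySpan<byte> input, int inPos, Span<byte> output, int outPos);

// Hypothetical reader showing the loop shape discussed here (not DecompressionStream).
public sealed class EagerDecodingReader
{
    private readonly Stream inner;
    private readonly DecodeStep decode;
    private readonly byte[] inputBuffer;
    private int inputPos, inputSize;

    public EagerDecodingReader(Stream inner, DecodeStep decode, int bufferSize = 4096)
    {
        this.inner = inner;
        this.decode = decode;
        inputBuffer = new byte[bufferSize];
    }

    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ct = default)
    {
        int outPos = 0;
        while (outPos < buffer.Length)
        {
            // Decode first, even without new input: leftovers from the previous
            // call may still produce output.
            (inputPos, outPos) = decode(inputBuffer.AsSpan(0, inputSize), inputPos, buffer.Span, outPos);
            if (outPos == buffer.Length || inputPos < inputSize)
                continue; // buffer full (loop ends) or input not yet consumed

            // All input consumed: return what we have instead of blocking on the inner stream.
            if (outPos > 0)
                break;

            int read = await inner.ReadAsync(inputBuffer.AsMemory(), ct).ConfigureAwait(false);
            if (read == 0)
                break; // inner stream ended (end-of-frame check omitted)
            inputPos = 0;
            inputSize = read;
        }
        return outPos;
    }
}

// Hypothetical test double: returns queued chunks, then throws instead of
// blocking, like a NetworkStream whose peer has not sent anything new yet.
public sealed class ScriptedStream : Stream
{
    private readonly Queue<byte[]> chunks;

    public ScriptedStream(params byte[][] chunks) => this.chunks = new Queue<byte[]>(chunks);

    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (chunks.Count == 0)
            throw new InvalidOperationException("read would block");
        byte[] chunk = chunks.Dequeue();
        chunk.AsSpan().CopyTo(buffer.Span); // assumes each chunk fits the read buffer
        return new ValueTask<int>(chunk.Length);
    }

    public override int Read(byte[] buffer, int offset, int count) =>
        ReadAsync(buffer.AsMemory(offset, count)).GetAwaiter().GetResult();

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

With an identity DecodeStep (copy input straight to output), a ScriptedStream holding one 3-byte chunk, and a 2-byte caller buffer, the first ReadAsync returns 2 and the second returns the remaining 1 byte without touching the inner stream; only a third call reaches ScriptedStream and fails with "read would block". Returning early trades bytes per call for latency, as in option 2; a larger bufferSize still lets one call return more data when the inner stream delivers it.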
