Fix multi-frame Zstandard response decoding #3022
Conversation
This solution is better than my #3021. I like how you were able to create one
Clever solution! I like it, but have a few smaller questions.
@@ -169,10 +169,15 @@ def __init__(self) -> None:
     def decompress(self, data: bytes) -> bytes:
         if not data:
             return b""
-        return self._obj.decompress(data)  # type: ignore[no-any-return]
+        data_parts = [self._obj.decompress(data)]
+        while self._obj.eof and self._obj.unused_data:
Is there any risk of an infinite loop if there is less data here than a frame's worth? Basically, is there a case where decompress(unused_data) results in more unused data because there is not enough data for zstd to do anything with?
My understanding is that in that case eof will be False.
This is confirmed with some testing:
>>> compressed = zstd.compress(b'foo') + zstd.compress(b'bar') # two frames
>>> obj = zstd.ZstdDecompressor().decompressobj()
>>> obj.decompress(compressed[:10])
b'f'
>>> obj.eof
False
>>> obj.unused_data
b''
>>> obj.decompress(compressed[10:])
b'oo'
>>> obj.eof
True
>>> obj.unused_data
b'(\xb5/\xfd \x03\x19\x00\x00bar'
Sounds good, thanks!
Actually, sorry, one more question here. What if there is truncated data? Is there any case where there would repeatedly be unused data if the network connection was closed early without receiving everything? Or would it throw an exception in that case?
If the eof attribute is True and there is some unused_data, then it goes into the next loop iteration. At that point a new decompressor object is instantiated, and the unused_data from the previous one is fed to it for decompression, and so on and so forth.
If you are concerned about a possible infinite loop, it cannot happen, because the unused_data part is guaranteed to strictly decrease in size with each iteration.
Now, if the data is truncated, nothing special happens: the part of the received data that can be decompressed is returned, and that is it. No exception is raised in that case, at least not by the Zstandard decompression code; I would guess that the HTTP handling code will detect that packets are lost or something like this and react accordingly.
Not sure if that answers your question, please feel free to clarify otherwise!
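The truncated-data behaviour described here can be checked with a short experiment. This is a sketch using zlib's decompressobj as a stand-in, since it exposes the same eof and unused_data attributes as python-zstandard's decompressor; the payload is made up for illustration:

```python
import zlib

# A complete compressed stream, then cut in half as if the connection
# closed early without receiving everything.
full = zlib.compress(b"hello world" * 1000)
obj = zlib.decompressobj()
partial = obj.decompress(full[: len(full) // 2])

# No exception is raised: we get whatever could be decoded so far, and the
# object reports that the stream is not finished.
assert (b"hello world" * 1000).startswith(partial)
assert obj.eof is False
assert obj.unused_data == b""
```

Since eof stays False on a truncated stream, a multi-frame loop gated on eof simply does not run again.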
Yes, that answers my question, thanks!
@@ -169,10 +169,15 @@ def __init__(self) -> None:
     def decompress(self, data: bytes) -> bytes:
         if not data:
             return b""
-        return self._obj.decompress(data)  # type: ignore[no-any-return]
+        data_parts = [self._obj.decompress(data)]
I'm slightly concerned about decompress performance in this case. The current design of urllib3 seems to be to just push any data it has right to zstd and ignore zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE, but won't that hurt performance when downloading larger files? It's nothing new to your change, but I am wondering if there is a solution that could respect zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE to not call decompress() so often.
To test this I have created a 1GB file containing fully-random bytes (so that output is >1GB) and compressed it:
$ zstd -l data.zst
Frames  Skips  Compressed  Uncompressed  Ratio  Check  Filename
     1      0     954 MiB                        XXH64  data.zs
Then I run the following code with pytest-benchmark:
Benchmark code
import zstandard as zstd

with open("data.zst", "rb") as f:
    DATA = f.read()

def oneshot():
    obj = zstd.ZstdDecompressor().decompressobj()
    return len(obj.decompress(DATA))

def test_oneshot(benchmark):
    size = benchmark(oneshot)
    assert size == 1_000_000_000

def incremental():
    obj = zstd.ZstdDecompressor().decompressobj()
    size = 0
    for i in range(0, len(DATA), zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE):
        size += len(
            obj.decompress(DATA[i : i + zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE])
        )
    return size

def test_incremental(benchmark):
    size = benchmark(incremental)
    assert size == 1_000_000_000
The results show that decompressing one 1 GB string in one shot is ~2.5 times slower than splitting it and sending the data incrementally:
---------------------------------------- benchmark: 2 tests ----------------------------------------
Name (time in ms) Min Max Mean Median
----------------------------------------------------------------------------------------------------
test_incremental 317.5479 (1.0) 384.3817 (1.0) 358.2648 (1.0) 359.8864 (1.0)
test_oneshot 921.7404 (2.90) 968.6641 (2.52) 949.1417 (2.65) 964.4232 (2.68)
----------------------------------------------------------------------------------------------------
Now I'll leave it to urllib3's maintainers to decide whether they want me to make a quick change in the code to do so, or if it's OK as it is.
One could also argue that python-zstandard should do it automatically.
Also @grossag I did not find much documentation about DECOMPRESSION_RECOMMENDED_INPUT_SIZE (nor ZSTD_DStreamInSize() for that matter), feel free to share if you have some pointers!
zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE defaults to the result of a call into zstd's function ZSTD_DStreamInSize(). After reading through some zstd source and also facebook/zstd#340, I think this value may represent the chunk size that zstd performs best with.
That said, optimizing for network download + decompress is much different than disk read + decompress. Also, the variance will be much higher in terms of network download speed. So it's hard to know how much to tune this.
The downside with delaying decompression until you have a buffer of size DECOMPRESSION_RECOMMENDED_INPUT_SIZE is that it defers more work into flush(), which as shown in #3021 can break tests. If we delay decompression, the normal case for smaller chunked transfers is that flush() will actually do the decompress operation.
So I am fine either way and will defer to @sethmlarson on whether more work is needed to delay decompression. Either way, I still think we should go with your approach. Mine has the benefit of using the only API that is known to be able to handle multiple frames now, which could slightly improve performance in that case. But I think the performance gain is lost by my approach having to use an intermediate io.BytesIO(), which would result in an additional copy. I like how your approach doesn't need extra copies.
Thank you for the pointers.
I am confused though. I thought you asked about what happens when we feed more data than zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE to the decompressor, but in your last message you speak about what happens when we feed less data. So here are my thoughts:
- Feeding more data: the benchmark shows there is a penalty for doing so (~600 ms per GB). I'm open to changing the code to account for that, but would argue that the python-zstandard library should do it itself.
- Feeding less data: this case allows the consumer of urllib3 to get the data as soon as it is ready, which is helpful when streaming HTTP responses. I'm not sure how often this scenario happens in real life though.
Also, you spoke about calling flush(), but in the case of this PR ZstdDecompressionObj.flush does nothing at all. I put a comment in the code for clarity, but did not remove the call in case python-zstandard's API changes in the future.
Sorry! I am wondering specifically about feeding less data than zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE. But as discussed on Discord, zstd frames can be any size, so my concern isn't important.
My biggest concern was: what happens when we receive half of a frame's worth of data? python-zstandard's underlying call to zstd's ZSTD_decompressStream will return 3, meaning that there is more data to decode. But there isn't enough data to decode, so we basically need to wait for more data.
I wrote some Python code where I read the full contents of a .zstd file and sliced it in half at different points. I then ran your code over it. It worked perfectly for various ways that I sliced the buffer, so I think your approach is great.
The part I didn't realize is that zstd internally remembers where it is during a decompress operation. If there is not enough data, it will do as much as it can and return. Then your code is doing the right thing in reusing self._obj if it doesn't have enough data and it's mid-frame (self._obj.eof being False if we are mid-frame).
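The slicing experiment described above can be sketched roughly as follows; zlib's decompressobj is used as a stand-in here because it exposes the same incremental interface, and the payload is made up:

```python
import zlib

expected = b"hello world, hello zstd"
payload = zlib.compress(expected)

# Slice the compressed stream at every possible point and feed it in two
# calls: the decompressor keeps its internal state between calls, so any
# mid-stream split round-trips correctly.
for split in range(len(payload) + 1):
    obj = zlib.decompressobj()
    out = obj.decompress(payload[:split]) + obj.decompress(payload[split:])
    assert out == expected
    assert obj.eof is True  # the whole stream was consumed
```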
Great! Thank you for taking the time to confirm that everything works as expected! 👍
Hello @grossag, thank you for your review! I did not know you were working on it, since you were not assigned to the ticket and your PR was not published when I started working on mine. I'm glad you like my implementation though!
This change looks good to me. I tested thousands of zstd-compressed GETs with your topic branch and confirmed that they all worked correctly. I am not a maintainer of urllib3, so I am not sure if I should hit the Approve button; I'll defer to the maintainers on approving it. But I think it's a great improvement that will help a lot, thanks for your work on this!
This looks great, thanks! Did you check that the test was failing without your change?
I left two small questions.
     def flush(self) -> bytes:
-        ret = self._obj.flush()
+        ret = self._obj.flush()  # note: this is a no-op
Should we remove it then?
I was leaving it here for two reasons:
- For future reviewers: conceptually it avoids wondering why it's not there, whether that's an error, etc.
- In the unlikely event of python-zstandard's API changing in that regard.
OK, the first reason makes sense, thanks.
test/test_response.py
    # Zstandard frame
    zstd.compress(b"foo")
    # skippable frame (must be ignored)
    + bytes.fromhex("502A4D1810000000726f676468616d207761732068657265")
How did you come up with this? Looking at https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#skippable-frames I'm not seeing 184D2A5 in your string.
The magic number is little-endian, and the last half-byte (noted ? in the spec) can be anything; I chose 0 here, but it is arbitrary. So 0x184D2A50, which encoded little-endian is 502A4D18.
Then comes the frame size; I chose 16, so 10000000 (again little-endian).
Then the user data part: I chose 16 arbitrary bytes.
Maybe it would be clearer if I put some spaces in there, like one of the following:
# option 1
bytes.fromhex("502A4D18 10000000 726f676468616d207761732068657265")
# option 2
bytes.fromhex("50 2A 4D 18 10 00 00 00 72 6f 67 64 68 61 6d 20 77 61 73 20 68 65 72 65")
# option 3
bytes.fromhex(
"502A4D18" # Magic_Number
"10000000" # Frame_Size
"726f676468616d207761732068657265" # User_Data
)
Tell me if that makes more sense or if you want me to change it to something else.
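For what it's worth, the same bytes can also be built with struct, which makes the little-endian layout explicit (a sketch; the comments use the field names from the specification):

```python
import struct

MAGIC = 0x184D2A50  # skippable frame Magic_Number (low nibble is arbitrary)
user_data = b"rogdham was here"  # 16 arbitrary bytes of User_Data

# "<II" packs Magic_Number and Frame_Size as little-endian 32-bit integers.
frame = struct.pack("<II", MAGIC, len(user_data)) + user_data

assert frame == bytes.fromhex(
    "502A4D1810000000726f676468616d207761732068657265"
)
```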
I like option 3! Can we mention additionally that magic number and frame size are little-endian? And maybe use only zeroes for user data? Otherwise it sounds like the randomness is significant.
I agree, let's add comments to the parts of the frame and use zeroes to denote insignificant data.
Done! I have changed the frame size to 7 so that it is clearer that it does not need to be a round number.
The failing test is a timeout in one of the environments only, because it reached 30 min; I don't think it's related to my change.
I think the question was whether your new zstd test fails without your current change. And I did confirm previously when reviewing your change that the new zstd test fails when run on current main top of tree. So you should be all set for that.
Oh, my bad, I completely misread the question. Thank you for clarifying (and for having tested it!); I have also confirmed it on my side.
Thanks, I should have been clearer regarding the failing test. The only thing remaining is commenting the skippable frame per #3022 (comment)
This looks great, thanks for working on this! I had two comments:
test/test_response.py
    # Zstandard frame
    zstd.compress(b"foo")
    # skippable frame (must be ignored)
    + bytes.fromhex("502A4D1810000000726f676468616d207761732068657265")
I agree, let's add comments to the parts of the frame and use zeroes to denote insignificant data.
Force-pushed from 078ba9d to 6a18181
LGTM! Thanks for iterating. And thanks everyone for the careful reviews.
test/test_response.py
+ bytes.fromhex(
+     "50 2A 4D 18"  # Magic_Number (little-endian)
+     "07 00 00 00"  # Frame_Size (little-endian)
+     "00 00 00 00 00 00 00 "  # User_Data
nit: Can you please remove the trailing space after the last 00?
Sorry for that, fixed!
Force-pushed from 6a18181 to 1ca2bc3
Thanks @Rogdham and @grossag! You are both invited to apply for the $300 OpenCollective bounty. Please submit an expense to our OpenCollective if you're eligible to do so, and let us know if there's anything we can do to help with that process :)
Thanks @sethmlarson! I'm not eligible, but I appreciate the offer. The fix is going to be very useful for my work anyway. Thanks @Rogdham for making it.
Thank you for merging the PR, and for the bounty offer! I have submitted an expense on OpenCollective. Feel free to contact me if you have any questions about this piece of code in the future. Many thanks!
Thanks! Approved. ✅
Fixes #3008.
Background on Zstandard
Data compressed with Zstandard is a sequence of frames. A frame can be of two kinds: Zstandard frames, which contain compressed data, and skippable frames, which contain user data and must be ignored during decompression.
When decompressing some data, data from each frame should be concatenated.
Link to the Zstandard specification
Multi-frame with python-zstandard
The python-zstandard library used to decompress the data currently stops after the first frame. As a result, if several frames are chained in the response, the decompressed response will be partial.
This seems to be a temporary state of affairs, as the library's author wants to introduce a read_across_frames parameter to their decompress APIs, which would default to False for some time before defaulting to True.
With the current version, I relied on the unused_data attribute of the ZstdDecompressionObj when eof is reached to continue with the next frame. This behaviour is specified in the documentation.
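The mechanism can be sketched as follows. This stand-in uses zlib's decompressobj, which exposes the same eof and unused_data attributes; the actual change instantiates a new python-zstandard decompressor for each subsequent frame, and the class name here is made up for illustration:

```python
import zlib

class MultiStreamDecoder:
    """Sketch of the multi-frame decoding loop, using zlib as a stand-in."""

    def __init__(self) -> None:
        self._obj = zlib.decompressobj()

    def decompress(self, data: bytes) -> bytes:
        if not data:
            return b""
        data_parts = [self._obj.decompress(data)]
        # If a whole stream was decoded (eof) and bytes remain (unused_data),
        # they belong to the next stream: start a fresh decompressor on them.
        # unused_data strictly shrinks each iteration, so this terminates.
        while self._obj.eof and self._obj.unused_data:
            unused_data = self._obj.unused_data
            self._obj = zlib.decompressobj()
            data_parts.append(self._obj.decompress(unused_data))
        return b"".join(data_parts)

decoder = MultiStreamDecoder()
payload = zlib.compress(b"foo") + zlib.compress(b"bar")
assert decoder.decompress(payload) == b"foobar"
```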