How can we minimize memory consumption when decoding data? #2787

Closed
pquentin opened this issue Nov 10, 2022 · 6 comments

pquentin commented Nov 10, 2022

For the needs of #2712, I'm looking for a queue-like data structure (roughly sketched after this list) that:

  • holds bytes
  • supports put()ing an arbitrary number of bytes
  • supports get()ing an arbitrary number of bytes
  • consumes O(size()) memory
  • is less complex than a rope!
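
Concretely, the shape I have in mind is roughly this (a sketch; the name and signatures are placeholders, not an existing urllib3 API):

class BytesQueue:
    """Placeholder sketch of the desired FIFO-of-bytes interface."""

    def put(self, data: bytes) -> None:
        """Append an arbitrary number of bytes to the right."""
        raise NotImplementedError

    def get(self, n: int) -> bytes:
        """Remove and return up to n bytes from the left."""
        raise NotImplementedError

    def size(self) -> int:
        """Number of bytes currently held; memory use should be O(size())."""
        raise NotImplementedError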

pquentin commented Nov 10, 2022

The current solution is a bytearray, which is pretty bad because a get() briefly holds roughly three times the data in memory:

  • the data still in the buffer
  • the bytearray slice
  • the bytes conversion of that slice

Here's a self-contained illustration of that problem:

from urllib3.response import MultiDecoder


class ByteArrayDecoder:
    def __init__(self):
        self.buffer = bytearray()
        self.decoder = MultiDecoder("gzip")

    def put(self, compressed_data: bytes):
        decoded = self.decoder.decompress(compressed_data)
        self.buffer.extend(decoded)

    def get(self, n: int):
        decoded_data = bytes(self.buffer[:n])
        del self.buffer[:n]
        return decoded_data

I'm generating the following test data (2GiB of zeroes, which gzip compresses down to about 2MiB):

import gzip

with gzip.open("zeroes.gz", "wb") as f:
    f.write(bytes(2**31))

And then I run the following code using memray:

with open("zeroes.gz", "rb") as f:
    compressed = f.read()

decoder = ByteArrayDecoder()
decoder.put(compressed)
decoder.get(2**31)

We can use a memoryview to avoid one of the extra copies:

class MemoryViewDecoder:
    def __init__(self):
        self.buffer = bytearray()
        self.decoder = MultiDecoder("gzip")

    def put(self, compressed_data: bytes):
        decoded = self.decoder.decompress(compressed_data)
        self.buffer.extend(decoded)

    def get(self, n: int):
        buffer_view = memoryview(self.buffer)
        decoded_data = bytes(buffer_view[:n])
        del buffer_view
        del self.buffer[:n]
        return decoded_data

Now both put and get do an extra copy, so we're looking at 4GiB of memory use. Hmm. put() too!? Let's confirm that:

from urllib3.response import MultiDecoder

with open("zeroes.gz", "rb") as f:
    compressed = f.read()
decoder = MultiDecoder("gzip")
decoded = decoder.decompress(compressed)

OK, this script also consumes 4GiB. Is MultiDecoder at fault? Let's try without it:

import zlib

with open("zeroes.gz", "rb") as f:
    compressed = f.read()

obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
decoded = obj.decompress(compressed)
assert len(decoded) == 2**31

OK, that's also 4GiB. What if I read the file in chunks:

import zlib


def chunks():
    with open("zeroes.gz", "rb") as f:
        while chunk := f.read(2**20):
            yield chunk


obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
output = bytearray()
for chunk in chunks():
    output += obj.decompress(chunk)
output += obj.flush()
assert len(output) == 2**31

This is where the results start to be intriguing. For zeroes.gz, the max is at 3GiB with 21 allocations. But for a random file that does not compress at all (head -c 2G </dev/urandom > rand && gzip rand), the max is at 2.1GiB with 7 allocations.

Aha! By reducing the chunk size from 2**20 to 2**10 in the zeroes case, I also get down to 2.1GiB with 7 allocations. This makes sense: zeroes compress extremely well, so 1MiB of compressed data is roughly 1GiB uncompressed, which means we only had two chunks.
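
The same chunk-size effect can be reproduced without memray using the standard library's tracemalloc (a rough sketch, assuming the zeroes.gz file generated above; the absolute numbers won't match memray's exactly, but the trend should):

import tracemalloc
import zlib


def peak_decompress(read_size: int) -> int:
    """Peak traced allocation (bytes) when decompressing zeroes.gz in chunks."""
    tracemalloc.start()
    obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
    output = bytearray()
    with open("zeroes.gz", "rb") as f:
        while chunk := f.read(read_size):
            output += obj.decompress(chunk)
    output += obj.flush()
    peak = tracemalloc.get_traced_memory()[1]
    tracemalloc.stop()
    return peak


for read_size in (2**10, 2**20):
    print(f"read size {read_size}: peak ~{peak_decompress(read_size) / 2**30:.2f} GiB")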

Getting back to MultiDecoder, another inefficiency is that it accumulates into a bytearray internally, returns bytes, and we then convert those bytes back into a bytearray! We could avoid a copy there.

All in all, it sounds like the following tricks could help us significantly:

  • memoryview
  • read in chunks (but the optimal size would have to be tuned)
  • avoid the bytearray -> bytes -> bytearray roundtrip

We would likely still use 4GiB in get() though. To be continued.
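
For illustration, here is roughly what combining the memoryview trick with the no-roundtrip trick could look like on the bytearray approach, using zlib directly instead of MultiDecoder (the class name is made up for this sketch):

import zlib


class LeanByteArrayDecoder:
    """Illustrative sketch only: bytearray buffer + memoryview-based get()."""

    def __init__(self):
        self.buffer = bytearray()
        # 16 + MAX_WBITS makes zlib expect a gzip wrapper.
        self.decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def put(self, compressed_data: bytes) -> None:
        # zlib returns bytes; extending the bytearray with them directly
        # skips MultiDecoder's internal bytearray -> bytes conversion.
        self.buffer += self.decoder.decompress(compressed_data)

    def get(self, n: int) -> bytes:
        # The memoryview slice saves one copy; the view must be released
        # before resizing the bytearray, or CPython raises BufferError.
        with memoryview(self.buffer) as view:
            data = bytes(view[:n])
        del self.buffer[:n]
        return data

get() still makes one full copy of the requested bytes, hence the 4GiB above.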

pquentin commented:

The more I think about it, the more I want to try a queue of bytes objects.


illia-v commented Nov 10, 2022

#2657 (comment) might be a discussion of a similar problem. This was the solution chosen there:

buffer = io.BytesIO()
# Besides `max_chunk_amt` being a maximum chunk size, it
# affects memory overhead of reading a response by this
# method in CPython.
# `c_int_max` equal to 2 GiB - 1 byte is the actual maximum
# chunk size that does not lead to an overflow error, but
# 256 MiB is a compromise.
max_chunk_amt = 2**28
while amt is None or amt != 0:
    if amt is not None:
        chunk_amt = min(amt, max_chunk_amt)
        amt -= chunk_amt
    else:
        chunk_amt = max_chunk_amt
    data = self._fp.read(chunk_amt)
    if not data:
        break
    buffer.write(data)
    del data  # to reduce peak memory usage by `max_chunk_amt`.

pquentin commented:

Yes, I'm familiar with that buffer, but in _fp_read we only ever append data to the right. For #2712 we need efficient appends on the right but also efficient pops on the left, which I don't think can be done efficiently with a file-like object such as BytesIO.
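
To make the difference concrete, a toy comparison of the two access patterns (not urllib3 code):

import collections
import io

# "Popping" n bytes from the left of a BytesIO means copying out
# everything and rebuilding the buffer without the consumed prefix,
# i.e. O(total buffered) work and memory per get().
buffer = io.BytesIO(b"a" * 10 + b"b" * 90)
data = buffer.getvalue()
head, buffer = data[:10], io.BytesIO(data[10:])

# With a deque of bytes objects, whole chunks come off the left in
# O(1); only the chunk straddling the boundary would need slicing.
queue = collections.deque([b"a" * 10, b"b" * 90])
head = queue.popleft()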


pquentin commented Nov 11, 2022

> The more I think about it, the more I want to try a queue of bytes objects.

I could not resist trying. Here is the result:

import collections
import io

from urllib3.response import MultiDecoder


class BytesQueueDecoder:
    def __init__(self):
        self.buffer = collections.deque()
        self.decoder = MultiDecoder("gzip")

    def put(self, compressed_data: bytes):
        decoded = self.decoder.decompress(compressed_data)
        self.buffer.append(decoded)
        print(len(self.buffer))

    def get(self, n: int):
        if not self.buffer:
            raise ValueError("nothing to get")
        elif n < 0:
            raise ValueError("n should be non-negative")

        fetched = 0
        ret = io.BytesIO()
        while fetched < n:
            print(len(self.buffer))
            remaining = n - fetched
            chunk = self.buffer.popleft()

            if remaining < len(chunk):
                left_chunk, right_chunk = chunk[:remaining], chunk[remaining:]
                ret.write(left_chunk)
                self.buffer.appendleft(right_chunk)
                break
            else:
                ret.write(chunk)
                fetched += len(chunk)
        return ret.getvalue()


decoder = BytesQueueDecoder()
with open("zeroes.gz", "rb") as f:
    while chunk := f.read(2**15):
        decoder.put(chunk)

assert len(decoder.get(2**31)) == 2**31

Let's compare the memoryview and deque approaches:

MemoryViewDecoder: [memray flamegraph screenshot "memoryview"]

BytesQueueDecoder: [memray flamegraph screenshot "byteslist"]

For the new approach, the max allocation is determined by the sum of two things:

  • the size of the decompressed data we're getting (here 2GiB)
  • the size of a decompressed chunk (here ~215MB)

I don't think we can do better than that. It does mean that for optimal results we need to make sure to feed the decoder reasonably small chunks (the tuning alluded to above). With a single chunk we would use twice the memory of the decompressed data (4GiB here, just like the memoryview approach).

pquentin commented:

I captured a CPU profile to check that this would not be too slow. Regardless of the buffer size, 14% of the time is spent converting to bytes in MultiDecoder, another 14% is spent writing into the BytesIO (a price that is paid only in the streaming case), and the rest (~70%) is the actual decompression. Seems OK.

[CPU profile screenshot]

This spike is done, closing. Now let's try to use that in #2712. 🚀
