This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Memory footprint StreamReader.readexactly #394

@frensjan

StreamReader.readexactly is a very handy method for reading frames ('chunks' of data preceded by the size of the 'chunk') from a stream. E.g.:

size_data = yield from reader.readexactly(4)   # when using 32 bit sizes
(frame_len,) = struct.unpack('!I', size_data)  # readexactly returns bytes, not an int
frame = yield from reader.readexactly(frame_len)

However, its memory footprint isn't ideal. A list of 'blocks' is kept until all data is read, and then joined with b''.join(blocks). This causes each 'frame' to reside in memory twice until the blocks list and the bytes objects it points to are freed (when leaving the readexactly method). So its peak memory usage is O(2n).
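For reference, the strategy the current implementation uses can be modeled outside asyncio like this (a minimal sketch, not the actual asyncio code; read_block stands in for StreamReader.read):

```python
def join_based_readexactly(read_block, n):
    """Model of the list-of-blocks strategy: every block stays alive
    until b''.join() completes, so roughly 2n bytes are resident at
    the peak (the blocks plus the joined result)."""
    blocks = []
    remaining = n
    while remaining > 0:
        block = read_block(remaining)
        if not block:
            raise EOFError('stream ended %d bytes short' % remaining)
        blocks.append(block)
        remaining -= len(block)
    return b''.join(blocks)  # the second copy of the frame is created here
```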

For small 'frames' the only issue is probably some avoidable copying, but for larger 'frames' this hurts, e.g. when sending files for processing to a machine where the files are larger than memory / #receivers / 2.


A possible solution might be to use a bytearray to gather the data incrementally. E.g. something along the lines of:

@asyncio.coroutine
def readexactly(self, n):
    if self._exception is not None:
        raise self._exception

    if not n:
        return b''

    # Short path: the internal buffer already holds enough data.
    if len(self._buffer) >= n:
        data = self._buffer[:n]
        del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    # Allocate the full frame once and fill it incrementally, instead of
    # collecting blocks in a list and joining them at the end.
    data = bytearray(n)
    pos = 0

    while n:
        if not self._buffer:
            if self._eof:
                raise asyncio.IncompleteReadError(bytes(data[:pos]), pos + n)
            yield from self._wait_for_data('readexactly')
            if not self._buffer:
                # Woken up by feed_eof() without new data arriving.
                raise asyncio.IncompleteReadError(bytes(data[:pos]), pos + n)

        available = len(self._buffer)
        if available <= n:
            # The buffer holds at most what we still need: take all of it.
            data[pos:pos + available] = self._buffer
            self._buffer.clear()
            pos += available
            n -= available
        else:
            # The buffer holds more than we need: take only n bytes.
            data[pos:pos + n] = self._buffer[:n]
            del self._buffer[:n]
            n = 0
        self._maybe_resume_transport()

    return data

This implementation would have a peak memory footprint of O(n), or more precisely O(n + max(map(len, blocks))); I'm not sure what the other constant factors might be, probably lots more lower in the stack.

Note that the implementation above also does away with some unnecessary copying by duplicating some code from StreamReader.read. Note as well that it has a short path for when the required data is already available in the StreamReader's _buffer.

An obvious issue with the implementation above is that it returns a (mutable) bytearray instead of a bytes object. For my use case I considered this acceptable; I'm not sure whether it would be acceptable in asyncio / the standard library. I'm not aware of a copy-free mechanism to turn a bytearray into an immutable bytes-like object (like Java's ByteBuffer.asReadOnlyBuffer).
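The closest thing I know of is a read-only memoryview over the bytearray (memoryview.toreadonly(), available in Python 3.8+): it is copy-free and rejects writes through the view, but it is still not a bytes object, and the underlying bytearray stays mutable:

```python
frame = bytearray(b'frame-data')
view = memoryview(frame).toreadonly()  # Python 3.8+; no copy is made

print(view.readonly)   # True: writes through the view raise TypeError
frame[0:5] = b'FRAME'  # ...but the underlying bytearray is still mutable
print(bytes(view))     # b'FRAME-data': the view tracks the mutation
```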

Apart from being more memory friendly because there is less copying, it's also a tad faster (around 20% on my machine; see the test script here: https://gist.github.com/frensjan/27061b47e1423dd73eeec9f5cad5f4ad).

An alternative direction could be to add a socket.recv_into-like method which accepts a mutable buffer.
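For comparison, this is what the blocking-socket analogue looks like with socket.recv_into; a StreamReader method with the same contract could avoid all intermediate copies (a sketch of the idea, not an existing asyncio API):

```python
import socket

def recv_exactly_into(sock, buf):
    """Fill `buf` completely from `sock`, reading straight into the
    caller's buffer with no intermediate block list or joins."""
    view = memoryview(buf)  # sliceable without copying
    pos = 0
    while pos < len(buf):
        received = sock.recv_into(view[pos:])
        if received == 0:
            raise EOFError('connection closed after %d of %d bytes'
                           % (pos, len(buf)))
        pos += received
    return buf

# usage with a socketpair
a, b = socket.socketpair()
a.sendall(b'0123456789')
frame = bytearray(10)
recv_exactly_into(b, frame)
```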


I hope the issue is clear. I'm curious to get feedback on the relevance for the broader developer community and on the proposed solution. If desirable, I'll whip up a PR.

Best regards,
Frens Jan
