
aiofile LineReader does a read for every line in spite of having multiple lines in CHUNK_SIZE #80

Open
fredgido opened this issue Mar 26, 2023 · 2 comments


@fredgido

fredgido commented Mar 26, 2023

Long story short

LineReader is very slow

Expected behavior

LineReader is as fast as normal line reading.
The read chunk size actually prevents extra reads: one read should serve every line contained in that chunk.

Actual behavior

LineReader is slow, taking several milliseconds per line.
LineReader issues a read for each line.
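For reference, the buffering strategy the chunk size implies can be sketched in a few lines of plain Python. This is not aiofile's actual code; the `read_chunk` callable is a hypothetical stand-in for the file API. The idea: read one chunk, then serve every complete line out of the in-memory buffer before touching the file again.

```python
def buffered_lines(read_chunk, chunk_size=4096, sep="\n"):
    """Yield lines from a chunked `read_chunk(n)` callable, issuing one
    read per chunk instead of one read per line."""
    buf = ""
    while True:
        chunk = read_chunk(chunk_size)
        if not chunk:
            if buf:
                yield buf  # final line without a trailing separator
            return
        buf += chunk
        # Split out every complete line; keep the unterminated tail.
        *lines, buf = buf.split(sep)
        for line in lines:
            yield line + sep
```

With this shape, the number of reads is proportional to the file size divided by `chunk_size`, independent of how many lines each chunk holds.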

Steps to reproduce

import asyncio
import functools
import time

import aiofile


def print_on_call_decorator(func):
    @functools.wraps(func)
    def wrapper_decorator(*args, **kwargs):
        print("real read called")
        value = func(*args, **kwargs)
        return value

    return wrapper_decorator


aiofile.AIOFile.read_bytes = print_on_call_decorator(aiofile.AIOFile.read_bytes)


async def main():
    async with aiofile.AIOFile("test_line_iter_file", "r") as f:
        last_line_time = time.perf_counter()
        async for line in aiofile.LineReader(f, chunk_size=aiofile.LineReader.CHUNK_SIZE * 16*16):
            # print("line_time", time.perf_counter() - last_line_time)
            last_line_time = time.perf_counter()
            # print(line, end="")


if __name__ == "__main__":
    open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(1000000)))
    asyncio.run(main())

Additional info

Sync version to compare:

import time

open("test_line_iter_file", "w").write("\n".join(str(i) for i in range(100000)))
start = time.perf_counter()
with open("test_line_iter_file", "r", buffering=4192 * 16) as f:
    last_line_time = time.perf_counter()
    for line in f:
        # print("line_time", time.perf_counter() - last_line_time)
        last_line_time = time.perf_counter()
        # print(line, end="")

print("end_time", time.perf_counter() - start)
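As an aside, the effect of the `buffering` argument in the sync version can be made visible with a small counting wrapper around a raw stream (stdlib only, nothing aiofile-specific): Python's `io.BufferedReader` issues one raw read per buffer fill, not one per line.

```python
import io

class CountingRaw(io.RawIOBase):
    """Raw byte source that counts how often the buffered layer
    actually asks it for data."""
    def __init__(self, data: bytes):
        self._src = io.BytesIO(data)
        self.reads = 0

    def readable(self):
        return True

    def readinto(self, b):
        self.reads += 1
        chunk = self._src.read(len(b))
        b[: len(chunk)] = chunk
        return len(chunk)

data = "\n".join(str(i) for i in range(1000)).encode()
raw = CountingRaw(data)
# Iterating a BufferedReader yields lines, but raw reads happen
# only once per buffer fill.
lines = list(io.BufferedReader(raw, buffer_size=64 * 1024))
```

This is the behavior the issue expects from LineReader: a thousand lines back, a handful of raw reads down.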

My temporary workaround below (it only handles the newline conventions that Python's file `__iter__` accepts) is only about twice as slow as the sync version:

import asyncio
import collections.abc
import io
from typing import Self, Union

import aiofile


class CustomLineReader(collections.abc.AsyncIterable):
    CHUNK_SIZE = 4192

    def __init__(
        self,
        aio_file: aiofile.AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n",
    ):
        self.__reader = aiofile.Reader(aio_file, chunk_size=chunk_size, offset=offset)

        self._buffer = None

        self.linesep = aio_file.encode_bytes(line_sep) if aio_file.mode.binary else line_sep

        self.chunk_iterator = None
        self.last_read = None

    async def setup_buffer(self, buffer_initialization=None):
        chunk = await self.__reader.read_chunk()
        if not chunk:
            raise StopAsyncIteration(chunk)

        if self._buffer:
            self._buffer.close()
            del self._buffer
        self._buffer = io.BytesIO() if self.__reader.file.mode.binary else io.StringIO()
        if buffer_initialization:
            self._buffer.write(buffer_initialization)

        self._buffer.write(chunk)
        self._buffer.seek(0)

        self.chunk_iterator = self._buffer.__iter__()

    async def __anext__(self) -> Union[bytes, str]:
        if not self._buffer:
            await self.setup_buffer()
        try:
            self.last_read = next(self.chunk_iterator)
            if not self.last_read.endswith(self.linesep):
                # The chunk ended mid-line: refill, seeding the new
                # buffer with the partial tail so the line completes.
                await self.setup_buffer(self.last_read)
                self.last_read = next(self.chunk_iterator)
        except StopIteration:
            # The buffer was exhausted on a complete, already-yielded
            # line, so start a fresh buffer without re-seeding it.
            await self.setup_buffer()
            self.last_read = next(self.chunk_iterator)
        return self.last_read


    def __aiter__(self) -> Self:
        return self
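An alternative to the class above: the same buffer-then-split idea fits naturally into an async generator. This is a sketch against a hypothetical awaitable `read_chunk(n)` callable, not aiofile's actual `Reader` API:

```python
import asyncio

async def async_line_reader(read_chunk, chunk_size=4096, sep="\n"):
    """Async generator yielding lines while awaiting one chunk-sized
    read per chunk, not one read per line."""
    buf = ""
    while True:
        chunk = await read_chunk(chunk_size)
        if not chunk:
            if buf:
                yield buf  # final line without a trailing separator
            return
        buf += chunk
        # Keep the unterminated tail in the buffer for the next chunk.
        *lines, buf = buf.split(sep)
        for line in lines:
            yield line + sep
```

Wiring `read_chunk` to an aiofile reader would presumably give LineReader-like iteration with one awaited read per chunk.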
@mosquito
Owner

mosquito commented Jun 5, 2023

Quick fix:

from aiofile.utils import LineReader

# 1 megabyte chunks
LineReader.CHUNK_SIZE = 2 ** 20

A complete fix needs some time, or help is wanted.

@fredgido
Author

Pretty sure this still does a read of `chunk_size` bytes for each line anyway, so increasing it only makes it slower.
