
Fix 'fs2.io.readInputStreamGeneric' overallocation of underlying buffers #3318

Merged

Conversation

@seigert (Contributor) commented Oct 11, 2023

The current implementation of fs2.io.readInputStream allocates a new array of chunkSize bytes for every invocation of InputStream#read(..). The problem is that the read spec does not require InputStream implementations to fill the provided buffer fully or until stream exhaustion.

This leads to a situation where, if the readInputStream chunk size is big (megabytes) and the underlying input stream's inner 'chunks' are small (bytes or kilobytes), the returned Stream[F, Byte] is very 'sparse': in every chunk only a small fraction of the allocated capacity is actually used.

This can be demonstrated easily by combining fs2.io.toInputStream with fs2.io.readInputStream.
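For illustration, a minimal sketch (not part of the PR; the object name and exact sizes are assumptions) of how the sparsity can be observed by round-tripping small chunks through toInputStream and reading them back with a large chunkSize:

```scala
import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}

// Hypothetical demo, not from the PR: emit 1 KiB chunks, expose them as an
// InputStream, then read them back with a 1 MiB chunkSize and inspect how
// many bytes each resulting chunk actually holds.
object SparseChunksDemo extends IOApp.Simple {
  val run: IO[Unit] =
    Stream
      .emits(List.fill(1024)(Chunk.array(new Array[Byte](1024)))) // 1 KiB inner chunks
      .covary[IO]
      .flatMap(Stream.chunk)
      .through(fs2.io.toInputStream)
      .flatMap { is =>
        fs2.io.readInputStream(IO.pure(is), chunkSize = 1024 * 1024, closeAfterUse = false)
      }
      .chunks
      .map(_.size)
      .compile
      .toList
      .flatMap(sizes => IO.println(s"sizes of the first few chunks: ${sizes.take(5)}"))
}
```

Each read sees at most one upstream 1 KiB chunk, so (before this fix) every ~1 KiB chunk in the output is backed by a freshly allocated 1 MiB array.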

This PR fixes this by reusing the leftover portion of the allocated Array[Byte] for subsequent InputStream reads until either the buffer is fully written or the stream is exhausted. As in the current implementation, fs2.Stream chunks are published with each successful InputStream#read invocation, but they may share the underlying array buffer.
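A rough sketch of the idea (simplified; the name readReusingBuffer and the Pull structure are illustrative assumptions, not the actual readInputStreamGeneric internals):

```scala
import java.io.InputStream
import cats.effect.Sync
import fs2.{Chunk, Pull, Stream}

// Sketch only: keep reading into the unused tail of the same buffer,
// emitting a slice per successful read; allocate a fresh buffer only
// once the current one is completely filled.
def readReusingBuffer[F[_]](is: InputStream, chunkSize: Int)(implicit F: Sync[F]): Stream[F, Byte] = {
  def loop(buf: Array[Byte], offset: Int): Pull[F, Byte, Unit] =
    Pull.eval(F.blocking(is.read(buf, offset, buf.length - offset))).flatMap {
      case -1 => Pull.done // stream exhausted
      case n =>
        // Emit only the bytes just read; the chunk shares `buf` as its backing array.
        Pull.output(Chunk.array(buf, offset, n)) >> {
          val next = offset + n
          if (next == buf.length) loop(new Array[Byte](chunkSize), 0) // buffer full: new buffer
          else loop(buf, next) // reuse the leftover capacity
        }
    }

  loop(new Array[Byte](chunkSize), 0).stream
}
```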

@armanbilge (Member) left a comment

Thanks for the PR! Unfortunately I do not think this is the right way to solve this problem. The concern is: what if there is actually not enough data available on the InputStream at a particular moment, but it is not yet closed? E.g. consider interacting with an external process over stdin/stdout.

Have you seen this issue?

I proposed a couple different ideas to solve it in #3106 (comment).

Ah, I'm sorry, I missed your edit. This is interesting :)

> Like current implementation fs2.Stream chunks are published with each successful InputStream#read invocation but may share underlying array buffer.

@armanbilge (Member) commented Oct 11, 2023

> chunks ... may share underlying array buffer.

I guess my only concern with this approach is that a single Chunk that is retained for a long time may still have a surprisingly large memory footprint due to a large backing array. I'm not sure how realistic this is in practice :)

To avoid that would require allocating an appropriately sized array and copying into that, as suggested in #3106 (comment).
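To make the concern concrete, a tiny illustration (not from the thread; variable names are assumptions) of how a small slice can pin a large backing array, and how copying via Chunk#compact avoids it:

```scala
import fs2.Chunk

// Hypothetical illustration: a 16-byte chunk that shares a 1 MiB backing array.
val big: Array[Byte] = new Array[Byte](1024 * 1024)
val slice: Chunk[Byte] = Chunk.array(big, 0, 16)

// `compact` copies the 16 bytes into a right-sized array, so retaining
// `trimmed` no longer keeps the 1 MiB array alive (assuming nothing else
// still references `big`).
val trimmed: Chunk[Byte] = slice.compact
```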

@seigert (Contributor, Author) commented Oct 12, 2023

@armanbilge, thanks for your input!

The issue I'm trying to resolve is that we have network data on the order of megabytes and, naturally, we assume that a buffer of 1 MiB would be good enough. The only problem is that the underlying InputStream#read decides to write data in 1 KiB chunks. Suddenly, for every mere megabyte of data we have a whopping 1 GiB of allocated RAM (1024 reads, each allocating a fresh 1 MiB buffer) that is only 0.1% populated.

What's worse, it is really hard to diagnose: at first everything works, maybe using more memory than expected (it's the JVM after all); then suddenly there is an OOM error; then you spend some time looking for a leak, except there are no leaks; and only finally do you find that memory is full of fs2.Chunk.ArraySlice instances of 1 MiB each with only the first kilobyte written.

To fix this currently we need to put a custom Pull or something like .chunkN(chunkSize).map(_.compact).unchunks after every readInputStream. :(
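For reference, that workaround could be packaged as a reusable pipe (a sketch; the name recompact is assumed):

```scala
import cats.effect.IO
import fs2.Pipe

// Hypothetical helper: regroup into chunkSize-sized chunks and copy each
// into a right-sized backing array, dropping references to the oversized buffers.
def recompact(chunkSize: Int): Pipe[IO, Byte, Byte] =
  _.chunkN(chunkSize).map(_.compact).unchunks
```

It would be used as `fs2.io.readInputStream(...).through(recompact(chunkSize))`.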

> I guess my only concern with this approach is that a single Chunk that is retained for a long time may still have a surprisingly large memory footprint due to a large backing array. I'm not sure how realistic this is in practice :)
>
> To avoid that would require allocating an appropriately sized array and copying into that, as suggested in #3106 (comment).

I think the situation of 'short data streams' is, if not less frequent, then at least much more detectable than the issue above.

To replicate the issue above you would need 1024 unconsumed streams of 1 KiB each, and I would say that it is more a question of wrongly guessing the median data size.

Also, the use of a copy introduces some questions:

  • What should we do in unsafeReadInputStream? Copy, write into the buffer from index zero each time, or write in a circular pattern?
  • At what ratio should we copy? 1:1000, 1:10, 1:2?
  • With every copy we introduce an additional allocation and pointer indirection. Yes, currently we allocate a new ArraySlice too, but at least the backing array is the same. Maybe we could even optimise Chunk's ++ to detect concatenation of two adjacent slices with a common backing array (maybe .compact already does that?); a rough sketch of such a check follows below. That way unchunkN, unchunkMin, etc. won't add another nesting level.
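A rough sketch (assumption: no such optimisation exists in fs2 today; the helper name is illustrative) of detecting two adjacent slices over the same backing array and merging them without copying:

```scala
import fs2.Chunk
import fs2.Chunk.ArraySlice

// Hypothetical helper: if `a` and `b` are contiguous slices of the same
// backing array, widen the slice instead of copying; otherwise fall back
// to ordinary concatenation.
def concatAdjacent(a: ArraySlice[Byte], b: ArraySlice[Byte]): Chunk[Byte] =
  if ((a.values eq b.values) && (a.offset + a.length == b.offset))
    Chunk.array(a.values, a.offset, a.length + b.length)
  else
    (a: Chunk[Byte]) ++ b
```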

I would argue that this implementation tries its best to translate the user's intention: 'write data from the stream into contiguous memory regions of chunkSize bytes'.

@seigert seigert force-pushed the fix/io-read_input_stream-overallocation branch from 96e2787 to aea8f85 on October 12, 2023 10:03
@seigert seigert force-pushed the fix/io-read_input_stream-overallocation branch from ae7ced1 to 71bd067 on October 20, 2023 14:18
@seigert seigert force-pushed the fix/io-read_input_stream-overallocation branch from 71bd067 to 4e10816 on October 20, 2023 15:19
@mpilquist (Member) commented

I like this approach and I'm curious if we should do something similar for TCP sockets.
