Enumeratee memory leak #554

Open
anovstrup opened this Issue Oct 3, 2013 · 3 comments


@anovstrup
Contributor

I've run into an apparent memory leak when consuming a large data stream in which I need to zip the chunks with their indices and group them into larger units for processing.

The following code exhibits the behavior, resulting in an OutOfMemoryError:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// Define a stream of n zero-filled arrays of size sz
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Byte], IO](
    Iterator.continually(Array.fill(sz)(0.toByte)).take(n))

// Define a stream of 2^16 64-KiB arrays;
// The size of the stream may need to be higher 
// depending on your heap space.
def e = enumArrs(1 << 16, 1 << 16)

// Fold over the zipped/grouped stream, computing the 
// total length of all the arrays.
(Iteratee.fold[Vector[(Array[Byte], Long)], IO, Long](0) {
  _ + _.map(_._1.length).sum
} &= (e.zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO

The error does not arise if I only apply one of the enumeratees (i.e., zipWithIndex or group, but not both), even when I dramatically increase the number of elements in the stream.
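For concreteness, the single-enumeratee variants look like this (my paraphrase, reusing e and the imports above):

// Only zipWithIndex: fold over the indexed chunks directly.
(Iteratee.fold[(Array[Byte], Long), IO, Long](0) {
  _ + _._1.length
} &= e.zipWithIndex).run.unsafePerformIO

// Only group: fold over groups of un-indexed chunks.
(Iteratee.fold[Vector[Array[Byte]], IO, Long](0) {
  _ + _.map(_.length).sum
} &= (e mapE Iteratee.group(4))).run.unsafePerformIO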

I also posted this as a question on Stack Overflow yesterday, in case the problem is in my own code. I'm fairly certain it should run in constant heap space, though.
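
For comparison, the analogous pipeline over plain scala.collection iterators does run in constant heap space, which is the behavior I'd expect from the iteratee version:

// Plain-Iterator equivalent: zip chunks with indices, group by 4,
// and sum the array lengths. This runs in constant heap space.
Iterator.continually(Array.fill(1 << 16)(0.toByte))
  .take(1 << 16)
  .zipWithIndex
  .grouped(4)
  .foldLeft(0L)((acc, grp) => acc + grp.map(_._1.length).sum)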

@anovstrup
Contributor

I discovered today that tweaking the code to run in the Id monad changes the failure mode: instead of an OutOfMemoryError, it fails with a StackOverflowError:

import scalaz.iteratee._, scalaz.std.stream._, scalaz.Id._

// Define a stream of n zero-filled arrays of size sz
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumStream[Array[Byte], Id](
    Stream.continually(Array.fill(sz)(0.toByte)).take(n))

// Define a stream of 2^16 64-KiB arrays;
// The size of the stream may need to be higher 
// depending on your heap space.
def e = enumArrs(1 << 16, 1 << 16)

// Fold over the zipped/grouped stream, computing the 
// total length of all the arrays.
(Iteratee.fold[Stream[(Array[Byte], Long)], Id, Long](0) {
  _ + _.map(_._1.length).sum
} &= (e.zipWithIndex mapE Iteratee.group(4))).run
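
(An aside: the usual advice for stack overflows in an Id-based run is to suspend the pipeline in Trampoline instead of Id. A sketch, assuming a Monad[Trampoline] instance is available, which I believe requires the Functor[Function0] from scalaz.std.function._ in this scalaz version; note that trampolining only trades stack for heap, so it may merely resurface the leak as an OutOfMemoryError:)

import scalaz.Free.Trampoline, scalaz.std.function._

// Same pipeline, suspended in Trampoline rather than run directly in Id.
def eT = Iteratee.enumStream[Array[Byte], Trampoline](
  Stream.continually(Array.fill(1 << 16)(0.toByte)).take(1 << 16))

(Iteratee.fold[Stream[(Array[Byte], Long)], Trampoline, Long](0) {
  _ + _.map(_._1.length).sum
} &= (eT.zipWithIndex mapE Iteratee.group(4))).run.run
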
@anovstrup
Contributor

If anyone saw the comment I just deleted, please ignore it. I had inadvertently tested in an environment with more heap space; when I went back to the original environment, the leak was still evident. :(

@anovstrup
Contributor

I have to abandon the search for now, but I created this gist to record my failed attempts at a solution:

https://gist.github.com/anovstrup/6894978

Hopefully this helps with diagnosis...

There's a combined grouping/folding method there (groupAndFold2) that works as a reasonable workaround.
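
For anyone curious, the general shape of the workaround is below. This is only a sketch of the idea, not the code from the gist (groupAndFold2 there differs in its details): it buffers up to n elements, folds each full group into the accumulator immediately, and flushes any partial group at EOF, so no enumeratee composition is involved.

import scalaz._, scalaz.iteratee._, Iteratee._

// Hypothetical sketch of a combined group-and-fold iteratee.
def groupAndFold[E, F[_]: Monad, A](n: Int, init: A)(
    f: (A, Vector[E]) => A): IterateeT[E, F, A] = {
  def step(acc: A, buf: Vector[E])(in: Input[E]): IterateeT[E, F, A] =
    in.fold(
      empty = cont(step(acc, buf)),
      el = e => {
        // Buffer the element; fold the group into acc once it is full.
        val buf2 = buf :+ e
        if (buf2.length >= n) cont(step(f(acc, buf2), Vector.empty))
        else cont(step(acc, buf2))
      },
      // Flush any partial group at end of stream.
      eof = done(if (buf.isEmpty) acc else f(acc, buf), in))
  cont(step(init, Vector.empty))
}

// Usage, with e and the imports from my first comment:
(groupAndFold[(Array[Byte], Long), IO, Long](4, 0L) {
  _ + _.map(_._1.length).sum
} &= e.zipWithIndex).run.unsafePerformIO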

@larsrh larsrh modified the milestone: 7.2, 7.1 Jun 28, 2014
@xuwei-k xuwei-k removed this from the 7.2 milestone Nov 27, 2015