ZStream.groupedWithin doesn't groups batch when got exact items count #3860

Closed
fillson-shady opened this issue Jun 23, 2020 · 2 comments · Fixed by #3885
Labels
bug Something isn't working stream ZIO Stream

Comments

@fillson-shady

Hello!

I've run into strange behaviour in ZStream.groupedWithin that can significantly affect the performance of an application that passes fixed-size batches of messages. Here is a reproducer (the behaviour occurs on RC-20, RC-21 and SNAPSHOT):

import zio._
import zio.duration._
import zio.stream.ZStream

object TestApp extends App {

  def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
    for {
      queue <- Queue.unbounded[Int]
      stream = ZStream.fromQueue(queue)
      _ <- stream
        .tap(i => ZIO.effectTotal(println(java.time.LocalDateTime.now() + " got: " + i)))
        .groupedWithin(4, 10.seconds)
        .mapM(batch =>
          ZIO
            .effectTotal(
              println(java.time.LocalDateTime.now() + s" got batch [${batch.size}]: " + batch.mkString("[", ", ", "]"))
            )
            .as(true)
        )
        .runDrain
        .fork

      _ <- ZIO.foreach(1 to 4)(i => queue.offer(i))
      _ <- ZIO.sleep(15.second)

    } yield ExitCode.success
}

It produces the following output:

2020-06-23T21:57:59.711925 got: 1
2020-06-23T21:57:59.866961 got: 2
2020-06-23T21:58:00.107421 got: 3
2020-06-23T21:58:00.188661 got: 4
2020-06-23T21:58:10.422722 got batch [4]: [1, 2, 3, 4]

As you can see, a full batch of 4 elements is available about half a second after the start, but the batch is emitted only at the end of the group window (after 10 seconds).

Is this a bug or expected behaviour?

Thank you!

@iravid
Member

iravid commented Jun 26, 2020

Definitely a bug. I just ran into that too.

@iravid iravid added bug Something isn't working stream ZIO Stream labels Jun 26, 2020
@iravid
Member

iravid commented Jun 26, 2020

@fillson-shady The problem is that the transducer used in groupedWithin, ZTransducer.collectAllN, waits for the (N+1)th element before emitting. As a workaround, you could try modifying the batch size. In real-world situations, though, the number of elements on the stream is most likely variable, so you might not hit this as often as you'd think.

Either way - will fix the problem with collectAllN.
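
To illustrate the difference between waiting for the (N+1)th element and emitting as soon as the Nth arrives, here is a minimal sketch in plain Scala. It is only a model of the behaviour described above, not the actual ZTransducer.collectAllN code: the names CollectModel, Step, lazyStep, eagerStep and run are all hypothetical and exist only for this example.

```scala
object CollectModel {
  // Hypothetical step type: takes the current buffer and one input,
  // returns the next buffer and, optionally, a batch to emit.
  type Step[A] = (Vector[A], A) => (Vector[A], Option[Vector[A]])

  // Models the reported behaviour: a full buffer is emitted only when
  // the NEXT element arrives, so exactly n elements emit nothing.
  def lazyStep[A](n: Int): Step[A] = (buffer, in) =>
    if (buffer.size == n) (Vector(in), Some(buffer))
    else (buffer :+ in, None)

  // Models the expected behaviour: the batch is emitted as soon as
  // the n-th element arrives.
  def eagerStep[A](n: Int): Step[A] = (buffer, in) => {
    val next = buffer :+ in
    if (next.size == n) (Vector.empty, Some(next))
    else (next, None)
  }

  // Feeds all inputs through a step function and returns
  // (batches emitted so far, elements still buffered).
  def run[A](step: Step[A])(inputs: Seq[A]): (Vector[Vector[A]], Vector[A]) =
    inputs.foldLeft((Vector.empty[Vector[A]], Vector.empty[A])) {
      case ((emitted, buffer), in) =>
        val (nextBuffer, out) = step(buffer, in)
        (emitted ++ out.toList, nextBuffer)
    }
}
```

With a batch size of 4 and the inputs 1 to 4, `CollectModel.run(CollectModel.lazyStep[Int](4))(1 to 4)` emits no batch and leaves all four elements buffered, which matches the reproducer, where only the 10-second window flushes them. `CollectModel.run(CollectModel.eagerStep[Int](4))(1 to 4)` emits `Vector(1, 2, 3, 4)` immediately and leaves the buffer empty.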
