Broadcast. High CPU/Memory usage #2178

@iRevive

Using Stream#broadcastTo with a high-message-rate source leads to high CPU usage.
An equivalent Akka Streams graph uses 2-3x less CPU.

I tried several JVMs: OpenJDK 11, OpenJDK 14, and OpenJDK 14 with OpenJ9. CPU usage is roughly the same on each of them.

Prerequisites

JVM: OpenJDK 64-Bit Server VM (11.0.4+11, mixed mode)
fs2: 2.4.6
akka-streams: 2.6.6
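
The issue does not say how the CPU percentages in the scenarios were measured. As a rough way to reproduce comparable numbers, here is a minimal sketch that samples the process CPU load from inside the JVM. The `CpuSampler` object and its `sample` method are hypothetical names introduced for illustration. The sketch assumes the JVM exposes `com.sun.management.OperatingSystemMXBean`, which HotSpot-based OpenJDK builds do; other JVMs may not.

```scala
import java.lang.management.ManagementFactory
import com.sun.management.OperatingSystemMXBean

// Hypothetical helper, not part of fs2 or Akka.
object CpuSampler {
  // Assumption: the platform bean implements the com.sun.management extension.
  // The cast throws ClassCastException on JVMs that do not provide it.
  private val osBean: OperatingSystemMXBean =
    ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]

  /** Takes `samples` readings of process CPU load (in percent), `intervalMs` apart.
    * Readings the JVM reports as unavailable (negative values) are dropped.
    */
  def sample(samples: Int, intervalMs: Long): Vector[Double] =
    Vector
      .fill(samples) {
        Thread.sleep(intervalMs)
        osBean.getProcessCpuLoad * 100.0
      }
      .filter(_ >= 0.0)

  def main(args: Array[String]): Unit = {
    val readings = sample(samples = 10, intervalMs = 1000L)
    if (readings.nonEmpty)
      println(f"avg process CPU: ${readings.sum / readings.size}%.1f%%")
    else
      println("process CPU load is not available on this JVM")
  }
}
```

To measure one of the scenarios, `CpuSampler.sample` would need to run on a separate thread in the same JVM as the stream. The value is the load across all cores, so it may not match what an external monitoring tool shows.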

Scenario 1. fs2. map. ~15% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())

    // A Pipe is applied with `through`; `map` expects an Int => B function.
    source.covary[IO].through(discard).compile.drain.as(ExitCode.Success)
  }
}

Scenario 2. fs2. Broadcast to 1 pipe. ~28% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())

    source.broadcastTo(discard).compile.drain.as(ExitCode.Success)
  }
}

Scenario 3. fs2. Broadcast to 100 pipes. 50-80% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())
    val pipes: List[Pipe[IO, Int, Unit]] = List.fill(100)(discard)

    source.broadcastTo(pipes: _*).compile.drain.as(ExitCode.Success)
  }
}

Scenario 4. Akka. Broadcast to 100 pipes. ~28% CPU usage

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}

object AkkaCPUTest {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem()
    implicit val mat: Materializer = Materializer(actorSystem)

    val source: Source[Int, NotUsed] = Source(Range(1, Int.MaxValue))
    val discard: Flow[Int, Unit, NotUsed] = Flow[Int].map(_ => ())
    val pipes: List[Flow[Int, Unit, NotUsed]] = List.fill(100)(discard)
    val sinks = pipes.map(_.toMat(Sink.ignore)(Keep.right))

    val broadcastGraph = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Int](pipes.size))

      source ~> broadcast
      sinks.foreach(sink => broadcast ~> sink)

      ClosedShape
    }

    val _ = RunnableGraph.fromGraph(broadcastGraph).run()
  }

}

Related issues:
#1406
#1469
