WIP: Initial implementation of distributed and broadcast #1235

Merged: 7 commits, Sep 25, 2018

32 changes: 32 additions & 0 deletions core/jvm/src/test/scala/fs2/concurrent/BalanceSpec.scala
@@ -0,0 +1,32 @@
package fs2.concurrent

import cats.effect.IO
import fs2._
import TestUtil._

class BalanceSpec extends Fs2Spec {

"Balance" - {

"all elements are processed" in {
forAll { (source: PureStream[Int], concurrent: SmallPositive, chunkSize: SmallPositive) =>
val expected = source.get.compile.toVector.map(_.toLong).sorted

val result =
source.get
.covary[IO]
.balanceThrough(chunkSize = chunkSize.get, maxConcurrent = concurrent.get)(
_.map(_.toLong))
.compile
.toVector
.unsafeRunSync()
.sorted

result shouldBe expected

}
}

}

}
42 changes: 42 additions & 0 deletions core/jvm/src/test/scala/fs2/concurrent/BroadcastSpec.scala
@@ -0,0 +1,42 @@
package fs2.concurrent

import cats.effect.IO
import fs2._
import TestUtil._

class BroadcastSpec extends Fs2Spec {

"Broadcast" - {

"all subscribers see all elements" in {
forAll { (source: PureStream[Int], concurrent: SmallPositive) =>
val expect = source.get.compile.toVector.map(_.toString)

def pipe(idx: Int): Pipe[IO, Int, (Int, String)] =
_.map { i =>
(idx, i.toString)
}

val result =
source.get
.broadcastThrough((0 until concurrent.get).map(idx => pipe(idx)): _*)
.compile
.toVector
.map(_.groupBy(_._1).mapValues(_.map(_._2).toVector))
.unsafeRunSync()

if (expect.nonEmpty) {
result.size shouldBe (concurrent.get)
result.values.foreach { v =>
v shouldBe expect
}
} else {
result.values.size shouldBe 0
}

}
}

}

}
106 changes: 105 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
@@ -5,7 +5,7 @@ import cats.data.{Chain, NonEmptyList}
import cats.effect._
import cats.effect.concurrent._
import cats.implicits.{catsSyntaxEither => _, _}
import fs2.concurrent.{Queue, Signal, SignallingRef}
import fs2.concurrent._
import fs2.internal.FreeC.Result
import fs2.internal._

@@ -117,6 +117,51 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
delays: Stream[F2, FiniteDuration]): Stream[F2, Either[Throwable, O]] =
attempt ++ delays.flatMap(delay => Stream.sleep_(delay) ++ attempt)

/**
* Alias for `this.through(Broadcast(1))`
*/
def broadcast[F2[x] >: F[x]: Concurrent]: Stream[F2, Stream[F2, O]] =
this.through(Broadcast(1))
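
// A minimal usage sketch, assuming an implicit `ContextShift[IO]` is in scope
// so `Concurrent[IO]` is available. Each inner stream is a worker; a worker
// only sees elements published after it subscribes, and `parJoinUnbounded`
// runs the workers together:
//
//   Stream(1, 2, 3)
//     .covary[IO]
//     .broadcast
//     .take(2)
//     .map(worker => worker.map(_.toString))
//     .parJoinUnbounded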

/**
* Like `broadcast`, but instead of providing a stream of worker streams, it runs each copy of this
* stream through one of the supplied sinks.
*
* The sinks are run concurrently with each other, so the number of sinks determines the parallelism.
* Each sink may have a different implementation if required; for example, one sink may process
* elements while another sends elements for processing to another machine.
*
* This also guarantees that each sink sees every `O` pulled from the source stream, unlike
* `broadcast`, where a resulting stream (worker) sees only the elements published after it starts
* its evaluation.
*
* Note that the resulting stream will not emit a single value (Unit), even if the sinks do.
* If you need to emit Unit values from your sinks, consider using `broadcastThrough` instead.
*
* @param sinks Sinks that will concurrently process the work.
*/
def broadcastTo[F2[x] >: F[x]: Concurrent](sinks: Sink[F2, O]*): Stream[F2, Unit] =
this.to(Broadcast.through(sinks.map(_.andThen(_.drain)): _*))

/**
* Variant of `broadcastTo` that takes the desired concurrency and a single sink definition,
* running `maxConcurrent` copies of that sink concurrently.
*/
def broadcastTo[F2[x] >: F[x]: Concurrent](maxConcurrent: Int)(
sink: Sink[F2, O]): Stream[F2, Unit] =
this.broadcastTo[F2]((0 until maxConcurrent).map(_ => sink): _*)
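
// A minimal usage sketch of the single-sink variant, assuming an implicit
// `ContextShift[IO]`: the same printing sink is run twice, concurrently,
// and each copy sees every element:
//
//   Stream(1, 2, 3)
//     .covary[IO]
//     .broadcastTo[IO](2)(_.evalMap(i => IO(println(i))))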

/**
* Alias for `this.through(Broadcast.through(pipes))`.
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] =
this.through(Broadcast.through(pipes: _*))

/**
* Variant of `broadcastThrough` that takes the desired concurrency and a single pipe,
* running `maxConcurrent` copies of that pipe concurrently.
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](maxConcurrent: Int)(
pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
this.broadcastThrough[F2, O2]((0 until maxConcurrent).map(_ => pipe): _*)
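
// A minimal usage sketch, assuming an implicit `ContextShift[IO]`: two
// concurrent copies of the same pipe each transform every element, and
// their outputs are merged into the resulting stream:
//
//   Stream(1, 2, 3)
//     .covary[IO]
//     .broadcastThrough[IO, Int](2)(_.map(_ * 10))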

/**
* Behaves like the identity function, but requests `n` elements at a time from the input.
*
@@ -500,6 +545,65 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
}
.stream

/**
* Alias for `this.through(Balance(Int.MaxValue))`
*/
def balanceAvailable[F2[x] >: F[x]: Concurrent]: Stream[F2, Stream[F2, O]] =
this.through(Balance(Int.MaxValue))

/**
* Alias for `this.through(Balance(chunkSize))`
*/
def balance[F2[x] >: F[x]: Concurrent](chunkSize: Int): Stream[F2, Stream[F2, O]] =
this.through(Balance(chunkSize))

/**
* Like `balance`, but instead of providing a stream of worker streams, it runs each worker
* through one of the supplied sinks.
*
* The sinks are run concurrently with each other, so the number of sinks determines the parallelism.
* Each sink may have a different implementation if required; for example, one sink may process
* elements while another sends elements for processing to another machine.
*
* Note that the resulting stream will not emit a single value (Unit). If you need to emit Unit
* values from your sinks, consider using `balanceThrough` instead.
*
* @param chunkSize Max size of output chunk for each stream supplied to a sink
* @param sinks Sinks that will concurrently process the work.
*/
def balanceTo[F2[x] >: F[x]: Concurrent](chunkSize: Int)(sinks: Sink[F2, O]*): Stream[F2, Unit] =
balanceThrough[F2, Unit](chunkSize)(sinks.map(_.andThen(_.drain)): _*)

/**
* Variant of `balanceTo` that takes the desired concurrency and a single sink.
* @param chunkSize Max size of output chunk for each stream supplied to the sink
* @param maxConcurrent Maximum number of sinks to run concurrently
* @param sink Sink to use to process elements
*/
def balanceTo[F2[x] >: F[x]: Concurrent](chunkSize: Int, maxConcurrent: Int)(
sink: Sink[F2, O]): Stream[F2, Unit] =
this.balanceThrough[F2, Unit](chunkSize, maxConcurrent)(sink.andThen(_.drain))
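
// A minimal usage sketch, assuming an implicit `ContextShift[IO]`: elements
// are distributed, at most 4 at a time, across two concurrent copies of the
// printing sink; each element is seen by exactly one copy:
//
//   Stream.range(0, 100)
//     .covary[IO]
//     .balanceTo[IO](chunkSize = 4, maxConcurrent = 2)(_.evalMap(i => IO(println(i))))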

/**
* Alias for `this.through(Balance.through(chunkSize)(pipes))`.
*/
def balanceThrough[F2[x] >: F[x]: Concurrent, O2](chunkSize: Int)(
pipes: Pipe[F2, O, O2]*): Stream[F2, O2] =
this.through(Balance.through[F2, O, O2](chunkSize)(pipes: _*))

/**
* Variant of `balanceThrough` that takes the desired concurrency and a single pipe.
* @param chunkSize Max size of output chunk for each stream supplied to the pipe
* @param maxConcurrent Maximum number of pipes to run concurrently
* @param pipe Pipe to use to process elements
*/
def balanceThrough[F2[x] >: F[x]: Concurrent, O2](chunkSize: Int, maxConcurrent: Int)(
pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
this.balanceThrough[F2, O2](chunkSize)((0 until maxConcurrent).map(_ => pipe): _*)
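
// A minimal usage sketch, assuming an implicit `ContextShift[IO]`: the
// conversion work is split, 10 elements at a time, across three concurrent
// copies of the same pipe, and the results are merged:
//
//   Stream.range(0, 100)
//     .covary[IO]
//     .balanceThrough[IO, String](chunkSize = 10, maxConcurrent = 3)(_.map(_.toString))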

/**
* Removes all output values from this stream.
*
126 changes: 126 additions & 0 deletions core/shared/src/main/scala/fs2/concurrent/Balance.scala
@@ -0,0 +1,126 @@
package fs2.concurrent

import cats.effect.Concurrent
import fs2._

object Balance {

/**
* Balances the processing of this stream across multiple, concurrently running streams.
*
* This can be viewed as a stream "fan-out" operation, allowing incoming `O`s to be processed
* in parallel.
*
* As elements arrive, they are distributed among the streams that have already started their
* evaluation. To control the fairness of the balancing, the `chunkSize` parameter is available;
* it sets the maximum number of elements pulled at once by a single stream.
*
* Note that this pulls only as many `O`s as are needed to satisfy the streams currently being
* evaluated. When no streams are awaiting elements, no more elements are pulled from the source.
*
* If high throughput is required, `balance` may be combined with `prefetch` to initially prefetch
* large chunks that will then be available for immediate distribution to the streams. For example:
* {{{
*   source.prefetch(100).balance(chunkSize=10).take(10)
* }}}
* This constructs a stream of 10 worker streams; the prefetch always pulls 100 elements ahead,
* and 10 of them are handed to each requesting worker. While the workers process their elements,
* another 100 elements are pulled and again made available for distribution, 10 at a time.
*
* Usually this combinator is used together with `parJoin`, such as:
*
* {{{
*   Stream(1,2,3,4).covary[IO].balance(chunkSize = 1).map { worker =>
*     worker.map(_.toString)
*   }.take(3).parJoinUnbounded.compile.toVector.unsafeRunSync.toSet
* }}}
*
* When `source` terminates, the resulting worker streams terminate once all elements pulled
* from `source` so far have been processed.
*
* The resulting outer stream does not terminate when `source` does; however, when the resulting
* stream is terminated by its consumer, `source` is terminated as well.
*/
def apply[F[_]: Concurrent, O](chunkSize: Int): Pipe[F, O, Stream[F, O]] = { source =>
Stream.eval(PubSub(PubSub.Strategy.closeDrainFirst(strategy[O]))).flatMap { pubSub =>
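// each worker repeatedly pulls a chunk of up to `chunkSize` elements,
// terminating once the source has signalled completion with `None`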
def subscriber =
Stream
.repeatEval(pubSub.get(chunkSize))
.unNoneTerminate
.flatMap(Stream.chunk)

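// publishes the source's chunks; when the source finishes, publishes `None`
// so that all workers eventually terminate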
def push =
source.chunks
.evalMap(chunk => pubSub.publish(Some(chunk)))
.onFinalize(pubSub.publish(None))

Stream.constant(subscriber).concurrently(push)
}
}

/**
* Like `apply`, but instead of providing a stream of worker streams, it runs each worker through
* one of the supplied pipes.
*
* The pipes are run concurrently with each other, so the number of pipes determines the parallelism.
* Each pipe may have a different implementation if required; for example, one pipe may process
* elements while another sends elements for processing to another machine.
*
* The results of the pipes are collected and emitted as the resulting stream.
*
* This will terminate when:
*
* - the source terminates
* - any pipe fails
* - all pipes terminate
*
* @param pipes Pipes used to process the work; they will be evaluated concurrently with each other
* @param chunkSize The maximum chunk size presented to each pipe, allowing fair distribution of
* the work between the pipes.
*/
def through[F[_]: Concurrent, O, O2](chunkSize: Int)(pipes: Pipe[F, O, O2]*): Pipe[F, O, O2] =
_.balance(chunkSize)
.take(pipes.size)
.zipWith(Stream.emits(pipes)) { case (stream, pipe) => stream.through(pipe) }
.parJoinUnbounded
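
// A minimal usage sketch, assuming an implicit `ContextShift[IO]`: two
// differently labelled pipes split the work between them, at most 5
// elements at a time:
//
//   Stream.range(0, 20)
//     .covary[IO]
//     .through(Balance.through[IO, Int, String](5)(
//       _.map(i => s"a:$i"),
//       _.map(i => s"b:$i")))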

private def strategy[O]: PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] =
new PubSub.Strategy[Chunk[O], Chunk[O], Option[Chunk[O]], Int] {
def initial: Option[Chunk[O]] =
Some(Chunk.empty) // blocks the first publish; all subsequently queued chunks are guaranteed non-empty.

def accepts(i: Chunk[O], queueState: Option[Chunk[O]]): Boolean =
queueState.isEmpty

def publish(i: Chunk[O], queueState: Option[Chunk[O]]): Option[Chunk[O]] =
Some(i).filter(_.nonEmpty)

def get(selector: Int, queueState: Option[Chunk[O]]): (Option[Chunk[O]], Option[Chunk[O]]) =
queueState match {
case None => (None, None)
case Some(chunk) =>
if (chunk.isEmpty)
(None, None) // the initial empty chunk: clear the state so the first publish can proceed; this subscriber waits
else {
val (head, keep) = chunk.splitAt(selector)
if (keep.isEmpty) (None, Some(head))
else (Some(keep), Some(head))
}

}

def empty(queueState: Option[Chunk[O]]): Boolean =
queueState.isEmpty

def subscribe(selector: Int, queueState: Option[Chunk[O]]): (Option[Chunk[O]], Boolean) =
(queueState, false)

def unsubscribe(selector: Int, queueState: Option[Chunk[O]]): Option[Chunk[O]] =
queueState
}
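
// A sketch of the state transitions this strategy goes through
// (state is Option[Chunk[O]]; `n` is the subscriber's selector, i.e. its max chunk size):
//
//   initial                          -> Some(Chunk.empty)   (blocks the first publish)
//   get(n, Some(Chunk.empty))        -> (None, None)        (clears the state, unblocking the
//                                                            publisher; the subscriber waits)
//   publish(c, None)                 -> Some(c)             (c is non-empty)
//   get(n, Some(c)) when c.size <= n -> (None, Some(c))     (the whole chunk goes to one subscriber)
//   get(n, Some(c)) when c.size > n  -> (Some(rest), Some(first n))  (fair split; the rest stays queued)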

}