Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove balance (and PubSub) #2340

Merged
merged 1 commit into from Mar 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 0 additions & 78 deletions core/shared/src/main/scala/fs2/Stream.scala
Expand Up @@ -740,84 +740,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
.stream

/** Like [[balance]] but uses an unlimited chunk size.
*
* Alias for `through(Balance(Int.MaxValue))`.
*/
def balanceAvailable[F2[x] >: F[x]: Concurrent]: Stream[F2, Stream[F2, O]] =
through(Balance[F2, O](Int.MaxValue))

/** Returns a stream of streams where each inner stream sees an even portion of the
* elements of the source stream relative to the number of inner streams taken from
* the outer stream. For example, `src.balance(chunkSize).take(2)` results in two
* inner streams, each which see roughly half of the elements of the source stream.
*
* The `chunkSize` parameter specifies the maximum chunk size from the source stream
* that should be passed to an inner stream. For completely fair distribution of elements,
* use a chunk size of 1. For best performance, use a chunk size of `Int.MaxValue`.
*
* See [[fs2.concurrent.Balance.apply]] for more details.
*
* Alias for `through(Balance(chunkSize))`.
*/
def balance[F2[x] >: F[x]: Concurrent](
chunkSize: Int
): Stream[F2, Stream[F2, O]] =
through(Balance(chunkSize))

/** Like [[balance]] but instead of providing a stream of sources, runs each pipe.
*
* The pipes are run concurrently with each other. Hence, the parallelism factor is equal
* to the number of pipes.
* Each pipe may have a different implementation, if required; for example one pipe may
* process elements while another may send elements for processing to another machine.
*
* Each pipe is guaranteed to see all `O` pulled from the source stream, unlike `broadcast`,
* where workers see only the elements after the start of each worker evaluation.
*
* Note: the resulting stream will not emit values, even if the pipes do.
* If you need to emit `Unit` values, consider using `balanceThrough`.
*
* @param chunkSize max size of chunks taken from the source stream
* @param pipes pipes that will concurrently process the work
*/
def balanceTo[F2[x] >: F[x]: Concurrent](
chunkSize: Int
)(pipes: Pipe[F2, O, Nothing]*): Stream[F2, INothing] =
balanceThrough[F2, INothing](chunkSize)(pipes: _*)

/** Variant of `balanceTo` that broadcasts to `maxConcurrent` instances of a single pipe.
*
* @param chunkSize max size of chunks taken from the source stream
* @param maxConcurrent maximum number of pipes to run concurrently
* @param pipe pipe to use to process elements
*/
def balanceTo[F2[x] >: F[x]: Concurrent](chunkSize: Int, maxConcurrent: Int)(
pipe: Pipe[F2, O, INothing]
): Stream[F2, Unit] =
balanceThrough[F2, INothing](chunkSize, maxConcurrent)(pipe)

/** Alias for `through(Balance.through(chunkSize)(pipes)`.
*/
def balanceThrough[F2[x] >: F[x]: Concurrent, O2](
chunkSize: Int
)(pipes: Pipe[F2, O, O2]*): Stream[F2, O2] =
through(Balance.through[F2, O, O2](chunkSize)(pipes: _*))

/** Variant of `balanceThrough` that takes number of concurrency required and single pipe.
*
* @param chunkSize max size of chunks taken from the source stream
* @param maxConcurrent maximum number of pipes to run concurrently
* @param pipe pipe to use to process elements
*/
def balanceThrough[F2[x] >: F[x]: Concurrent, O2](
chunkSize: Int,
maxConcurrent: Int
)(
pipe: Pipe[F2, O, O2]
): Stream[F2, O2] =
balanceThrough[F2, O2](chunkSize)((0 until maxConcurrent).map(_ => pipe): _*)

/** Removes all output values from this stream.
*
* Often used with `merge` to run one side of the merge for its effect
Expand Down
145 changes: 0 additions & 145 deletions core/shared/src/main/scala/fs2/concurrent/Balance.scala

This file was deleted.