Skip to content

Commit

Permalink
Sinks for grouping elements into n sized lists, sets and maps. (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
PawelGizka committed Aug 7, 2019
1 parent 4811a5f commit bd78973
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
40 changes: 40 additions & 0 deletions streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
foldM $foldM
short circuits $foldMShortCircuits

collectAllN $collectAllN

collectAllToSet $collectAllToSet

collectAllToSetN $collectAllToSetN

collectAllToMap $collectAllToMap

collectAllToMapN $collectAllToMapN

collectAllWhile $collectAllWhile

foldWeighted $foldWeighted
Expand Down Expand Up @@ -885,6 +895,36 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv)
listResult.succeeded ==> (sinkResult must_=== listResult)
}

private def collectAllN = unsafeRun {
Stream[Int](1, 2, 3)
.run(Sink.collectAllN[Int](2))
.map(_ must_=== List(1, 2))
}

private def collectAllToSet = unsafeRun {
Stream[Int](1, 2, 3, 3, 4)
.run(Sink.collectAllToSet[Int])
.map(_ must_=== Set(1, 2, 3, 4))
}

private def collectAllToSetN = unsafeRun {
Stream[Int](1, 2, 1, 2, 3, 3, 4)
.run(Sink.collectAllToSetN[Int](3))
.map(_ must_=== Set(1, 2, 3))
}

private def collectAllToMap = unsafeRun {
Stream[Int](1, 2, 3)
.run(Sink.collectAllToMap[Int, Int](value => value))
.map(_ must_=== Map[Int, Int](1 -> 1, 2 -> 2, 3 -> 3))
}

private def collectAllToMapN = unsafeRun {
Stream[Int](1, 2, 3, 4, 5, 6)
.run(Sink.collectAllToMapN[Int, Int](2)(value => value % 2))
.map(_ must_=== Map[Int, Int](1 -> 1, 0 -> 2))
}

private def foldWeighted = unsafeRun {
Stream[Long](1, 5, 2, 3)
.transduce(Sink.foldWeighted(List[Long]())((_: Long) * 2, 12)((acc, el) => el :: acc).map(_.reverse))
Expand Down
30 changes: 30 additions & 0 deletions streams/shared/src/main/scala/zio/stream/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,36 @@ object Sink {
final def collectAll[A]: Sink[Nothing, Nothing, A, List[A]] =
ZSink.collectAll

/**
* see [[ZSink.collectAllN]]
*/
final def collectAllN[A](n: Long): ZSink[Any, Nothing, A, A, List[A]] =
ZSink.collectAllN(n)

/**
* see [[ZSink.collectAllToSet]]
*/
final def collectAllToSet[A]: ZSink[Any, Nothing, Nothing, A, Set[A]] =
ZSink.collectAllToSet

/**
* see [[ZSink.collectAllToSetN]]
*/
final def collectAllToSetN[A](n: Long): ZSink[Any, Nothing, A, A, Set[A]] =
ZSink.collectAllToSetN(n)

/**
* see [[ZSink.collectAllToMap]]
*/
final def collectAllToMap[K, A](key: A => K): ZSink[Any, Nothing, Nothing, A, Map[K, A]] =
ZSink.collectAllToMap(key)

/**
* see [[ZSink.collectAllToMapN]]
*/
final def collectAllToMapN[K, A](n: Long)(key: A => K): ZSink[Any, Nothing, A, A, Map[K, A]] =
ZSink.collectAllToMapN(n)(key)

/**
* see [[ZSink.collectAllWhile]]
*/
Expand Down
46 changes: 46 additions & 0 deletions streams/shared/src/main/scala/zio/stream/ZSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,52 @@ object ZSink extends ZSinkPlatformSpecific {
final def collectAll[A]: ZSink[Any, Nothing, Nothing, A, List[A]] =
fold[Nothing, A, List[A]](List.empty[A])((as, a) => Step.more(a :: as)).map(_.reverse)

/**
* Creates a sink accumulating incoming values into a list of maximum size `n`.
*/
def collectAllN[A](n: Long): ZSink[Any, Nothing, A, A, List[A]] =
foldUntil[List[A], A](List.empty[A], n)((list, element) => element :: list).map(_.reverse)

/**
* Creates a sink accumulating incoming values into a set.
*/
def collectAllToSet[A]: ZSink[Any, Nothing, Nothing, A, Set[A]] =
fold[Nothing, A, Set[A]](Set.empty[A])((set, element) => Step.more(set + element))

/**
* Creates a sink accumulating incoming values into a set of maximum size `n`.
*/
def collectAllToSetN[A](n: Long): ZSink[Any, Nothing, A, A, Set[A]] = {
def f(set: Set[A], element: A): ZSink.Step[Set[A], A] = {
val newSet = set + element
if (newSet.size > n) Step.done(set, Chunk.single(element))
else if (newSet.size == n) Step.done[Set[A], A](newSet, Chunk.empty)
else Step.more(newSet)
}
fold[A, A, Set[A]](Set.empty[A])(f)
}

/**
* Creates a sink accumulating incoming values into a map.
* Key of each element is determined by supplied function.
*/
def collectAllToMap[K, A](key: A => K): ZSink[Any, Nothing, Nothing, A, Map[K, A]] =
fold[Nothing, A, Map[K, A]](Map.empty[K, A])((map, element) => Step.more(map + (key(element) -> element)))

/**
* Creates a sink accumulating incoming values into a map of maximum size `n`.
* Key of each element is determined by supplied function.
*/
def collectAllToMapN[K, A](n: Long)(key: A => K): ZSink[Any, Nothing, A, A, Map[K, A]] = {
def f(map: Map[K, A], element: A): ZSink.Step[Map[K, A], A] = {
val newMap = map + (key(element) -> element)
if (newMap.size > n) Step.done(map, Chunk.single(element))
else if (newMap.size == n) Step.done[Map[K, A], A](newMap, Chunk.empty)
else Step.more(newMap)
}
fold[A, A, Map[K, A]](Map.empty[K, A])(f)
}

/**
* Accumulates incoming elements into a list as long as they verify predicate `p`.
*/
Expand Down

0 comments on commit bd78973

Please sign in to comment.