Skip to content

Commit

Permalink
Add StreamDecoder#{filter, withFilter} and Chunk#withFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
mpilquist committed May 14, 2024
1 parent 9b1b27c commit 9d91082
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/fs2/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,12 @@ abstract class Chunk[+O] extends Serializable with ChunkPlatform[O] with ChunkRu
F.map(loop(0, size).value)(Chunk.chain)
}

/** Alias for [[filter]].
*
* Implemented to enable filtering in for comprehensions
*/
def withFilter(p: O => Boolean): Chunk[O] = filter(p)

/** Zips this chunk the the supplied chunk, returning a chunk of tuples.
*/
def zip[O2](that: Chunk[O2]): Chunk[(O, O2)] = zipWith(that)(Tuple2.apply)
Expand Down
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def unchunks[O2](implicit ev: O <:< Chunk[O2]): Stream[F, O2] =
underlying.flatMapOutput(Pull.output(_)).streamNoScope

/** Alias for [[filter]]
/** Alias for [[filter]].
*
* Implemented to enable filtering in for comprehensions
*/
def withFilter(f: O => Boolean): Stream[F, O] = this.filter(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ final class StreamDecoder[+A] private (private val step: StreamDecoder.Step[A])
}
)

def filter(p: A => Boolean): StreamDecoder[A] =
flatMap(a => if (p(a)) StreamDecoder.emit(a) else StreamDecoder.empty)

def handleErrorWith[A2 >: A](f: Throwable => StreamDecoder[A2]): StreamDecoder[A2] =
new StreamDecoder[A2](
self.step match {
Expand Down Expand Up @@ -224,6 +227,12 @@ final class StreamDecoder[+A] private (private val step: StreamDecoder.Step[A])
)
}
}

/** Alias for [[filter]].
*
* Implemented to enable filtering in for comprehensions
*/
def withFilter(p: A => Boolean): StreamDecoder[A] = filter(p)
}

object StreamDecoder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ class StreamCodecSuite extends Fs2Suite {
}
}

test("filter") {
val bits = StreamEncoder.many(int32).encodeAllValid(Vector(1, 2, 3, 4))
val decoder = StreamDecoder.tryMany(int32)
val filteredDecoder = for (n <- decoder if n % 2 != 0) yield n
assertEquals(filteredDecoder.decode[Fallible](Stream(bits)).toList, Right(List(1, 3)))
}


def genChunkSize = Gen.choose(1L, 128L)
def genSmallListOfString = Gen.choose(0, 10).flatMap(n => Gen.listOfN(n, Gen.alphaStr))

Expand Down

0 comments on commit 9d91082

Please sign in to comment.