From 34c6bdd2f6e72ffdf30ec4db5412d0f2d06f4182 Mon Sep 17 00:00:00 2001 From: Regis Kuckaertz Date: Sat, 2 Nov 2019 16:06:58 +0000 Subject: [PATCH] Add `ZSink.fromFunctionM` --- .../jvm/src/test/scala/zio/stream/SinkSpec.scala | 8 ++++++++ streams/shared/src/main/scala/zio/stream/Sink.scala | 6 ++++++ streams/shared/src/main/scala/zio/stream/ZSink.scala | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala b/streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala index dffd3f00942..a82e5a2a2a4 100644 --- a/streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala +++ b/streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala @@ -1041,6 +1041,14 @@ object SinkSpec .runCollect, equalTo(List("1", "2", "3", "4", "5")) ) + }, + testM("fromFunctionM") { + assertM( + Stream("1", "2", "3", "4", "5") + .transduce(Sink.fromFunctionM[Throwable, String, Int](s => Task(s.toInt))) + .runCollect, + equalTo(List(1, 2, 3, 4, 5)) + ) } ), testM("fromOutputStream") { diff --git a/streams/shared/src/main/scala/zio/stream/Sink.scala b/streams/shared/src/main/scala/zio/stream/Sink.scala index 9d758157200..6b7d46d9a49 100644 --- a/streams/shared/src/main/scala/zio/stream/Sink.scala +++ b/streams/shared/src/main/scala/zio/stream/Sink.scala @@ -191,6 +191,12 @@ object Sink extends Serializable { final def fromFunction[A, B](f: A => B): Sink[Unit, Nothing, A, B] = ZSink.fromFunction(f) + /** + * see [[ZSink.fromFunctionM]] + */ + final def fromFunctionM[E, A, B](f: A => ZIO[Any, E, B]): Sink[Option[E], Nothing, A, B] = + ZSink.fromFunctionM(f) + /** * see [[ZSink.halt]] */ diff --git a/streams/shared/src/main/scala/zio/stream/ZSink.scala b/streams/shared/src/main/scala/zio/stream/ZSink.scala index 272dad9c679..594174cad7c 100644 --- a/streams/shared/src/main/scala/zio/stream/ZSink.scala +++ b/streams/shared/src/main/scala/zio/stream/ZSink.scala @@ -1495,6 +1495,12 @@ object ZSink extends ZSinkPlatformSpecific with Serializable { final def fromFunction[A, B](f: A => B): ZSink[Any, Unit, Nothing, A, B] = identity.map(f) + /** + * Creates a sink that effectfully transforms incoming values. + */ + final def fromFunctionM[R, E, A, B](f: A => ZIO[R, E, B]): ZSink[R, Option[E], Nothing, A, B] = + identity.mapError(_ => None).mapM(f(_).mapError(Some(_))) + /** * Creates a sink halting with a specified cause. */