From 411f287eaa6e732b4312b9f450de32b4780109c3 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 26 Apr 2024 11:00:13 +0300 Subject: [PATCH 01/29] strm_mapZioPar_opt: introduce mapZioPar and mapZioParUnordered 'direct' impls --- .../src/main/scala/zio/stream/ZPipeline.scala | 15 ++- .../src/main/scala/zio/stream/ZStream.scala | 103 +++++++++++++++++- 2 files changed, 113 insertions(+), 5 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index 028bf7783d7..256ddbf9f07 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -1814,13 +1814,22 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { */ def mapZIOParUnordered[Env, Err, In, Out](n: => Int)(f: In => ZIO[Env, Err, Out])(implicit trace: Trace - ): ZPipeline[Env, Err, In, Out] = - new ZPipeline( + ): ZPipeline[Env, Err, In, Out] = { + ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + /*strm + .toChannel + .concatMap(ZChannel.writeChunk(_)) + .mergeMap(n, 16)(in => ZStream.fromZIO(f(in)).channel) + .toStream*/ + strm.mapZIOParUnordered(n)(f) + } + /*new ZPipeline( ZChannel .identity[Nothing, Chunk[In], Any] .concatMap(ZChannel.writeChunk(_)) .mergeMap(n, 16)(in => ZStream.fromZIO(f(in)).channel) - ) + )*/ + } /** * Emits the provided chunk before emitting any other value. diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 706b78024e5..56be5858e39 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1931,7 +1931,58 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - self >>> ZPipeline.mapZIOPar(n)(f) + /*self >>> ZPipeline.mapZIOPar(n)(f)*/ this.mapZIOPar2[R1, E1, A2](n)(f) + + def mapZIOPar2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit + trace: Trace + ): ZStream[R1, E1, A2] = { + val z0: ZIO[Scope, Nothing, ZStream[R1, E1, A2]] = for { + scope <- ZIO.scope + q <- zio.Queue.bounded[zio.stream.Take[E1, Fiber[E1, A2]]](bufferSize) + permits <- zio.Semaphore.make(n) + failureSignal <- zio.Promise.make[E1, Nothing] + } yield { + def forkF(a : A): ZIO[R1, Nothing, Fiber.Runtime[E1, A2]] = ZIO.uninterruptibleMask { restore => + for { + localScope <- zio.Scope.make + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + fib <- restore(f(a)) + .tapError(failureSignal.fail(_)) + .onExit(localScope.close(_)) + .forkIn(scope) + } yield fib + } + val enqueuer: URIO[R1, Fiber.Runtime[Nothing, Unit]] = self + .mapZIO(forkF) + .runIntoQueue(q) + .forkIn(scope) + + ZStream + .fromZIO(enqueuer) + .flatMap { fib => + ZStream + .fromQueue(q) + .flattenTake + /*.mapZIO{f => + (f.join race failureSignal.await.debug("err signaled")).debug("raced") + }*/ + .flatMap { f => + ZStream.unwrap { + (f.join.exit race failureSignal.await.exit).map { ex => + ex.foldExit( + ZStream.failCause(_), + ZStream.succeed(_) + ) + } + } + } + .concat(ZStream.execute(fib.join)) + } + } + + ZStream + .unwrapScoped[R1](z0) + } /** * Maps over elements of the stream with the specified effectful function, @@ -1955,7 +2006,55 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, 
Any, Any, def mapZIOParUnordered[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - self >>> ZPipeline.mapZIOParUnordered(n)(f) + /*self >>> ZPipeline.mapZIOParUnordered(n)(f)*/ this.mapZIOParUnordered2[R1, E1, A2](n)(f) + + def mapZIOParUnordered2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( + f: A => ZIO[R1, E1, A2] + )(implicit trace: Trace): ZStream[R1, E1, A2] = { + val z0: ZIO[Scope, Nothing, ZStream[R1, E1, A2]] = for { + scope <- ZIO.scope + q <- zio.Queue.bounded[zio.stream.Take[E1, A2]](bufferSize) + permits <- zio.Semaphore.make(n) + } yield { + def enqueue(a : A) = ZIO.uninterruptibleMask { restore => + for { + localScope <- zio.Scope.make + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + fib <- restore { + zio.stream.Take.fromZIO(f(a)) + .flatMap(q.offer(_)) + } + .onExit(localScope.close(_)) + .unit + .forkIn(scope) + } yield () + } + + val enqueuer = self + .runForeach(enqueue) + .onExit{ex => + val t = ex.foldExit( + zio.stream.Take.failCause(_), + _ => zio.stream.Take.end + ) + q.offer(t) + } + .forkIn(scope) + + val s0 = ZStream + .fromZIO(enqueuer) + .flatMap { enqFib => + ZStream + .fromQueue(q) + .flattenTake + .concat(ZStream.execute(enqFib.join)) + } + + s0 + } + + ZStream.unwrapScoped[R1](z0) + } /** * Merges this stream and the specified stream together. From 6efc2d7d6337ad89b6363a40a9751dd560e99d0f Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 26 Apr 2024 16:45:21 +0300 Subject: [PATCH 02/29] strm_mapZioPar_opt: stream.mapZIOParUnordered --- .../src/main/scala/zio/stream/ZPipeline.scala | 13 +++--- .../src/main/scala/zio/stream/ZStream.scala | 41 ++++++++++++------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index 256ddbf9f07..9205f1a042e 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -1798,14 +1798,17 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { */ def mapZIOPar[Env, Err, In, Out](n: => Int)(f: In => ZIO[Env, Err, Out])(implicit trace: Trace - ): ZPipeline[Env, Err, In, Out] = - new ZPipeline( - ZChannel - .identity[Nothing, Chunk[In], Any] + ): ZPipeline[Env, Err, In, Out] = { + ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + /*strm + .toChannel .concatMap(ZChannel.writeChunk(_)) .mapOutZIOPar(n)(f) .mapOut(Chunk.single) - ) + .toStream*/ + strm.mapZIOPar(n)(f) + } + } /** * Maps over elements of the stream with the specified effectful function, diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 56be5858e39..aeadc656e80 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -21,7 +21,7 @@ import zio.internal.{SingleThreadedRingBuffer, UniqueKey} import zio.metrics.MetricLabel import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stm._ -import zio.stream.ZStream.{DebounceState, HandoffSignal, zipChunks} +import zio.stream.ZStream.{DebounceState, HandoffSignal, failCause, zipChunks} import zio.stream.internal.{ZInputStream, ZReader} import java.io.{IOException, InputStream} @@ -1941,6 +1941,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, q <- zio.Queue.bounded[zio.stream.Take[E1, 
Fiber[E1, A2]]](bufferSize) permits <- zio.Semaphore.make(n) failureSignal <- zio.Promise.make[E1, Nothing] + //completionSignal <- zio.Promise.make[Nothing, Any] } yield { def forkF(a : A): ZIO[R1, Nothing, Fiber.Runtime[E1, A2]] = ZIO.uninterruptibleMask { restore => for { @@ -1949,7 +1950,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, fib <- restore(f(a)) .tapError(failureSignal.fail(_)) .onExit(localScope.close(_)) - .forkIn(scope) + .fork } yield fib } val enqueuer: URIO[R1, Fiber.Runtime[Nothing, Unit]] = self @@ -1970,13 +1971,15 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, ZStream.unwrap { (f.join.exit race failureSignal.await.exit).map { ex => ex.foldExit( - ZStream.failCause(_), + { cause => + ZStream.fromZIO(failureSignal.await) + }, ZStream.succeed(_) ) } } } - .concat(ZStream.execute(fib.join)) + //.concat(ZStream.execute(completionSignal.succeed(()) *> fib.join)) } } @@ -2016,38 +2019,46 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, q <- zio.Queue.bounded[zio.stream.Take[E1, A2]](bufferSize) permits <- zio.Semaphore.make(n) } yield { - def enqueue(a : A) = ZIO.uninterruptibleMask { restore => + def enqueue(a : A): ZIO[R1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => for { localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - fib <- restore { - zio.stream.Take.fromZIO(f(a)) - .flatMap(q.offer(_)) - } - .onExit(localScope.close(_)) - .unit - .forkIn(scope) + z0 = f(a) + t = zio.stream.Take.fromZIO(z0) + offer = t.flatMap{tt => + q.offer(tt) + } + fib <- { + offer + .onExit(localScope.close(_)) + .unit + .fork + } } yield () } val enqueuer = self - .runForeach(enqueue) + .runForeach(enqueue _) .onExit{ex => val t = ex.foldExit( zio.stream.Take.failCause(_), _ => zio.stream.Take.end ) - q.offer(t) + permits + .withPermits(n) { + q.offer(t) + } + .interruptible } .forkIn(scope) val s0 = ZStream .fromZIO(enqueuer) .flatMap { enqFib => + //ZStream.never *> ZStream .fromQueue(q) .flattenTake - .concat(ZStream.execute(enqFib.join)) } s0 From d911350d52893627e69bda9761e38fafd2024e1f Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 28 Apr 2024 12:21:10 +0300 Subject: [PATCH 03/29] strm_mapZioPar_opt: avoid mapZIOPar atm, add tests for mapZIOParUnordered, stabilize mapZIOParUnordered2 --- .../test/scala/zio/stream/ZStreamSpec.scala | 52 ++++++++++++++++++- .../src/main/scala/zio/stream/ZPipeline.scala | 5 +- .../src/main/scala/zio/stream/ZStream.scala | 48 +++++++++-------- 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala index 7cd9642ee3e..e5b9fba4751 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala @@ -2677,7 +2677,57 @@ object ZStreamSpec extends ZIOBaseSpec { val stream = ZStream.fromIterable(0 to 3).mapZIOParUnordered(10)(_ => ZIO.fail("fail")) assertZIO(stream.runDrain.exit)(fails(equalTo("fail"))) - } @@ nonFlaky @@ TestAspect.diagnose(10.seconds) + } @@ nonFlaky @@ TestAspect.diagnose(10.seconds), + test("interruption propagation") { + for { + interrupted <- Ref.make(false) + latch <- Promise.make[Nothing, Unit] + fib <- ZStream(()) + .mapZIOParUnordered(1)(_ => (latch.succeed(()) *> ZIO.infinity).onInterrupt(interrupted.set(true))) + .runDrain + 
.fork + _ <- latch.await + _ <- fib.interrupt + result <- interrupted.get + } yield assert(result)(isTrue) + }, + test("interrupts pending tasks when one of the tasks fails U") { + for { + interrupted <- Ref.make(0) + latch1 <- Promise.make[Nothing, Unit] + latch2 <- Promise.make[Nothing, Unit] + result <- ZStream(1, 2, 3) + .mapZIOParUnordered(3) { + case 1 => (latch1.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) + case 2 => (latch2.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) + case 3 => latch1.await *> latch2.await *> ZIO.fail("Boom") + } + .runDrain + .exit + count <- interrupted.get + } yield assert(count)(equalTo(2)) && assert(result)(fails(equalTo("Boom"))) + } @@ nonFlaky, + test("awaits children fibers properly") { + assertZIO( + ZStream + .fromIterable((0 to 100)) + .interruptWhen(ZIO.never) + .mapZIOParUnordered(8)(_ => ZIO.succeed(1).repeatN(2000)) + .runDrain + .exit + .map(_.isInterrupted) + )(equalTo(false)) + }, + test("propagates error of original stream") { + for { + fiber <- (ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) ++ ZStream.fail(new Throwable("Boom"))) + .mapZIOParUnordered(2)(_ => ZIO.sleep(1.second)) + .runDrain + .fork + _ <- TestClock.adjust(5.seconds) + exit <- fiber.await + } yield assert(exit)(fails(hasMessage(equalTo("Boom")))) + } ), suite("mergeLeft/Right")( test("mergeLeft with HaltStrategy.Right terminates as soon as the right stream terminates") { diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index 9205f1a042e..a990e7e009b 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -1800,13 +1800,12 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { trace: Trace ): ZPipeline[Env, Err, In, Out] = { ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => - /*strm + strm .toChannel .concatMap(ZChannel.writeChunk(_)) .mapOutZIOPar(n)(f) .mapOut(Chunk.single) - .toStream*/ - strm.mapZIOPar(n)(f) + .toStream } } diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index aeadc656e80..ec1ddc5d328 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1931,7 +1931,8 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - /*self >>> ZPipeline.mapZIOPar(n)(f)*/ this.mapZIOPar2[R1, E1, A2](n)(f) + self >>> ZPipeline.mapZIOPar(n)(f) + /*this.mapZIOPar2[R1, E1, A2](n)(f)*/ def mapZIOPar2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace @@ -2014,8 +2015,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOParUnordered2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( f: A => ZIO[R1, E1, A2] )(implicit trace: Trace): ZStream[R1, E1, A2] = { - val z0: ZIO[Scope, Nothing, ZStream[R1, E1, A2]] = for { - scope <- ZIO.scope + val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { q <- zio.Queue.bounded[zio.stream.Take[E1, A2]](bufferSize) permits <- zio.Semaphore.make(n) } yield { @@ -2023,13 +2023,14 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, for { localScope <- zio.Scope.make _ <- 
restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - z0 = f(a) - t = zio.stream.Take.fromZIO(z0) + z1 = f(a) + t = zio.stream.Take.fromZIO(z1) offer = t.flatMap{tt => q.offer(tt) } fib <- { offer + .interruptible .onExit(localScope.close(_)) .unit .fork @@ -2037,23 +2038,28 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } yield () } - val enqueuer = self - .runForeach(enqueue _) - .onExit{ex => - val t = ex.foldExit( - zio.stream.Take.failCause(_), - _ => zio.stream.Take.end - ) - permits - .withPermits(n) { - q.offer(t) + val enqueuer = ZIO.transplant { grafter => + self + .runForeach(a => grafter(enqueue(a))) + .foldCauseZIO ( + c => { + permits + .withPermits(n) { + q.offer(zio.stream.Take.failCause(c)) + } + }, + _ => { + permits + .withPermits(n) { + q.offer(zio.stream.Take.end) + } } - .interruptible - } - .forkIn(scope) + ) + } + .forkScoped - val s0 = ZStream - .fromZIO(enqueuer) + val s0: ZStream[R1, E1, A2] = ZStream + .scoped[R1](enqueuer) .flatMap { enqFib => //ZStream.never *> ZStream @@ -2064,7 +2070,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, s0 } - ZStream.unwrapScoped[R1](z0) + ZStream.unwrap(z0) } /** From 277bf35e89e9085a3f38299b0a42361ff9044916 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 28 Apr 2024 15:07:19 +0300 Subject: [PATCH 04/29] strm_mapZioPar_opt: mapZIOPar2 --- .../src/main/scala/zio/stream/ZPipeline.scala | 5 +-- .../src/main/scala/zio/stream/ZStream.scala | 32 +++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index a990e7e009b..fd2763bfb38 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -1801,11 +1801,12 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { ): ZPipeline[Env, Err, In, Out] = { ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => strm - .toChannel + /*.toChannel .concatMap(ZChannel.writeChunk(_)) .mapOutZIOPar(n)(f) .mapOut(Chunk.single) - .toStream + .toStream*/ + .mapZIOPar(n)(f) } } diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index ec1ddc5d328..1d814a64cf2 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1931,14 +1931,13 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - self >>> ZPipeline.mapZIOPar(n)(f) - /*this.mapZIOPar2[R1, E1, A2](n)(f)*/ + /*self >>> ZPipeline.mapZIOPar(n)(f)*/ + this.mapZIOPar2[R1, E1, A2](n)(f) def mapZIOPar2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = { - val z0: ZIO[Scope, Nothing, ZStream[R1, E1, A2]] = for { - scope <- ZIO.scope + val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { q <- zio.Queue.bounded[zio.stream.Take[E1, Fiber[E1, A2]]](bufferSize) permits <- zio.Semaphore.make(n) failureSignal <- zio.Promise.make[E1, Nothing] @@ -1949,25 +1948,27 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, localScope <- zio.Scope.make _ <- 
restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) fib <- restore(f(a)) - .tapError(failureSignal.fail(_)) - .onExit(localScope.close(_)) + .foldCauseZIO( + c => failureSignal.failCause(c) *> localScope.close(Exit.failCause(c)) *> ZIO.refailCause(c), + u => localScope.close(Exit.succeed(u)) as u + ) .fork } yield fib } - val enqueuer: URIO[R1, Fiber.Runtime[Nothing, Unit]] = self - .mapZIO(forkF) - .runIntoQueue(q) - .forkIn(scope) + val enqueuer: ZIO[R1 with Scope, Nothing, Fiber.Runtime[Nothing, Unit]] = ZIO + .transplant { grafter => + self + .mapZIO(a => grafter(forkF(a))) + .runIntoQueue(q) + .forkScoped + } ZStream - .fromZIO(enqueuer) + .scoped[R1](enqueuer) .flatMap { fib => ZStream .fromQueue(q) .flattenTake - /*.mapZIO{f => - (f.join race failureSignal.await.debug("err signaled")).debug("raced") - }*/ .flatMap { f => ZStream.unwrap { (f.join.exit race failureSignal.await.exit).map { ex => @@ -1980,12 +1981,11 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } } } - //.concat(ZStream.execute(completionSignal.succeed(()) *> fib.join)) } } ZStream - .unwrapScoped[R1](z0) + .unwrap(z0) } /** From 58cabb4aee4e90e7d223a584447007fa4000dbdb Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 30 Apr 2024 15:31:24 +0300 Subject: [PATCH 05/29] strm_mapZioPar_opt: use queue of Exit rather than a queue of Take --- .../src/main/scala/zio/stream/ZStream.scala | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 1d814a64cf2..bf9d74a58fc 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -2016,7 +2016,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, f: A => ZIO[R1, E1, A2] )(implicit trace: Trace): ZStream[R1, E1, A2] = { val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded[zio.stream.Take[E1, A2]](bufferSize) + q <- zio.Queue.bounded[zio.Exit[Option[E1], A2]](bufferSize) permits <- zio.Semaphore.make(n) } yield { def enqueue(a : A): ZIO[R1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => @@ -2024,10 +2024,11 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) z1 = f(a) - t = zio.stream.Take.fromZIO(z1) + /*t = zio.stream.Take.fromZIO(z1) offer = t.flatMap{tt => q.offer(tt) - } + }*/ + offer = z1.mapError(Some(_)) .exit.flatMap(ex => q.offer(ex)) fib <- { offer .interruptible @@ -2045,26 +2046,38 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, c => { permits .withPermits(n) { - q.offer(zio.stream.Take.failCause(c)) + q.offer(zio.Exit.failCause(c.map(Some(_)))) } }, _ => { permits .withPermits(n) { - q.offer(zio.stream.Take.end) + q.offer(zio.Exit.fail(None)) } } ) } .forkScoped + lazy val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = + ZChannel + .fromZIO(q.take) + .flatMap { ex => + ex.foldExit( + c => { + Cause + .flipCauseOption(c) + .map(ZChannel.refailCause(_)) + .getOrElse(ZChannel.unit) + }, + a2 => ZChannel.write(Chunk.single(a2)) *> reader + ) + } + val s0: ZStream[R1, E1, A2] = ZStream .scoped[R1](enqueuer) .flatMap { enqFib => - //ZStream.never *> - ZStream - .fromQueue(q) - .flattenTake + reader.toStream } s0 From 
0c31b9d1aefcebd859c3480080c883a89e761bdf Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 1 May 2024 16:56:30 +0300 Subject: [PATCH 06/29] strm_mapZioPar_opt: strm.mapZIOParUnordered, run upstream in a scoped executor so we can guarantee child fibers are not interrupted too soon. --- streams/shared/src/main/scala/zio/stream/ZStream.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index bf9d74a58fc..59283238713 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -2028,7 +2028,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, offer = t.flatMap{tt => q.offer(tt) }*/ - offer = z1.mapError(Some(_)) .exit.flatMap(ex => q.offer(ex)) + offer = z1.mapError(Some(_)).exit.flatMap(ex => q.offer(ex)) fib <- { offer .interruptible @@ -2039,9 +2039,8 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } yield () } - val enqueuer = ZIO.transplant { grafter => - self - .runForeach(a => grafter(enqueue(a))) + val enqueuer = self + .runScoped(ZSink.foreach(enqueue(_))) .foldCauseZIO ( c => { permits @@ -2056,7 +2055,6 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } } ) - } .forkScoped lazy val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = From b7cb92ed1c115a3f7379983dbc62bd18e622e173 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 1 May 2024 16:57:43 +0300 Subject: [PATCH 07/29] strm_mapZioPar_opt__fiberChildren: make fiber's children collection fiber-accessed only, this allows lifting the synchronized layer on top of it --- core/shared/src/main/scala/zio/Fiber.scala | 2 + .../scala/zio/internal/FiberRuntime.scala | 66 ++++++++++++++++--- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/core/shared/src/main/scala/zio/Fiber.scala b/core/shared/src/main/scala/zio/Fiber.scala index b5737521038..bc02939b8cd 100644 --- a/core/shared/src/main/scala/zio/Fiber.scala +++ b/core/shared/src/main/scala/zio/Fiber.scala @@ -521,6 +521,8 @@ object Fiber extends FiberPlatformSpecific { def getFiberRefs()(implicit unsafe: Unsafe): FiberRefs def removeObserver(observer: Exit[E, A] => Unit)(implicit unsafe: Unsafe): Unit + + def alreadyExited(implicit unsafe: Unsafe): Boolean } /** diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 4dcdd76813e..123ba2aa34d 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -85,7 +85,16 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, ) } - def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = + private def childrenChunk = if(_children eq null) Chunk.empty else { + val bldr = Chunk.newBuilder[Fiber.Runtime[_, _]] + _children.forEach { child => + if((child ne null) && !child.unsafe.alreadyExited(Unsafe.unsafe)) + bldr.addOne(child) + } + bldr.result() + } + def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = { + ZIO.succeed { val childs = _children if (childs == null) Chunk.empty @@ -94,6 +103,25 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, Chunk.fromJavaIterable(childs) } } + ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { + case (fib, 
_) => + if (fib eq self) + ZIO.succeed(self.childrenChunk) + else { + ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k => + ZIO.succeed { + this.tell { + FiberMessage.Stateful { + case (fib, _) => + k(ZIO.succeed(fib.childrenChunk)) + } + } + } + } + } + } + } + def fiberRefs(implicit trace: Trace): UIO[FiberRefs] = ZIO.succeed(_fiberRefs) @@ -501,7 +529,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, */ private[zio] def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { if (_children eq null) { - _children = Platform.newConcurrentWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) + _children = Platform./*newConcurrentWeakSet*/newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) } _children } @@ -663,18 +691,34 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private def interruptAllChildren(): UIO[Any] = if (sendInterruptSignalToAllChildren(_children)) { val iterator = _children.iterator() - _children = null - val body = () => { - val next = iterator.next() + var curr: Fiber.Runtime[_, _] = null - if (next != null) next.await(id.location) else Exit.unit + def skip() = { + curr = null + while(iterator.hasNext && (curr eq null)) { + curr = iterator.next() + if((curr ne null) && curr.unsafe.alreadyExited(zio.Unsafe.unsafe)) + curr = null + } } - // Now await all children to finish: - ZIO - .whileLoop(iterator.hasNext)(body())(_ => ())(id.location) + skip() + + + if(null ne curr) { + val body = () => { + val c = curr + skip() + c.await(id.location) + } + + // Now await all children to finish: + ZIO + .whileLoop(null ne curr)(body())(_ => ())(id.location) + } + else null } else null private[zio] def isAlive(): Boolean = @@ -1163,7 +1207,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, while (iterator.hasNext) { val next = iterator.next() - if (next ne null) { + if ((next ne null) && !next.unsafe.alreadyExited(zio.Unsafe.unsafe)) { next.tellInterrupt(cause) told = true @@ -1377,6 +1421,8 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, def removeObserver(observer: Exit[E, A] => Unit)(implicit unsafe: Unsafe): Unit = self.removeObserver(observer) + + override def alreadyExited(implicit unsafe: Unsafe): Boolean = _exitValue ne null } } From 65961421afa31c51f0a346ed47964a34cdd36737 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 3 May 2024 17:52:47 +0300 Subject: [PATCH 08/29] strm_mapZioPar_opt: mapZIOParUnordered2, record counting impl --- .../src/main/scala/zio/stream/ZStream.scala | 88 +++++++++++++------ 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 59283238713..292a9b44ab0 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -17,7 +17,7 @@ package zio.stream import zio._ -import zio.internal.{SingleThreadedRingBuffer, UniqueKey} +import zio.internal.{PartitionedRingBuffer, SingleThreadedRingBuffer, UniqueKey} import zio.metrics.MetricLabel import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stm._ @@ -2016,7 +2016,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, f: A => ZIO[R1, E1, A2] )(implicit trace: Trace): ZStream[R1, E1, A2] = { val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded[zio.Exit[Option[E1], A2]](bufferSize) + q <- 
zio.Queue.bounded/*[zio.Exit[Option[E1], A2]]*/[/*zio.Exit[Either[E1, Int], A2]*/ Any](bufferSize) permits <- zio.Semaphore.make(n) } yield { def enqueue(a : A): ZIO[R1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => @@ -2024,11 +2024,11 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) z1 = f(a) - /*t = zio.stream.Take.fromZIO(z1) - offer = t.flatMap{tt => - q.offer(tt) - }*/ - offer = z1.mapError(Some(_)).exit.flatMap(ex => q.offer(ex)) + offer = //z1.flatMap(q.offer(_)) + z1.foldCauseZIO( + c => q.offer(Left(c)), + a2 => q.offer(a2) + ) fib <- { offer .interruptible @@ -2039,39 +2039,67 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } yield () } + val countingForeach: ZChannel[R1, E, Chunk[A], Any, E1, Nothing, ZStream.NRows] = ZChannel.suspend{ + var nRows = 0 + lazy val proc : ZChannel[R1, E, Chunk[A], Any, E1, Nothing, ZStream.NRows] = + ZChannel.readWithCause( + in => { + nRows += in.size + ZChannel.fromZIO(ZIO.foreachDiscard(in)(enqueue)) *> proc + }, + ZChannel.refailCause(_), + _ => ZChannel.succeedNow(ZStream.NRows(nRows)) + ) + + proc + } + val enqueuer = self - .runScoped(ZSink.foreach(enqueue(_))) + .toChannel + .pipeTo(countingForeach) + .runScoped .foldCauseZIO ( c => { - permits - .withPermits(n) { - q.offer(zio.Exit.failCause(c.map(Some(_)))) - } + q.offer(Left(c)) }, - _ => { - permits - .withPermits(n) { - q.offer(zio.Exit.fail(None)) + nRows => { + /*permits + .withPermits(n)*/ { + q.offer(Right(nRows)) } } ) .forkScoped - lazy val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = - ZChannel - .fromZIO(q.take) - .flatMap { ex => - ex.foldExit( - c => { - Cause - .flipCauseOption(c) - .map(ZChannel.refailCause(_)) - .getOrElse(ZChannel.unit) - }, - a2 => ZChannel.write(Chunk.single(a2)) *> reader - ) + val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel.suspend { + var seenRows = 0 + var nRows = -1 + lazy val reader0 : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel + .fromZIO(q.take/*.debug(s"reader(seen=$seenRows, n=$nRows")*/) + .flatMap { + case e: Either[Cause[E1], ZStream.NRows] @unchecked => + e match { + case Right(nr) => + if (seenRows == nr.n) + ZChannel.unit + else { + nRows = nr.n + reader0 + } + case Left(c) => + ZChannel.refailCause(c) + } + case a2: A2 @unchecked => + seenRows += 1 + if (nRows == seenRows) + ZChannel.write(Chunk.single(a2)) + else + ZChannel.write(Chunk.single(a2)) *> reader0 } + reader0 + } + val s0: ZStream[R1, E1, A2] = ZStream .scoped[R1](enqueuer) .flatMap { enqFib => @@ -6186,4 +6214,6 @@ object ZStream extends ZStreamPlatformSpecificConstructors { (cl.take(cr.size).zipWith(cr)(f), Left(cl.drop(cr.size))) else (cl.zipWith(cr.take(cl.size))(f), Right(cr.drop(cl.size))) + + private case class NRows(n : Int) } From 560ebcd8a8d8349ce33d609625438f2c7e0ab8c0 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 3 May 2024 18:07:27 +0300 Subject: [PATCH 09/29] strm_mapZioPar_opt: strm.mapZIOParUnordered, ditch counting, rely on withPermits(n) and a simple Left(()) message to signal EOF --- .../src/main/scala/zio/stream/ZStream.scala | 41 +++++++------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 292a9b44ab0..73f573c8e77 100644 --- 
a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -2016,7 +2016,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, f: A => ZIO[R1, E1, A2] )(implicit trace: Trace): ZStream[R1, E1, A2] = { val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded/*[zio.Exit[Option[E1], A2]]*/[/*zio.Exit[Either[E1, Int], A2]*/ Any](bufferSize) + q <- zio.Queue.bounded[Any](bufferSize) permits <- zio.Semaphore.make(n) } yield { def enqueue(a : A): ZIO[R1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => @@ -2039,16 +2039,14 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, } yield () } - val countingForeach: ZChannel[R1, E, Chunk[A], Any, E1, Nothing, ZStream.NRows] = ZChannel.suspend{ - var nRows = 0 - lazy val proc : ZChannel[R1, E, Chunk[A], Any, E1, Nothing, ZStream.NRows] = + val foreachCh: ZChannel[R1, E, Chunk[A], Any, E1, Nothing, Any] = { + lazy val proc : ZChannel[R1, E, Chunk[A], Any, E1, Nothing, Any] = ZChannel.readWithCause( in => { - nRows += in.size ZChannel.fromZIO(ZIO.foreachDiscard(in)(enqueue)) *> proc }, ZChannel.refailCause(_), - _ => ZChannel.succeedNow(ZStream.NRows(nRows)) + _ => ZChannel.unit ) proc @@ -2056,45 +2054,36 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, val enqueuer = self .toChannel - .pipeTo(countingForeach) + .pipeTo(foreachCh) .runScoped .foldCauseZIO ( + //this message terminates processing so it's ok for it to race with in-flight computations c => { q.offer(Left(c)) }, - nRows => { - /*permits - .withPermits(n)*/ { - q.offer(Right(nRows)) + _ => { + //make sure this is the last message in the queue + permits + .withPermits(n) { + q.offer(Right(())) } } ) .forkScoped - val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel.suspend { - var seenRows = 0 - var nRows = -1 + val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = { lazy val reader0 : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel .fromZIO(q.take/*.debug(s"reader(seen=$seenRows, n=$nRows")*/) .flatMap { case e: Either[Cause[E1], ZStream.NRows] @unchecked => e match { - case Right(nr) => - if (seenRows == nr.n) - ZChannel.unit - else { - nRows = nr.n - reader0 - } + case Right(_) => + ZChannel.unit case Left(c) => ZChannel.refailCause(c) } case a2: A2 @unchecked => - seenRows += 1 - if (nRows == seenRows) - ZChannel.write(Chunk.single(a2)) - else - ZChannel.write(Chunk.single(a2)) *> reader0 + ZChannel.write(Chunk.single(a2)) *> reader0 } reader0 From b9eb3ac7cc98c47b8c938bab351f664ddc6e4d47 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 3 May 2024 22:23:36 +0300 Subject: [PATCH 10/29] strm_mapZioPar_opt: make sure strm.mapZIOParUnordered doesn't break when A2 is an Either. 
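
A sketch of the shape this guards against (illustrative only, not part of the change):
the unordered implementation funnels both results and control messages through a single
Queue[Any], so while the control messages were encoded as plain Either values, a stream
whose element type is itself an Either could be mistaken for an end-of-stream or failure
signal. Wrapping the control messages in the private QRes case class removes that
ambiguity. Minimal usage sketch, assuming only the public mapZIOParUnordered API; the
concrete values are made up:

    import zio._
    import zio.stream.ZStream

    // element type is Either; its values must never collide with the queue's
    // internal end/failure markers (hence the QRes wrapper introduced below)
    val eithers: ZStream[Any, Nothing, Either[String, Int]] =
      ZStream
        .fromIterable(1 to 4)
        .mapZIOParUnordered(2)(i => ZIO.succeed(if (i % 2 == 0) Right(i) else Left(s"odd: $i")))
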
--- .../src/main/scala/zio/stream/ZStream.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 73f573c8e77..0283489dd45 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -21,7 +21,7 @@ import zio.internal.{PartitionedRingBuffer, SingleThreadedRingBuffer, UniqueKey} import zio.metrics.MetricLabel import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stm._ -import zio.stream.ZStream.{DebounceState, HandoffSignal, failCause, zipChunks} +import zio.stream.ZStream.{DebounceState, HandoffSignal, QRes, failCause, zipChunks} import zio.stream.internal.{ZInputStream, ZReader} import java.io.{IOException, InputStream} @@ -2026,7 +2026,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, z1 = f(a) offer = //z1.flatMap(q.offer(_)) z1.foldCauseZIO( - c => q.offer(Left(c)), + c => q.offer(QRes.failCause(c)), a2 => q.offer(a2) ) fib <- { @@ -2059,13 +2059,13 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, .foldCauseZIO ( //this message terminates processing so it's ok for it to race with in-flight computations c => { - q.offer(Left(c)) + q.offer(QRes.failCause(c)) }, _ => { //make sure this is the last message in the queue permits .withPermits(n) { - q.offer(Right(())) + q.offer(QRes.unit) } } ) @@ -2075,11 +2075,11 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, lazy val reader0 : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel .fromZIO(q.take/*.debug(s"reader(seen=$seenRows, n=$nRows")*/) .flatMap { - case e: Either[Cause[E1], ZStream.NRows] @unchecked => - e match { - case Right(_) => + case QRes(v) => + v match { + case () => ZChannel.unit - case Left(c) => + case c : Cause[E1] @unchecked => ZChannel.refailCause(c) } case a2: A2 @unchecked => @@ -6204,5 +6204,10 @@ object ZStream extends ZStreamPlatformSpecificConstructors { else (cl.zipWith(cr.take(cl.size))(f), Right(cr.drop(cl.size))) - private case class NRows(n : Int) + private case class QRes[A](value : A) + + private object QRes { + val unit: QRes[Unit] = QRes(()) + def failCause[E](c : Cause[E]): QRes[Cause[E]] = QRes(c) + } } From 6ad59d8fb10d4e7e1de0688279f2a51d550d8b0c Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 5 May 2024 15:18:15 +0300 Subject: [PATCH 11/29] strm_mapZioPar_opt__fiberChildren: strm.mapZIOPar, use queue of fibers (drop the Take wrappers). 
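
The ordered variant now offers the result fibers themselves to the queue in upstream
order and the reader joins them in that same order, so ordering is preserved while up to
n effects run concurrently; failures are widened to Option[E1] so a plain Fiber.fail(None)
can mark end-of-stream once all permits are reclaimed. A minimal usage sketch (values are
illustrative only, not part of the change):

    import zio._
    import zio.stream.ZStream

    // results come back in input order even though the effects run with
    // parallelism 4; the per-element work is a stand-in for a real effect
    val ordered: ZIO[Any, Nothing, Chunk[Int]] =
      ZStream
        .fromIterable(1 to 10)
        .mapZIOPar(4)(i => ZIO.succeed(i * 2))
        .runCollect
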
--- .../src/main/scala/zio/stream/ZStream.scala | 78 +++++++++++++------ 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 0283489dd45..698a77e0822 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1938,49 +1938,77 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, trace: Trace ): ZStream[R1, E1, A2] = { val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded[zio.stream.Take[E1, Fiber[E1, A2]]](bufferSize) + q <- zio.Queue.bounded[Fiber[Option[E1], A2]](bufferSize) permits <- zio.Semaphore.make(n) failureSignal <- zio.Promise.make[E1, Nothing] //completionSignal <- zio.Promise.make[Nothing, Any] } yield { - def forkF(a : A): ZIO[R1, Nothing, Fiber.Runtime[E1, A2]] = ZIO.uninterruptibleMask { restore => + def forkF(a : A): ZIO[R1, Nothing, Fiber.Runtime[Option[E1], A2]] = ZIO.uninterruptibleMask { restore => for { localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) fib <- restore(f(a)) .foldCauseZIO( - c => failureSignal.failCause(c) *> localScope.close(Exit.failCause(c)) *> ZIO.refailCause(c), + c => failureSignal.failCause(c) *> localScope.close(Exit.failCause(c)) *> ZIO.refailCause(c.map(Some(_))), u => localScope.close(Exit.succeed(u)) as u ) .fork } yield fib } - val enqueuer: ZIO[R1 with Scope, Nothing, Fiber.Runtime[Nothing, Unit]] = ZIO - .transplant { grafter => - self - .mapZIO(a => grafter(forkF(a))) - .runIntoQueue(q) - .forkScoped - } + lazy val enqueueCh : ZChannel[R1, E, Chunk[A], Any, Nothing, Nothing, Any] = ZChannel + .readWithCause( + in => ZChannel.fromZIO(ZIO.foreachDiscard(in)(a => forkF(a).flatMap(q.offer(_)))/*.debug(s"enqueueCh.in(${in.size})")*/) *> enqueueCh, + err => ZChannel.fromZIO(failureSignal.failCause(err) *> q.offer(zio.Fiber.failCause(err.map(Some(_))))/*.debug(s"enqueueCh.err($err)")*/), + done => ZChannel.fromZIO(permits.withPermits(n)(q.offer(zio.Fiber.fail(None)))/*.debug("enqueueCh.done")*/) + ) + val enqueuer: ZIO[R1 with Scope, Nothing, Fiber.Runtime[Nothing, Any]] = self + .toChannel + .pipeTo(enqueueCh.unit) + .runScoped + .forkScoped + + lazy val readerCh : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = + ZChannel + .unwrap { + val z: ZIO[Any, Nothing, ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any]] = q + .take + .flatMap(_.join) + .raceWith(failureSignal.await)( + (leftEx, rightFib) => { + leftEx.foldCauseZIO( + c => Cause + .flipCauseOption(c) + .map{ c2 => + ZIO.succeed(ZChannel.fromZIO(rightFib.join)) + } + .getOrElse{ + rightFib.interrupt.as(ZChannel.unit) + }, + a2 => rightFib.interrupt.as{ + ZChannel.write(Chunk.single(a2)) *> readerCh + } + ) + }, + (rightEx, leftFib) => { + leftFib + .interrupt + .as{ + rightEx.foldExit( + ZChannel.refailCause(_), + //how on earth? 
+ nothing => ZChannel.failCause(Cause.die(new RuntimeException(s"no idea how we got '$nothing' through the failureSignal"))) + ) + } + } + ) + + z + } ZStream .scoped[R1](enqueuer) .flatMap { fib => - ZStream - .fromQueue(q) - .flattenTake - .flatMap { f => - ZStream.unwrap { - (f.join.exit race failureSignal.await.exit).map { ex => - ex.foldExit( - { cause => - ZStream.fromZIO(failureSignal.await) - }, - ZStream.succeed(_) - ) - } - } - } + readerCh.toStream } } From 7ad415ba8d2894e7e13b0d2b3fe51f60ed4fdb59 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 5 May 2024 20:33:32 +0300 Subject: [PATCH 12/29] strm_mapZioPar_opt__fiberChildren: fix issues in mapZIOPar --- .../src/main/scala/zio/stream/ZStream.scala | 60 +++++++++---------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 698a77e0822..720d9e665da 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1947,12 +1947,27 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, for { localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - fib <- restore(f(a)) - .foldCauseZIO( - c => failureSignal.failCause(c) *> localScope.close(Exit.failCause(c)) *> ZIO.refailCause(c.map(Some(_))), - u => localScope.close(Exit.succeed(u)) as u - ) - .fork + fib <- restore { + f(a) //.debug("forkF.f(a)") + .foldCauseZIO( + c => failureSignal.failCause(c) *> ZIO.refailCause(c.map(Some(_))), + u => ZIO.succeed(u) + ) + //.debug("forkF.folded") + .raceWith(failureSignal.await.mapError(Some(_)))( + { + case (leftEx, rightFib) => + rightFib.interrupt *> leftEx + }, + { + case (rightEx, leftFib) => + leftFib.interrupt *> rightEx + } + ) + //.debug("forkF.race") + } + .onExit(localScope.close(_)) + .fork } yield fib } lazy val enqueueCh : ZChannel[R1, E, Chunk[A], Any, Nothing, Nothing, Any] = ZChannel @@ -1973,33 +1988,12 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, val z: ZIO[Any, Nothing, ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any]] = q .take .flatMap(_.join) - .raceWith(failureSignal.await)( - (leftEx, rightFib) => { - leftEx.foldCauseZIO( - c => Cause - .flipCauseOption(c) - .map{ c2 => - ZIO.succeed(ZChannel.fromZIO(rightFib.join)) - } - .getOrElse{ - rightFib.interrupt.as(ZChannel.unit) - }, - a2 => rightFib.interrupt.as{ - ZChannel.write(Chunk.single(a2)) *> readerCh - } - ) - }, - (rightEx, leftFib) => { - leftFib - .interrupt - .as{ - rightEx.foldExit( - ZChannel.refailCause(_), - //how on earth? - nothing => ZChannel.failCause(Cause.die(new RuntimeException(s"no idea how we got '$nothing' through the failureSignal"))) - ) - } - } + .foldCause( + c => Cause.flipCauseOption(c) + .map(ZChannel.refailCause(_)) + .getOrElse(ZChannel.unit), + a2 => + ZChannel.write(Chunk.single(a2)) *> readerCh ) z From fe66a4d08fa6b83211bf33a698bb01bb052ab653 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 5 May 2024 23:17:20 +0300 Subject: [PATCH 13/29] strm_mapZioPar_opt__chhannel: introduce a channel.mapZIOPar implementation, this falls 8-9% short of the direct stream.mapZIOPar impl. 
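
Sketch of how the stream level drives the new channel combinator (this mirrors the
mapZIOPar3 wrapper added in this patch; the concrete stream and parallelism are
illustrative assumptions):

    import zio._
    import zio.stream.{ZChannel, ZStream}

    // chunks are flattened into single elements, mapped in parallel at the
    // channel level, then re-chunked and turned back into a stream
    val doubled: ZStream[Any, Nothing, Int] =
      ZStream
        .fromIterable(1 to 100)
        .toChannel
        .concatMap(ZChannel.writeChunk(_))
        .mapOutZIOPar2[Any, Nothing, Chunk[Int]](4)(i => ZIO.succeed(Chunk.single(i * 2)))
        .toStream
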
--- .../src/main/scala/zio/stream/ZChannel.scala | 83 +++++++++++++++++++ .../src/main/scala/zio/stream/ZStream.scala | 10 +++ 2 files changed, 93 insertions(+) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index a86bca2b56c..a21a7d2b3f2 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -681,6 +681,89 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon } } + final def mapOutZIOPar2[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int)( + f: OutElem => ZIO[Env1, OutErr1, OutElem2] + )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { + val z: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { + input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] + queueReader = ZChannel.fromInput(input) + queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](n) + permits <- zio.Semaphore.make(n) + failureSignal <- Promise.make[Option[OutErr1], Nothing] + outDoneSignal <- Promise.make[Nothing, OutDone] + } yield { + def forkF(a : OutElem): ZIO[Env1, Nothing, Fiber.Runtime[Option[OutErr1], OutElem2]] = ZIO.uninterruptibleMask { restore => + for { + localScope <- zio.Scope.make + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + fib <- restore { + f(a) //.debug("forkF.f(a)") + .foldCauseZIO( + c => failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))), + u => ZIO.succeed(u) + ) + //.debug("forkF.folded") + .raceWith(failureSignal.await)( + { + case (leftEx, rightFib) => + rightFib.interrupt *> leftEx + }, + { + case (rightEx, leftFib) => + leftFib.interrupt *> rightEx + } + ) + //.debug("forkF.race") + } + .onExit(localScope.close(_)) + .fork + } yield fib + } + + lazy val enqueueCh : ZChannel[Env1, OutErr, OutElem, OutDone, Nothing, Nothing, Any] = ZChannel + .readWithCause( + in => ZChannel.fromZIO(forkF(in).flatMap(queue.offer(_))) *> enqueueCh, + err => ZChannel.fromZIO(failureSignal.failCause(err.map(Some(_))) *> queue.offer(Fiber.failCause(err.map(Some(_))))), + done => ZChannel.fromZIO(outDoneSignal.succeed(done) *> queue.offer(Fiber.fail(None))) + ) + + val enqueuer: ZIO[Env1 with Scope, Nothing, Fiber.Runtime[Nothing, Any]] = queueReader + .pipeTo(self) + .pipeTo(enqueueCh) + .runScoped + .forkScoped + + lazy val readerCh : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = + ZChannel + .unwrap{ + val z0: URIO[Any, ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone]] = queue + .take + .flatMap(_.join) + .foldCause( + c => Cause.flipCauseOption(c) + .map(ZChannel.refailCause(_)) + .getOrElse(ZChannel.fromZIO(outDoneSignal.await)), + out2 => + ZChannel.write(out2) *> readerCh + ) + z0 + } + + val resCh: ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = ZChannel + .scoped[Env1](enqueuer) + .concatMapWith { enqueueFib => + readerCh + } ( (_, o) => o, + (o, _) => o + ) + .embedInput(input) + + resCh + } + + ZChannel.unwrap(z) + } + /** * Returns a new channel which creates a new channel for each emitted element * and merges some of them together. 
Different merge strategies control what diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 720d9e665da..47fd8eb784e 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -2010,6 +2010,16 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, .unwrap(z0) } + def mapZIOPar3[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit + trace: Trace + ): ZStream[R1, E1, A2] = { + self + .toChannel + .concatMap(ZChannel.writeChunk(_)) + .mapOutZIOPar2[R1, E1, Chunk[A2]](n)(a => f(a).map(Chunk.single(_))) + .toStream + } + /** * Maps over elements of the stream with the specified effectful function, * partitioned by `p` executing invocations of `f` concurrently. The number of From 1e2bfd8e2d846c40b7d358024fb5fbee2b4180ca Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 16:34:58 +0300 Subject: [PATCH 14/29] strm_mapZioPar_opt__chhannel: duplicate the impls to the channel level --- .../src/main/scala/zio/stream/ZChannel.scala | 161 ++++++++++++------ .../src/main/scala/zio/stream/ZStream.scala | 33 ++-- 2 files changed, 130 insertions(+), 64 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index a21a7d2b3f2..7d9b54d51a7 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -4,6 +4,7 @@ import zio.{ZIO, _} import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.internal.{AsyncInputConsumer, AsyncInputProducer, ChannelExecutor, SingleProducerAsyncInput} import ChannelExecutor.ChannelState +import zio.stream.ZChannel.QRes /** * A `ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]` is a nexus @@ -630,64 +631,15 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon final def mapOutZIOPar[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int)( f: OutElem => ZIO[Env1, OutErr1, OutElem2] )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = - ZChannel.unwrapScopedWith { scope => - for { - input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] - queueReader = ZChannel.fromInput(input) - queue <- Queue.bounded[ZIO[Env1, OutErr1, Either[OutDone, OutElem2]]](n) - _ <- scope.addFinalizer(queue.shutdown) - errorSignal <- Promise.make[OutErr1, Nothing] - permits <- Semaphore.make(n.toLong) - pull <- (queueReader >>> self).toPullIn(scope) - _ <- pull - .foldCauseZIO( - cause => queue.offer(ZIO.refailCause(cause)), - { - case Left(outDone) => - permits.withPermits(n.toLong)(ZIO.unit).interruptible *> queue.offer(ZIO.succeed(Left(outDone))) - case Right(outElem) => - for { - p <- Promise.make[OutErr1, OutElem2] - latch <- Promise.make[Nothing, Unit] - _ <- queue.offer(p.await.map(Right(_))) - _ <- permits.withPermit { - latch.succeed(()) *> - ZIO.uninterruptibleMask { restore => - restore(errorSignal.await) raceFirstAwait restore(f(outElem)) - } - .tapErrorCause(errorSignal.failCause) - .intoPromise(p) - }.forkIn(scope) - _ <- latch.await - } yield () - } - ) - .forever - .interruptible - .forkIn(scope) - } yield { - lazy val consumer: ZChannel[Env1, Any, Any, Any, OutErr1, OutElem2, OutDone] = - ZChannel.unwrap[Env1, Any, Any, Any, OutErr1, OutElem2, OutDone] { - queue.take.flatten.foldCause( - 
ZChannel.refailCause, - { - case Left(outDone) => ZChannel.succeedNow(outDone) - case Right(outElem) => ZChannel.write(outElem) *> consumer - } - ) - } + mapOutZIOPar[Env1, OutErr1, OutElem2](n, n)(f) - consumer.embedInput(input) - } - } - - final def mapOutZIOPar2[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int)( + final def mapOutZIOPar[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize : Int)( f: OutElem => ZIO[Env1, OutErr1, OutElem2] )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { val z: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] queueReader = ZChannel.fromInput(input) - queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](n) + queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](bufferSize) permits <- zio.Semaphore.make(n) failureSignal <- Promise.make[Option[OutErr1], Nothing] outDoneSignal <- Promise.make[Nothing, OutDone] @@ -764,6 +716,104 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon ZChannel.unwrap(z) } + final def mapOutZIOParUnordered[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize : Int)( + f: OutElem => ZIO[Env1, OutErr1, OutElem2] + )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { + val z0: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { + input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] + queueReader = ZChannel.fromInput(input) + q <- zio.Queue.bounded[Any](bufferSize) + permits <- zio.Semaphore.make(n) + } yield { + def enqueue(a : OutElem): ZIO[Env1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => + for { + localScope <- zio.Scope.make + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + z1 = f(a) + offer = z1 + .foldCauseZIO( + c => q.offer(QRes.failCause(c)), + a2 => q.offer(a2) + ) + fib <- { + restore(offer) + .onExit(localScope.close(_)) + .unit + .fork + } + } yield () + } + + val foreachCh: ZChannel[Env1, OutErr, OutElem, OutDone, OutErr1, Nothing, OutDone] = { + lazy val proc : ZChannel[Env1, OutErr, OutElem, OutDone, OutErr1, Nothing, OutDone] = + ZChannel.readWithCause( + in => { + ZChannel.fromZIO(enqueue(in)) *> proc + }, + ZChannel.refailCause(_), + ZChannel.succeedNow(_) + ) + + proc + } + + val enqueuer = queueReader + .pipeTo(self) + .pipeTo(foreachCh) + .runScoped + .foldCauseZIO ( + //this message terminates processing so it's ok for it to race with in-flight computations + c => { + q.offer(QRes.failCause(c)) + }, + done => { + //make sure this is the last message in the queue + permits + .withPermits(n) { + q.offer(QRes(done)) + } + } + ) + .forkScoped + + val reader : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = { + lazy val reader0 : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = ZChannel + .fromZIO(q.take) + .flatMap { + case QRes(v) => + v match { + case c : Cause[OutErr1] @unchecked => + ZChannel.refailCause(c) + case done : OutDone @unchecked => + ZChannel.succeedNow(done) + } + case a2: OutElem2 @unchecked => + ZChannel.write(a2) *> reader0 + } + + reader0 + } + + val res0 = ZChannel + .scoped[Env1](enqueuer) + .concatMapWith{ fib => + reader + } ( + { + case (_, done) => done + }, + { + case (done, _) => done + } + ) + .embedInput(input) + + res0 + } + + ZChannel.unwrap(z0) + } + /** * Returns a new channel which creates a new channel for each 
emitted element * and merges some of them together. Different merge strategies control what @@ -2214,4 +2264,11 @@ object ZChannel { ): ZChannel[Env1, InErr, InElem, InDone, OutErr, OutElem, OutDone] = self.provideSomeEnvironment(_.updateAt(key)(f)) } + + private case class QRes[A](value : A) + + private object QRes { + val unit: QRes[Unit] = QRes(()) + def failCause[E](c : Cause[E]): QRes[Cause[E]] = QRes(c) + } } diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 47fd8eb784e..03ef5e4c370 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1932,7 +1932,17 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, trace: Trace ): ZStream[R1, E1, A2] = /*self >>> ZPipeline.mapZIOPar(n)(f)*/ - this.mapZIOPar2[R1, E1, A2](n)(f) + this.mapZIOPar[R1, E1, A2](n, n)(f) + + def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int, bufferSize : Int)(f: A => ZIO[R1, E1, A2])(implicit + trace: Trace + ): ZStream[R1, E1, A2] = { + self + .toChannel + .concatMap(ZChannel.writeChunk(_)) + .mapOutZIOPar[R1, E1, Chunk[A2]](n, bufferSize)(a => f(a).map(Chunk.single(_))) + .toStream + } def mapZIOPar2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace @@ -2010,16 +2020,6 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, .unwrap(z0) } - def mapZIOPar3[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit - trace: Trace - ): ZStream[R1, E1, A2] = { - self - .toChannel - .concatMap(ZChannel.writeChunk(_)) - .mapOutZIOPar2[R1, E1, Chunk[A2]](n)(a => f(a).map(Chunk.single(_))) - .toStream - } - /** * Maps over elements of the stream with the specified effectful function, * partitioned by `p` executing invocations of `f` concurrently. The number of @@ -2042,7 +2042,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOParUnordered[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - /*self >>> ZPipeline.mapZIOParUnordered(n)(f)*/ this.mapZIOParUnordered2[R1, E1, A2](n)(f) + /*self >>> ZPipeline.mapZIOParUnordered(n)(f)*/ this.mapZIOParUnordered3[R1, E1, A2](n)(f) def mapZIOParUnordered2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( f: A => ZIO[R1, E1, A2] @@ -2133,6 +2133,15 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, ZStream.unwrap(z0) } + def mapZIOParUnordered3[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( + f: A => ZIO[R1, E1, A2] + )(implicit trace: Trace): ZStream[R1, E1, A2] = + self + .toChannel + .concatMap(ZChannel.writeChunk(_)) + .mapOutZIOParUnordered[R1, E1, Chunk[A2]](n, bufferSize)(a => f(a).map(Chunk.single(_))) + .toStream + /** * Merges this stream and the specified stream together. 
* From 81189840d615d170ef8c55cc49f4b79cd70d8be8 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 17:04:41 +0300 Subject: [PATCH 15/29] strm_mapZioPar_opt__chhannel: add stream+pl methods for mapZIOPar and mapZIOParUnordered with bufferSize parameter --- .../src/main/scala/zio/stream/ZPipeline.scala | 43 +++-- .../src/main/scala/zio/stream/ZStream.scala | 182 +----------------- 2 files changed, 32 insertions(+), 193 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index fd2763bfb38..a4e09fcf55f 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -496,6 +496,11 @@ final class ZPipeline[-Env, +Err, -In, +Out] private ( ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOPar(n)(f) + def mapZIOPar[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize : => Int)(f: Out => ZIO[Env2, Err2, Out2])(implicit + trace: Trace + ): ZPipeline[Env2, Err2, In, Out2] = + self >>> ZPipeline.mapZIOPar(n, bufferSize)(f) + /** * Maps over elements of the stream with the specified effectful function, * executing up to `n` invocations of `f` concurrently. The element order is @@ -506,6 +511,11 @@ final class ZPipeline[-Env, +Err, -In, +Out] private ( ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOParUnordered(n)(f) + def mapZIOParUnordered[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize : => Int)(f: Out => ZIO[Env2, Err2, Out2])(implicit + trace: Trace + ): ZPipeline[Env2, Err2, In, Out2] = + self >>> ZPipeline.mapZIOParUnordered(n, bufferSize)(f) + /** * Transforms the errors emitted by this pipeline using `f`. */ @@ -1801,15 +1811,19 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { ): ZPipeline[Env, Err, In, Out] = { ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => strm - /*.toChannel - .concatMap(ZChannel.writeChunk(_)) - .mapOutZIOPar(n)(f) - .mapOut(Chunk.single) - .toStream*/ .mapZIOPar(n)(f) } } + def mapZIOPar[Env, Err, In, Out](n: => Int, bufferSize : => Int)(f: In => ZIO[Env, Err, Out])(implicit + trace: Trace + ): ZPipeline[Env, Err, In, Out] = { + ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + strm + .mapZIOPar(n, bufferSize)(f) + } + } + /** * Maps over elements of the stream with the specified effectful function, * executing up to `n` invocations of `f` concurrently. 
The element order is @@ -1819,19 +1833,16 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { trace: Trace ): ZPipeline[Env, Err, In, Out] = { ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => - /*strm - .toChannel - .concatMap(ZChannel.writeChunk(_)) - .mergeMap(n, 16)(in => ZStream.fromZIO(f(in)).channel) - .toStream*/ strm.mapZIOParUnordered(n)(f) } - /*new ZPipeline( - ZChannel - .identity[Nothing, Chunk[In], Any] - .concatMap(ZChannel.writeChunk(_)) - .mergeMap(n, 16)(in => ZStream.fromZIO(f(in)).channel) - )*/ + } + + def mapZIOParUnordered[Env, Err, In, Out](n: => Int, bufferSize : => Int)(f: In => ZIO[Env, Err, Out])(implicit + trace: Trace + ): ZPipeline[Env, Err, In, Out] = { + ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + strm.mapZIOParUnordered(n, bufferSize)(f) + } } /** diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 03ef5e4c370..ea878e43490 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -21,7 +21,7 @@ import zio.internal.{PartitionedRingBuffer, SingleThreadedRingBuffer, UniqueKey} import zio.metrics.MetricLabel import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stm._ -import zio.stream.ZStream.{DebounceState, HandoffSignal, QRes, failCause, zipChunks} +import zio.stream.ZStream.{DebounceState, HandoffSignal, failCause, zipChunks} import zio.stream.internal.{ZInputStream, ZReader} import java.io.{IOException, InputStream} @@ -1944,82 +1944,6 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, .toStream } - def mapZIOPar2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)(f: A => ZIO[R1, E1, A2])(implicit - trace: Trace - ): ZStream[R1, E1, A2] = { - val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded[Fiber[Option[E1], A2]](bufferSize) - permits <- zio.Semaphore.make(n) - failureSignal <- zio.Promise.make[E1, Nothing] - //completionSignal <- zio.Promise.make[Nothing, Any] - } yield { - def forkF(a : A): ZIO[R1, Nothing, Fiber.Runtime[Option[E1], A2]] = ZIO.uninterruptibleMask { restore => - for { - localScope <- zio.Scope.make - _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - fib <- restore { - f(a) //.debug("forkF.f(a)") - .foldCauseZIO( - c => failureSignal.failCause(c) *> ZIO.refailCause(c.map(Some(_))), - u => ZIO.succeed(u) - ) - //.debug("forkF.folded") - .raceWith(failureSignal.await.mapError(Some(_)))( - { - case (leftEx, rightFib) => - rightFib.interrupt *> leftEx - }, - { - case (rightEx, leftFib) => - leftFib.interrupt *> rightEx - } - ) - //.debug("forkF.race") - } - .onExit(localScope.close(_)) - .fork - } yield fib - } - lazy val enqueueCh : ZChannel[R1, E, Chunk[A], Any, Nothing, Nothing, Any] = ZChannel - .readWithCause( - in => ZChannel.fromZIO(ZIO.foreachDiscard(in)(a => forkF(a).flatMap(q.offer(_)))/*.debug(s"enqueueCh.in(${in.size})")*/) *> enqueueCh, - err => ZChannel.fromZIO(failureSignal.failCause(err) *> q.offer(zio.Fiber.failCause(err.map(Some(_))))/*.debug(s"enqueueCh.err($err)")*/), - done => ZChannel.fromZIO(permits.withPermits(n)(q.offer(zio.Fiber.fail(None)))/*.debug("enqueueCh.done")*/) - ) - val enqueuer: ZIO[R1 with Scope, Nothing, Fiber.Runtime[Nothing, Any]] = self - .toChannel - .pipeTo(enqueueCh.unit) - .runScoped - .forkScoped - - lazy val readerCh : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = - 
ZChannel - .unwrap { - val z: ZIO[Any, Nothing, ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any]] = q - .take - .flatMap(_.join) - .foldCause( - c => Cause.flipCauseOption(c) - .map(ZChannel.refailCause(_)) - .getOrElse(ZChannel.unit), - a2 => - ZChannel.write(Chunk.single(a2)) *> readerCh - ) - - z - } - - ZStream - .scoped[R1](enqueuer) - .flatMap { fib => - readerCh.toStream - } - } - - ZStream - .unwrap(z0) - } - /** * Maps over elements of the stream with the specified effectful function, * partitioned by `p` executing invocations of `f` concurrently. The number of @@ -2042,100 +1966,11 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOParUnordered[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - /*self >>> ZPipeline.mapZIOParUnordered(n)(f)*/ this.mapZIOParUnordered3[R1, E1, A2](n)(f) - - def mapZIOParUnordered2[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( - f: A => ZIO[R1, E1, A2] - )(implicit trace: Trace): ZStream[R1, E1, A2] = { - val z0: ZIO[Any, Nothing, ZStream[R1, E1, A2]] = for { - q <- zio.Queue.bounded[Any](bufferSize) - permits <- zio.Semaphore.make(n) - } yield { - def enqueue(a : A): ZIO[R1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => - for { - localScope <- zio.Scope.make - _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - z1 = f(a) - offer = //z1.flatMap(q.offer(_)) - z1.foldCauseZIO( - c => q.offer(QRes.failCause(c)), - a2 => q.offer(a2) - ) - fib <- { - offer - .interruptible - .onExit(localScope.close(_)) - .unit - .fork - } - } yield () - } - - val foreachCh: ZChannel[R1, E, Chunk[A], Any, E1, Nothing, Any] = { - lazy val proc : ZChannel[R1, E, Chunk[A], Any, E1, Nothing, Any] = - ZChannel.readWithCause( - in => { - ZChannel.fromZIO(ZIO.foreachDiscard(in)(enqueue)) *> proc - }, - ZChannel.refailCause(_), - _ => ZChannel.unit - ) - - proc - } - - val enqueuer = self - .toChannel - .pipeTo(foreachCh) - .runScoped - .foldCauseZIO ( - //this message terminates processing so it's ok for it to race with in-flight computations - c => { - q.offer(QRes.failCause(c)) - }, - _ => { - //make sure this is the last message in the queue - permits - .withPermits(n) { - q.offer(QRes.unit) - } - } - ) - .forkScoped - - val reader : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = { - lazy val reader0 : ZChannel[Any, Any, Any, Any, E1, Chunk[A2], Any] = ZChannel - .fromZIO(q.take/*.debug(s"reader(seen=$seenRows, n=$nRows")*/) - .flatMap { - case QRes(v) => - v match { - case () => - ZChannel.unit - case c : Cause[E1] @unchecked => - ZChannel.refailCause(c) - } - case a2: A2 @unchecked => - ZChannel.write(Chunk.single(a2)) *> reader0 - } - - reader0 - } - - val s0: ZStream[R1, E1, A2] = ZStream - .scoped[R1](enqueuer) - .flatMap { enqFib => - reader.toStream - } - - s0 - } + mapZIOParUnordered[R1, E1, A2](n, 16)(f) - ZStream.unwrap(z0) - } - - def mapZIOParUnordered3[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int = 16)( - f: A => ZIO[R1, E1, A2] - )(implicit trace: Trace): ZStream[R1, E1, A2] = + def mapZIOParUnordered[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int)(f: A => ZIO[R1, E1, A2])(implicit + trace: Trace + ): ZStream[R1, E1, A2] = self .toChannel .concatMap(ZChannel.writeChunk(_)) @@ -6244,11 +6079,4 @@ object ZStream extends ZStreamPlatformSpecificConstructors { (cl.take(cr.size).zipWith(cr)(f), Left(cl.drop(cr.size))) else (cl.zipWith(cr.take(cl.size))(f), Right(cr.drop(cl.size))) - - private case 
class QRes[A](value : A) - - private object QRes { - val unit: QRes[Unit] = QRes(()) - def failCause[E](c : Cause[E]): QRes[Cause[E]] = QRes(c) - } } From 5bbfe03ecf57400041ed2a7f086de2e2155cc3c5 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 17:17:34 +0300 Subject: [PATCH 16/29] strm_mapZioPar_opt: fmt --- .../scala/zio/internal/FiberRuntime.scala | 40 +++-- .../src/main/scala/zio/stream/ZChannel.scala | 151 +++++++++--------- .../src/main/scala/zio/stream/ZPipeline.scala | 38 +++-- .../src/main/scala/zio/stream/ZStream.scala | 15 +- 4 files changed, 117 insertions(+), 127 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 4569e5e9acc..761c2618385 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -86,10 +86,11 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, ) } - private def childrenChunk = if(_children eq null) Chunk.empty else { + private def childrenChunk = if (_children eq null) Chunk.empty + else { val bldr = Chunk.newBuilder[Fiber.Runtime[_, _]] _children.forEach { child => - if((child ne null) && child.isAlive()) + if ((child ne null) && child.isAlive()) bldr.addOne(child) } bldr.result() @@ -104,26 +105,23 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, Chunk.fromJavaIterable(childs) } } - ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { - case (fib, _) => - if (fib eq self) - ZIO.succeed(self.childrenChunk) - else { - ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k => - ZIO.succeed { - this.tell { - FiberMessage.Stateful { - case (fib, _) => - k(ZIO.succeed(fib.childrenChunk)) - } + ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) => + if (fib eq self) + ZIO.succeed(self.childrenChunk) + else { + ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k => + ZIO.succeed { + this.tell { + FiberMessage.Stateful { case (fib, _) => + k(ZIO.succeed(fib.childrenChunk)) } } } } + } } } - def fiberRefs(implicit trace: Trace): UIO[FiberRefs] = ZIO.succeed(_fiberRefs) def id: FiberId.Runtime = fiberId @@ -520,7 +518,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, */ private[zio] def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { if (_children eq null) { - _children = Platform./*newConcurrentWeakSet*/newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) + _children = Platform. 
/*newConcurrentWeakSet*/ newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) } _children } @@ -688,17 +686,16 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, def skip() = { curr = null - while(iterator.hasNext && (curr eq null)) { + while (iterator.hasNext && (curr eq null)) { curr = iterator.next() - if((curr ne null) && !curr.isAlive()) + if ((curr ne null) && !curr.isAlive()) curr = null } } skip() - - if(null ne curr) { + if (null ne curr) { val body = () => { val c = curr skip() @@ -708,8 +705,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, // Now await all children to finish: ZIO .whileLoop(null ne curr)(body())(_ => ())(id.location) - } - else null + } else null } else null private[zio] def isAlive(): Boolean = diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 7d9b54d51a7..bb48e2b3b25 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -633,49 +633,51 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = mapOutZIOPar[Env1, OutErr1, OutElem2](n, n)(f) - final def mapOutZIOPar[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize : Int)( + final def mapOutZIOPar[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize: Int)( f: OutElem => ZIO[Env1, OutErr1, OutElem2] )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { val z: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { - input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] - queueReader = ZChannel.fromInput(input) - queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](bufferSize) - permits <- zio.Semaphore.make(n) + input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] + queueReader = ZChannel.fromInput(input) + queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](bufferSize) + permits <- zio.Semaphore.make(n) failureSignal <- Promise.make[Option[OutErr1], Nothing] outDoneSignal <- Promise.make[Nothing, OutDone] } yield { - def forkF(a : OutElem): ZIO[Env1, Nothing, Fiber.Runtime[Option[OutErr1], OutElem2]] = ZIO.uninterruptibleMask { restore => - for { - localScope <- zio.Scope.make - _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - fib <- restore { - f(a) //.debug("forkF.f(a)") - .foldCauseZIO( - c => failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))), - u => ZIO.succeed(u) - ) - //.debug("forkF.folded") - .raceWith(failureSignal.await)( - { - case (leftEx, rightFib) => - rightFib.interrupt *> leftEx - }, - { - case (rightEx, leftFib) => - leftFib.interrupt *> rightEx - } - ) - //.debug("forkF.race") - } - .onExit(localScope.close(_)) - .fork - } yield fib + def forkF(a: OutElem): ZIO[Env1, Nothing, Fiber.Runtime[Option[OutErr1], OutElem2]] = ZIO.uninterruptibleMask { + restore => + for { + localScope <- zio.Scope.make + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + fib <- restore { + f(a) //.debug("forkF.f(a)") + .foldCauseZIO( + c => failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))), + u => ZIO.succeed(u) + ) + //.debug("forkF.folded") + .raceWith(failureSignal.await)( + { case (leftEx, rightFib) => + rightFib.interrupt *> leftEx 
+ }, + { case (rightEx, leftFib) => + leftFib.interrupt *> rightEx + } + ) + //.debug("forkF.race") + } + .onExit(localScope.close(_)) + .fork + } yield fib } - lazy val enqueueCh : ZChannel[Env1, OutErr, OutElem, OutDone, Nothing, Nothing, Any] = ZChannel + lazy val enqueueCh: ZChannel[Env1, OutErr, OutElem, OutDone, Nothing, Nothing, Any] = ZChannel .readWithCause( in => ZChannel.fromZIO(forkF(in).flatMap(queue.offer(_))) *> enqueueCh, - err => ZChannel.fromZIO(failureSignal.failCause(err.map(Some(_))) *> queue.offer(Fiber.failCause(err.map(Some(_))))), + err => + ZChannel.fromZIO( + failureSignal.failCause(err.map(Some(_))) *> queue.offer(Fiber.failCause(err.map(Some(_)))) + ), done => ZChannel.fromZIO(outDoneSignal.succeed(done) *> queue.offer(Fiber.fail(None))) ) @@ -685,29 +687,26 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon .runScoped .forkScoped - lazy val readerCh : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = - ZChannel - .unwrap{ - val z0: URIO[Any, ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone]] = queue - .take - .flatMap(_.join) - .foldCause( - c => Cause.flipCauseOption(c) + lazy val readerCh: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = + ZChannel.unwrap { + val z0: URIO[Any, ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone]] = queue.take + .flatMap(_.join) + .foldCause( + c => + Cause + .flipCauseOption(c) .map(ZChannel.refailCause(_)) .getOrElse(ZChannel.fromZIO(outDoneSignal.await)), - out2 => - ZChannel.write(out2) *> readerCh - ) - z0 - } + out2 => ZChannel.write(out2) *> readerCh + ) + z0 + } val resCh: ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = ZChannel .scoped[Env1](enqueuer) .concatMapWith { enqueueFib => readerCh - } ( (_, o) => o, - (o, _) => o - ) + }((_, o) => o, (o, _) => o) .embedInput(input) resCh @@ -716,25 +715,25 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon ZChannel.unwrap(z) } - final def mapOutZIOParUnordered[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize : Int)( + final def mapOutZIOParUnordered[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize: Int)( f: OutElem => ZIO[Env1, OutErr1, OutElem2] )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { val z0: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { - input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] - queueReader = ZChannel.fromInput(input) - q <- zio.Queue.bounded[Any](bufferSize) - permits <- zio.Semaphore.make(n) + input <- SingleProducerAsyncInput.make[InErr, InElem, InDone] + queueReader = ZChannel.fromInput(input) + q <- zio.Queue.bounded[Any](bufferSize) + permits <- zio.Semaphore.make(n) } yield { - def enqueue(a : OutElem): ZIO[Env1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => + def enqueue(a: OutElem): ZIO[Env1, Nothing, Unit] = ZIO.uninterruptibleMask { restore => for { localScope <- zio.Scope.make - _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - z1 = f(a) + _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) + z1 = f(a) offer = z1 - .foldCauseZIO( - c => q.offer(QRes.failCause(c)), - a2 => q.offer(a2) - ) + .foldCauseZIO( + c => q.offer(QRes.failCause(c)), + a2 => q.offer(a2) + ) fib <- { restore(offer) .onExit(localScope.close(_)) @@ -745,7 +744,7 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon } val 
foreachCh: ZChannel[Env1, OutErr, OutElem, OutDone, OutErr1, Nothing, OutDone] = { - lazy val proc : ZChannel[Env1, OutErr, OutElem, OutDone, OutErr1, Nothing, OutDone] = + lazy val proc: ZChannel[Env1, OutErr, OutElem, OutDone, OutErr1, Nothing, OutDone] = ZChannel.readWithCause( in => { ZChannel.fromZIO(enqueue(in)) *> proc @@ -761,7 +760,7 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon .pipeTo(self) .pipeTo(foreachCh) .runScoped - .foldCauseZIO ( + .foldCauseZIO( //this message terminates processing so it's ok for it to race with in-flight computations c => { q.offer(QRes.failCause(c)) @@ -776,15 +775,15 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon ) .forkScoped - val reader : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = { - lazy val reader0 : ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = ZChannel + val reader: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = { + lazy val reader0: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = ZChannel .fromZIO(q.take) .flatMap { case QRes(v) => v match { - case c : Cause[OutErr1] @unchecked => + case c: Cause[OutErr1] @unchecked => ZChannel.refailCause(c) - case done : OutDone @unchecked => + case done: OutDone @unchecked => ZChannel.succeedNow(done) } case a2: OutElem2 @unchecked => @@ -796,14 +795,14 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon val res0 = ZChannel .scoped[Env1](enqueuer) - .concatMapWith{ fib => + .concatMapWith { fib => reader - } ( - { - case (_, done) => done + }( + { case (_, done) => + done }, - { - case (done, _) => done + { case (done, _) => + done } ) .embedInput(input) @@ -2265,10 +2264,10 @@ object ZChannel { self.provideSomeEnvironment(_.updateAt(key)(f)) } - private case class QRes[A](value : A) + private case class QRes[A](value: A) private object QRes { - val unit: QRes[Unit] = QRes(()) - def failCause[E](c : Cause[E]): QRes[Cause[E]] = QRes(c) + val unit: QRes[Unit] = QRes(()) + def failCause[E](c: Cause[E]): QRes[Cause[E]] = QRes(c) } } diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index a4e09fcf55f..d3c47509c94 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -496,8 +496,8 @@ final class ZPipeline[-Env, +Err, -In, +Out] private ( ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOPar(n)(f) - def mapZIOPar[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize : => Int)(f: Out => ZIO[Env2, Err2, Out2])(implicit - trace: Trace + def mapZIOPar[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize: => Int)(f: Out => ZIO[Env2, Err2, Out2])(implicit + trace: Trace ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOPar(n, bufferSize)(f) @@ -511,8 +511,10 @@ final class ZPipeline[-Env, +Err, -In, +Out] private ( ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOParUnordered(n)(f) - def mapZIOParUnordered[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize : => Int)(f: Out => ZIO[Env2, Err2, Out2])(implicit - trace: Trace + def mapZIOParUnordered[Env2 <: Env, Err2 >: Err, Out2](n: => Int, bufferSize: => Int)( + f: Out => ZIO[Env2, Err2, Out2] + )(implicit + trace: Trace ): ZPipeline[Env2, Err2, In, Out2] = self >>> ZPipeline.mapZIOParUnordered(n, bufferSize)(f) @@ -1808,21 +1810,19 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { */ def 
mapZIOPar[Env, Err, In, Out](n: => Int)(f: In => ZIO[Env, Err, Out])(implicit trace: Trace - ): ZPipeline[Env, Err, In, Out] = { - ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + ): ZPipeline[Env, Err, In, Out] = + ZPipeline.fromFunction { (strm: ZStream[Any, Nothing, In]) => strm .mapZIOPar(n)(f) } - } - def mapZIOPar[Env, Err, In, Out](n: => Int, bufferSize : => Int)(f: In => ZIO[Env, Err, Out])(implicit - trace: Trace - ): ZPipeline[Env, Err, In, Out] = { - ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + def mapZIOPar[Env, Err, In, Out](n: => Int, bufferSize: => Int)(f: In => ZIO[Env, Err, Out])(implicit + trace: Trace + ): ZPipeline[Env, Err, In, Out] = + ZPipeline.fromFunction { (strm: ZStream[Any, Nothing, In]) => strm .mapZIOPar(n, bufferSize)(f) } - } /** * Maps over elements of the stream with the specified effectful function, @@ -1831,19 +1831,17 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { */ def mapZIOParUnordered[Env, Err, In, Out](n: => Int)(f: In => ZIO[Env, Err, Out])(implicit trace: Trace - ): ZPipeline[Env, Err, In, Out] = { - ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + ): ZPipeline[Env, Err, In, Out] = + ZPipeline.fromFunction { (strm: ZStream[Any, Nothing, In]) => strm.mapZIOParUnordered(n)(f) } - } - def mapZIOParUnordered[Env, Err, In, Out](n: => Int, bufferSize : => Int)(f: In => ZIO[Env, Err, Out])(implicit - trace: Trace - ): ZPipeline[Env, Err, In, Out] = { - ZPipeline.fromFunction{ (strm : ZStream[Any, Nothing, In]) => + def mapZIOParUnordered[Env, Err, In, Out](n: => Int, bufferSize: => Int)(f: In => ZIO[Env, Err, Out])(implicit + trace: Trace + ): ZPipeline[Env, Err, In, Out] = + ZPipeline.fromFunction { (strm: ZStream[Any, Nothing, In]) => strm.mapZIOParUnordered(n, bufferSize)(f) } - } /** * Emits the provided chunk before emitting any other value. 
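For illustration only: a minimal sketch of how the bufferSize overloads introduced in this series might be used from user code. The mapZIOPar(n, bufferSize) and mapZIOParUnordered(n, bufferSize) signatures are the ones added in the diffs above (not part of a released ZIO); fetchUser, the stream contents and the concrete numbers are made-up placeholders.

import zio._
import zio.stream._

object MapZIOParBufferedSketch extends ZIOAppDefault {
  // made-up effect standing in for real per-element work
  def fetchUser(id: Int): ZIO[Any, Throwable, String] =
    ZIO.succeed(s"user-$id").delay(10.millis)

  // ordered variant: up to 8 effects in flight; bufferSize bounds how far the
  // producer side may run ahead of the consumer
  val ordered: ZStream[Any, Throwable, String] =
    ZStream.range(0, 100).mapZIOPar(8, 32)(fetchUser)

  // unordered variant: same concurrency, results emitted as they complete
  val unordered: ZStream[Any, Throwable, String] =
    ZStream.range(0, 100).mapZIOParUnordered(8, 32)(fetchUser)

  def run =
    (ordered.runCount <*> unordered.runCount).debug("elements processed")
}

Per the diffs above, the single-argument overloads now delegate to these: mapZIOPar(n) uses bufferSize = n and mapZIOParUnordered(n) uses bufferSize = 16, so existing call sites keep their behaviour while a larger buffer lets slow consumers fall further behind the workers.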
diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index ea878e43490..cd87a5878d8 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1934,15 +1934,13 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, /*self >>> ZPipeline.mapZIOPar(n)(f)*/ this.mapZIOPar[R1, E1, A2](n, n)(f) - def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int, bufferSize : Int)(f: A => ZIO[R1, E1, A2])(implicit - trace: Trace - ): ZStream[R1, E1, A2] = { - self - .toChannel + def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: Int)(f: A => ZIO[R1, E1, A2])(implicit + trace: Trace + ): ZStream[R1, E1, A2] = + self.toChannel .concatMap(ZChannel.writeChunk(_)) .mapOutZIOPar[R1, E1, Chunk[A2]](n, bufferSize)(a => f(a).map(Chunk.single(_))) .toStream - } /** * Maps over elements of the stream with the specified effectful function, @@ -1969,10 +1967,9 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, mapZIOParUnordered[R1, E1, A2](n, 16)(f) def mapZIOParUnordered[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: => Int)(f: A => ZIO[R1, E1, A2])(implicit - trace: Trace + trace: Trace ): ZStream[R1, E1, A2] = - self - .toChannel + self.toChannel .concatMap(ZChannel.writeChunk(_)) .mapOutZIOParUnordered[R1, E1, Chunk[A2]](n, bufferSize)(a => f(a).map(Chunk.single(_))) .toStream From b228693d6ebd1c49e72d50da637044cb6460256a Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 17:43:28 +0300 Subject: [PATCH 17/29] strm_mapZioPar_opt: remove ead code, add default prm --- .../src/main/scala/zio/internal/FiberRuntime.scala | 12 +----------- .../shared/src/main/scala/zio/stream/ZChannel.scala | 2 +- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 761c2618385..c33ef64e12b 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -95,16 +95,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } bldr.result() } - def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = { - - ZIO.succeed { - val childs = _children - if (childs == null) Chunk.empty - else - zio.internal.Sync(childs) { - Chunk.fromJavaIterable(childs) - } - } + def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) => if (fib eq self) ZIO.succeed(self.childrenChunk) @@ -120,7 +111,6 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } } - } def fiberRefs(implicit trace: Trace): UIO[FiberRefs] = ZIO.succeed(_fiberRefs) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index bb48e2b3b25..4b04fb7858d 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -715,7 +715,7 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon ZChannel.unwrap(z) } - final def mapOutZIOParUnordered[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize: Int)( + final def mapOutZIOParUnordered[Env1 <: Env, OutErr1 >: OutErr, OutElem2](n: Int, bufferSize: Int = 16)( f: OutElem => ZIO[Env1, OutErr1, 
OutElem2] )(implicit trace: Trace): ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone] = { val z0: ZIO[Any, Nothing, ZChannel[Env1, InErr, InElem, InDone, OutErr1, OutElem2, OutDone]] = for { From e8cc44fa8313735f8e51d5f25ab14d721bc9428e Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 18:09:45 +0300 Subject: [PATCH 18/29] strm_mapZioPar_opt: fix CI issues --- .../test/scala/zio/stream/ZStreamSpec.scala | 29 ++++++++++--------- .../src/main/scala/zio/stream/ZChannel.scala | 2 ++ 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala index e5b9fba4751..b5713b7d403 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala @@ -2682,10 +2682,11 @@ object ZStreamSpec extends ZIOBaseSpec { for { interrupted <- Ref.make(false) latch <- Promise.make[Nothing, Unit] - fib <- ZStream(()) - .mapZIOParUnordered(1)(_ => (latch.succeed(()) *> ZIO.infinity).onInterrupt(interrupted.set(true))) - .runDrain - .fork + fib <- + ZStream(()) + .mapZIOParUnordered(1)(_ => (latch.succeed(()) *> ZIO.infinity).onInterrupt(interrupted.set(true))) + .runDrain + .fork _ <- latch.await _ <- fib.interrupt result <- interrupted.get @@ -2697,13 +2698,13 @@ object ZStreamSpec extends ZIOBaseSpec { latch1 <- Promise.make[Nothing, Unit] latch2 <- Promise.make[Nothing, Unit] result <- ZStream(1, 2, 3) - .mapZIOParUnordered(3) { - case 1 => (latch1.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) - case 2 => (latch2.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) - case 3 => latch1.await *> latch2.await *> ZIO.fail("Boom") - } - .runDrain - .exit + .mapZIOParUnordered(3) { + case 1 => (latch1.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) + case 2 => (latch2.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1)) + case 3 => latch1.await *> latch2.await *> ZIO.fail("Boom") + } + .runDrain + .exit count <- interrupted.get } yield assert(count)(equalTo(2)) && assert(result)(fails(equalTo("Boom"))) } @@ nonFlaky, @@ -2721,9 +2722,9 @@ object ZStreamSpec extends ZIOBaseSpec { test("propagates error of original stream") { for { fiber <- (ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) ++ ZStream.fail(new Throwable("Boom"))) - .mapZIOParUnordered(2)(_ => ZIO.sleep(1.second)) - .runDrain - .fork + .mapZIOParUnordered(2)(_ => ZIO.sleep(1.second)) + .runDrain + .fork _ <- TestClock.adjust(5.seconds) exit <- fiber.await } yield assert(exit)(fails(hasMessage(equalTo("Boom")))) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 4b04fb7858d..c6a462fe3b7 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -661,6 +661,8 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon rightFib.interrupt *> leftEx }, { case (rightEx, leftFib) => + //assist the scala3 compiler... 
+ val rightEx1: Exit[Option[OutErr1], OutElem2] = rightEx leftFib.interrupt *> rightEx } ) From d6c9793dcdfb210b4f909e5e95175fe6176c30fa Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Mon, 6 May 2024 20:23:40 +0300 Subject: [PATCH 19/29] strm_mapZioPar_opt: fix scala3 compilation issue --- streams/shared/src/main/scala/zio/stream/ZChannel.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index c6a462fe3b7..7625d79962d 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -656,13 +656,11 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon u => ZIO.succeed(u) ) //.debug("forkF.folded") - .raceWith(failureSignal.await)( + .raceWith[Env1, Option[OutErr1], Option[OutErr1], Nothing, OutElem2](failureSignal.await)( { case (leftEx, rightFib) => rightFib.interrupt *> leftEx }, { case (rightEx, leftFib) => - //assist the scala3 compiler... - val rightEx1: Exit[Option[OutErr1], OutElem2] = rightEx leftFib.interrupt *> rightEx } ) From 6cf826507521c1503f3e8d186871d04b2133cbb7 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 7 May 2024 11:01:39 +0300 Subject: [PATCH 20/29] strm_mapZioPar_opt: address reviw comments --- .../scala/zio/internal/FiberRuntime.scala | 34 ++++++------------- .../src/main/scala/zio/stream/ZChannel.scala | 19 +++++------ .../src/main/scala/zio/stream/ZStream.scala | 3 +- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index c33ef64e12b..86e27c729d9 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -95,6 +95,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } bldr.result() } + def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) => if (fib eq self) @@ -102,9 +103,10 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, else { ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k => ZIO.succeed { - this.tell { + self.tell { FiberMessage.Stateful { case (fib, _) => - k(ZIO.succeed(fib.childrenChunk)) + val childs = fib.childrenChunk + k(Exit.succeed(childs)) } } } @@ -506,9 +508,9 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, * * '''NOTE''': This method must be invoked by the fiber itself. */ - private[zio] def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { + private def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { if (_children eq null) { - _children = Platform. 
/*newConcurrentWeakSet*/ newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) + _children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) } _children } @@ -674,6 +676,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, var curr: Fiber.Runtime[_, _] = null + //this finds the next operable child fiber and stores it in the `curr` variable def skip() = { curr = null while (iterator.hasNext && (curr eq null)) { @@ -683,18 +686,13 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } + //find the first operable child fiber + //if there isn't any we can simply return null and save ourselves an effect evaluation skip() if (null ne curr) { - val body = () => { - val c = curr - skip() - c.await(id.location) - } - - // Now await all children to finish: ZIO - .whileLoop(null ne curr)(body())(_ => ())(id.location) + .whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location) } else null } else null @@ -871,7 +869,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, * * '''NOTE''': This method must be invoked by the fiber itself. */ - private[zio] def removeChild(child: FiberRuntime[_, _]): Unit = + private def removeChild(child: FiberRuntime[_, _]): Unit = if (_children ne null) { _children.remove(child) () @@ -1198,16 +1196,6 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, done } - private def sendInterruptSignalToAllChildrenConcurrently(): Boolean = { - val childFibers = _children - - if (childFibers ne null) { - internal.Sync(childFibers) { - sendInterruptSignalToAllChildren(childFibers) - } - } else false - } - private def sendInterruptSignalToAllChildren( children: JavaSet[Fiber.Runtime[_, _]] ): Boolean = diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 7625d79962d..6e04bfc8b54 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -650,12 +650,11 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) fib <- restore { - f(a) //.debug("forkF.f(a)") + f(a) .foldCauseZIO( c => failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))), u => ZIO.succeed(u) ) - //.debug("forkF.folded") .raceWith[Env1, Option[OutErr1], Option[OutErr1], Nothing, OutElem2](failureSignal.await)( { case (leftEx, rightFib) => rightFib.interrupt *> leftEx @@ -664,7 +663,6 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon leftFib.interrupt *> rightEx } ) - //.debug("forkF.race") } .onExit(localScope.close(_)) .fork @@ -728,14 +726,15 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon for { localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) - z1 = f(a) - offer = z1 - .foldCauseZIO( - c => q.offer(QRes.failCause(c)), - a2 => q.offer(a2) - ) fib <- { - restore(offer) + restore { + val z1 = f(a) + z1 + .foldCauseZIO( + c => q.offer(QRes.failCause(c)), + a2 => q.offer(a2) + ) + } .onExit(localScope.close(_)) .unit .fork diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index cd87a5878d8..fa5dbe6feaa 100644 --- 
a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -1931,8 +1931,7 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace ): ZStream[R1, E1, A2] = - /*self >>> ZPipeline.mapZIOPar(n)(f)*/ - this.mapZIOPar[R1, E1, A2](n, n)(f) + self.mapZIOPar[R1, E1, A2](n, n)(f) def mapZIOPar[R1 <: R, E1 >: E, A2](n: => Int, bufferSize: Int)(f: A => ZIO[R1, E1, A2])(implicit trace: Trace From 918aa6c455aa55e2f9b1b3b433aafe71766c455f Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 7 May 2024 11:25:30 +0300 Subject: [PATCH 21/29] strm_mapZioPar_opt: address one more review comment --- streams/shared/src/main/scala/zio/stream/ZChannel.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 6e04bfc8b54..5ebd332727f 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -650,11 +650,9 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon localScope <- zio.Scope.make _ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope))) fib <- restore { - f(a) - .foldCauseZIO( - c => failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))), - u => ZIO.succeed(u) - ) + f(a).catchAllCause { c => + failureSignal.failCause(c.map(Some(_))) *> ZIO.refailCause(c.map(Some(_))) + } .raceWith[Env1, Option[OutErr1], Option[OutErr1], Nothing, OutElem2](failureSignal.await)( { case (leftEx, rightFib) => rightFib.interrupt *> leftEx From 11e09e173f7a426c9b51051b5ed835763faa4f33 Mon Sep 17 00:00:00 2001 From: eyal farago Date: Tue, 7 May 2024 14:06:34 +0300 Subject: [PATCH 22/29] Update core/shared/src/main/scala/zio/internal/FiberRuntime.scala reduce ObjectRef updates Co-authored-by: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com> --- .../src/main/scala/zio/internal/FiberRuntime.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 86e27c729d9..327315016ed 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -678,12 +678,13 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, //this finds the next operable child fiber and stores it in the `curr` variable def skip() = { - curr = null - while (iterator.hasNext && (curr eq null)) { - curr = iterator.next() - if ((curr ne null) && !curr.isAlive()) - curr = null + var next = null + while (iterator.hasNext && (next eq null)) { + next = iterator.next() + if ((next ne null) && !next.isAlive()) + next = null } + curr = next } //find the first operable child fiber From 08d6510304f0c89af733f0b794806f21e0e4f10a Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Tue, 7 May 2024 17:17:06 +0300 Subject: [PATCH 23/29] strm_mapZioPar_opt: slight compilation fix --- core/shared/src/main/scala/zio/internal/FiberRuntime.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 327315016ed..ace860fbf12 
100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -678,7 +678,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, //this finds the next operable child fiber and stores it in the `curr` variable def skip() = { - var next = null + var next: Fiber.Runtime[_, _] = null while (iterator.hasNext && (next eq null)) { next = iterator.next() if ((next ne null) && !next.isAlive()) From a7246ad6243a4c33c7f1bd48e7c90e7186e88910 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Wed, 8 May 2024 09:28:10 +0300 Subject: [PATCH 24/29] strm_mapZioPar_opt: make QRes an AnyVal --- streams/shared/src/main/scala/zio/stream/ZChannel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 5ebd332727f..7387fb1986f 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -2261,7 +2261,7 @@ object ZChannel { self.provideSomeEnvironment(_.updateAt(key)(f)) } - private case class QRes[A](value: A) + private case class QRes[A](value: A) extends AnyVal private object QRes { val unit: QRes[Unit] = QRes(()) From 6f81f82e54f2e7d27f9f254a55a2d89daa0c66e6 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 10 May 2024 10:25:57 +0300 Subject: [PATCH 25/29] strm_mapZioPar_opt__childrenSet: directly expose the children set to other fibers, but use the observaton it is only mutated by the fiber itself in order to grnaularily control the required synchronization --- .../scala/zio/internal/FiberRuntime.scala | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 138717e6da9..dfe25cd7c0f 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -98,18 +98,15 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) => - if (fib eq self) - ZIO.succeed(self.childrenChunk) - else { - ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k => - ZIO.succeed { - self.tell { - FiberMessage.Stateful { case (fib, _) => - val childs = fib.childrenChunk - k(Exit.succeed(childs)) - } - } - } + if (fib eq self) { + //read by the fiber itself, no need to synchronize as only the fiber itself can mutate the children set + Exit.succeed(self.childrenChunk) + } else if(self._children eq null) { + Exit.succeed(Chunk.empty) + } else { + //read by another fiber, must synchronize + zio.internal.Sync(_children) { + Exit.succeed(self.childrenChunk) } } } @@ -168,15 +165,23 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } private[zio] def addChild(child: Fiber.Runtime[_, _]): Unit = - if (isAlive()) { - getChildren().add(child) + if(child.isAlive()) { + if (isAlive()) { - if (isInterrupted()) + val childs = getChildren() + //any mutation to the children set must be synchronized + zio.internal.Sync(childs) { + childs.add(child) + } + + if (isInterrupted()) + child.tellInterrupt(getInterruptedCause()) + } else { child.tellInterrupt(getInterruptedCause()) - } else { - 
child.tellInterrupt(getInterruptedCause()) + } } + /** * Adds an interruptor to the set of interruptors that are interrupting this * fiber. @@ -873,7 +878,10 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, */ private def removeChild(child: FiberRuntime[_, _]): Unit = if (_children ne null) { - _children.remove(child) + //any mutation to the children set must be synchronized + zio.internal.Sync(_children) { + _children.remove(child) + } () } @@ -1406,6 +1414,12 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private[zio] def transferChildren(scope: FiberScope): Unit = { val children = _children if ((children ne null) && !children.isEmpty) { + val childs = _children + //we're effectively clearing this set, seems cheaper to 'drop' it and allocate a new one if we spawn more fibers + //a concurrent children call might get the stale set, but this method (and its primary usage for dumping threads) + //is racy by definition + _children = null + //not mutating the set, so no need to synchronize val iterator = children.iterator() val flags = _runtimeFlags @@ -1416,10 +1430,10 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, // Unless we forked fibers and didn't await them, we shouldn't have any alive children in the set. if ((next ne null) && next.isAlive()) { scope.add(self, flags, next)(location, Unsafe.unsafe) - iterator.remove() } } } + } /** From 9fd7ff32d0cb4ace2d64c4b61eaa7b26de90cc70 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 10 May 2024 11:23:36 +0300 Subject: [PATCH 26/29] strm_mapZioPar_opt__childrenSet: optimize transferChildren by batching it --- core/shared/src/main/scala/zio/Fiber.scala | 2 + .../scala/zio/internal/FiberRuntime.scala | 57 +++++++++++++------ .../main/scala/zio/internal/FiberScope.scala | 46 +++++++++++++++ 3 files changed, 88 insertions(+), 17 deletions(-) diff --git a/core/shared/src/main/scala/zio/Fiber.scala b/core/shared/src/main/scala/zio/Fiber.scala index 17875b9a301..d93b700dcf5 100644 --- a/core/shared/src/main/scala/zio/Fiber.scala +++ b/core/shared/src/main/scala/zio/Fiber.scala @@ -530,6 +530,7 @@ object Fiber extends FiberPlatformSpecific { * '''NOTE''': This method must be invoked by the fiber itself. */ private[zio] def addChild(child: Fiber.Runtime[_, _]): Unit + private[zio] def addChildren(children: Iterable[Fiber.Runtime[_, _]]): Unit /** * Deletes the specified fiber ref. @@ -619,6 +620,7 @@ object Fiber extends FiberPlatformSpecific { * Adds a message to add a child to this fiber. */ private[zio] def tellAddChild(child: Fiber.Runtime[_, _]): Unit + private[zio] def tellAddChildren(children: Iterable[Fiber.Runtime[_, _]]): Unit /** * Adds a message to interrupt this fiber. 
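To make the intent of the new addChildren/tellAddChildren declarations above concrete: handing off N surviving children with a single Stateful message means the parent's inbox processes one batch instead of N separate tellAddChild messages. The stand-alone sketch below only illustrates that counting argument; Inbox, Msg and the numbers are invented for the example and are not ZIO internals.

object ChildBatchingSketch {
  sealed trait Msg
  final case class AddChild(child: Int)            extends Msg
  final case class AddChildren(children: Seq[Int]) extends Msg

  // toy stand-in for a fiber inbox: we only count how many messages arrive
  final class Inbox {
    private var msgs: List[Msg] = Nil
    def tell(m: Msg): Unit      = msgs ::= m
    def messageCount: Int       = msgs.size
  }

  def main(args: Array[String]): Unit = {
    val children = 1 to 1000

    val perChild = new Inbox
    children.foreach(c => perChild.tell(AddChild(c))) // one message per child

    val batched = new Inbox
    batched.tell(AddChildren(children))               // a single batched message

    println(s"per-child: ${perChild.messageCount} messages, batched: ${batched.messageCount}")
  }
}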
diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index dfe25cd7c0f..ba06ceee198 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -181,6 +181,39 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } + private[zio] def addChildren(children: Iterable[Fiber.Runtime[_, _]]): Unit = { + val iter = children.iterator + if(isAlive()) { + val childs = getChildren() + //any mutation to the children set must be synchronized + zio.internal.Sync(childs) { + if (isInterrupted()) { + val cause = getInterruptedCause() + while (iter.hasNext) { + val child = iter.next() + if (child.isAlive()) { + childs.add(child) + child.tellInterrupt(cause) + } + } + } else { + while (iter.hasNext) { + val child = iter.next() + if (child.isAlive()) + childs.add(child) + } + } + } + } else { + val cause = getInterruptedCause() + while (iter.hasNext) { + val child = iter.next() + if (child.isAlive()) + child.tellInterrupt(cause) + } + } + } + /** * Adds an interruptor to the set of interruptors that are interrupting this @@ -1401,6 +1434,9 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private[zio] def tellAddChild(child: Fiber.Runtime[_, _]): Unit = tell(FiberMessage.Stateful((parentFiber, _) => parentFiber.addChild(child))) + private[zio] def tellAddChildren(children: Iterable[Fiber.Runtime[_, _]]): Unit = + tell(FiberMessage.Stateful((parentFiber, _) => parentFiber.addChildren(children))) + private[zio] def tellInterrupt(cause: Cause[Nothing]): Unit = tell(FiberMessage.InterruptSignal(cause)) @@ -1412,28 +1448,15 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, * evaluated the effects but prior to exiting */ private[zio] def transferChildren(scope: FiberScope): Unit = { - val children = _children - if ((children ne null) && !children.isEmpty) { - val childs = _children + if ((_children ne null) && !_children.isEmpty) { + val childs = childrenChunk //we're effectively clearing this set, seems cheaper to 'drop' it and allocate a new one if we spawn more fibers - //a concurrent children call might get the stale set, but this method (and its primary usage for dumping threads) + //a concurrent children call might get the stale set, but this method (and its primary usage for dumping fibers) //is racy by definition _children = null - //not mutating the set, so no need to synchronize - val iterator = children.iterator() val flags = _runtimeFlags - - while (iterator.hasNext) { - val next = iterator.next() - - // Only move alive children. - // Unless we forked fibers and didn't await them, we shouldn't have any alive children in the set. 
- if ((next ne null) && next.isAlive()) { - scope.add(self, flags, next)(location, Unsafe.unsafe) - } - } + scope.addAll(self, flags, childs)(location, Unsafe.unsafe) } - } /** diff --git a/core/shared/src/main/scala/zio/internal/FiberScope.scala b/core/shared/src/main/scala/zio/internal/FiberScope.scala index 3dddfb07e78..61ff5b012fc 100644 --- a/core/shared/src/main/scala/zio/internal/FiberScope.scala +++ b/core/shared/src/main/scala/zio/internal/FiberScope.scala @@ -36,6 +36,12 @@ private[zio] sealed trait FiberScope { trace: Trace, unsafe: Unsafe ): Unit + + private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( + implicit + trace: Trace, + unsafe: Unsafe + ): Unit } private[zio] object FiberScope { @@ -56,6 +62,18 @@ private[zio] object FiberScope { if (RuntimeFlags.fiberRoots(runtimeFlags)) { Fiber._roots.add(child) } + + private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( + implicit + trace: Trace, + unsafe: Unsafe + ): Unit = { + if (RuntimeFlags.fiberRoots(runtimeFlags)) { + children.foreach { + Fiber._roots.add(_) + } + } + } } private final class Local(val fiberId: FiberId, parentRef: WeakReference[Fiber.Runtime[_, _]]) extends FiberScope { @@ -85,6 +103,34 @@ private[zio] object FiberScope { child.tellInterrupt(Cause.interrupt(currentFiber.id)) } } + + private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( + implicit + trace: Trace, + unsafe: Unsafe + ): Unit = if(children.nonEmpty) { + val parent = parentRef.get() + + if (parent ne null) { + // Parent is not GC'd. Let's check to see if the parent is the current + // fiber: + if (currentFiber eq parent) { + // The parent is the current fiber so it is safe to directly add the + // child to the parent: + parent.addChildren(children) + } else { + // The parent is not the current fiber. So we need to send a message + // to the parent so it will add the child to itself: + parent.tellAddChildren(children) + } + } else { + // Parent was GC'd. 
We immediately interrupt the child fiber using the id + // of the current fiber (which is adding the child to the parent): + children.foreach( + _.tellInterrupt(Cause.interrupt(currentFiber.id)) + ) + } + } } private[zio] def make(fiber: FiberRuntime[_, _]): FiberScope = From 3bdea625f0d18fda60818238dc655e8445f33abc Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 10 May 2024 11:24:18 +0300 Subject: [PATCH 27/29] strm_mapZioPar_opt__childrenSet: fmt --- .../scala/zio/internal/FiberRuntime.scala | 12 ++++----- .../main/scala/zio/internal/FiberScope.scala | 26 ++++++++++++------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index ba06ceee198..3ff1dde66ee 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -101,7 +101,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, if (fib eq self) { //read by the fiber itself, no need to synchronize as only the fiber itself can mutate the children set Exit.succeed(self.childrenChunk) - } else if(self._children eq null) { + } else if (self._children eq null) { Exit.succeed(Chunk.empty) } else { //read by another fiber, must synchronize @@ -165,7 +165,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } private[zio] def addChild(child: Fiber.Runtime[_, _]): Unit = - if(child.isAlive()) { + if (child.isAlive()) { if (isAlive()) { val childs = getChildren() @@ -183,7 +183,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private[zio] def addChildren(children: Iterable[Fiber.Runtime[_, _]]): Unit = { val iter = children.iterator - if(isAlive()) { + if (isAlive()) { val childs = getChildren() //any mutation to the children set must be synchronized zio.internal.Sync(childs) { @@ -214,7 +214,6 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } - /** * Adds an interruptor to the set of interruptors that are interrupting this * fiber. 
@@ -1447,17 +1446,16 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, * '''NOTE''': This method must be invoked by the fiber itself after it has * evaluated the effects but prior to exiting */ - private[zio] def transferChildren(scope: FiberScope): Unit = { + private[zio] def transferChildren(scope: FiberScope): Unit = if ((_children ne null) && !_children.isEmpty) { val childs = childrenChunk //we're effectively clearing this set, seems cheaper to 'drop' it and allocate a new one if we spawn more fibers //a concurrent children call might get the stale set, but this method (and its primary usage for dumping fibers) //is racy by definition _children = null - val flags = _runtimeFlags + val flags = _runtimeFlags scope.addAll(self, flags, childs)(location, Unsafe.unsafe) } - } /** * Updates a fiber ref belonging to this fiber by using the provided update diff --git a/core/shared/src/main/scala/zio/internal/FiberScope.scala b/core/shared/src/main/scala/zio/internal/FiberScope.scala index 61ff5b012fc..9a35c3ff56f 100644 --- a/core/shared/src/main/scala/zio/internal/FiberScope.scala +++ b/core/shared/src/main/scala/zio/internal/FiberScope.scala @@ -37,8 +37,11 @@ private[zio] sealed trait FiberScope { unsafe: Unsafe ): Unit - private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( - implicit + private[zio] def addAll( + currentFiber: Fiber.Runtime[_, _], + runtimeFlags: RuntimeFlags, + children: Iterable[Fiber.Runtime[_, _]] + )(implicit trace: Trace, unsafe: Unsafe ): Unit @@ -63,17 +66,19 @@ private[zio] object FiberScope { Fiber._roots.add(child) } - private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( - implicit + private[zio] def addAll( + currentFiber: Fiber.Runtime[_, _], + runtimeFlags: RuntimeFlags, + children: Iterable[Fiber.Runtime[_, _]] + )(implicit trace: Trace, unsafe: Unsafe - ): Unit = { + ): Unit = if (RuntimeFlags.fiberRoots(runtimeFlags)) { children.foreach { Fiber._roots.add(_) } } - } } private final class Local(val fiberId: FiberId, parentRef: WeakReference[Fiber.Runtime[_, _]]) extends FiberScope { @@ -104,11 +109,14 @@ private[zio] object FiberScope { } } - private[zio] def addAll(currentFiber: Fiber.Runtime[_, _], runtimeFlags: RuntimeFlags, children: Iterable[Fiber.Runtime[_, _]])( - implicit + private[zio] def addAll( + currentFiber: Fiber.Runtime[_, _], + runtimeFlags: RuntimeFlags, + children: Iterable[Fiber.Runtime[_, _]] + )(implicit trace: Trace, unsafe: Unsafe - ): Unit = if(children.nonEmpty) { + ): Unit = if (children.nonEmpty) { val parent = parentRef.get() if (parent ne null) { From eb1ef45f785876b8875432fd262a89bff83612ed Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Fri, 10 May 2024 20:37:03 +0300 Subject: [PATCH 28/29] strm_mapZioPar_opt: FiberRuntime.children, extra safety measure to protect from race with transferChildren --- .../scala/zio/internal/FiberRuntime.scala | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 3ff1dde66ee..26129431dad 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -86,14 +86,18 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, ) } - private def childrenChunk = if 
(_children eq null) Chunk.empty - else { - val bldr = Chunk.newBuilder[Fiber.Runtime[_, _]] - _children.forEach { child => - if ((child ne null) && child.isAlive()) - bldr.addOne(child) + private def childrenChunk = { + //may be executed by a foreign fiber (under Sync), hence we're risking a race over the _children variable being set back to null by a concurrent transferChildren call + val childs = _children + if (childs eq null) Chunk.empty + else { + val bldr = Chunk.newBuilder[Fiber.Runtime[_, _]] + childs.forEach { child => + if ((child ne null) && child.isAlive()) + bldr.addOne(child) + } + bldr.result() } - bldr.result() } def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = @@ -101,12 +105,16 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, if (fib eq self) { //read by the fiber itself, no need to synchronize as only the fiber itself can mutate the children set Exit.succeed(self.childrenChunk) - } else if (self._children eq null) { - Exit.succeed(Chunk.empty) } else { - //read by another fiber, must synchronize - zio.internal.Sync(_children) { - Exit.succeed(self.childrenChunk) + //may be racing with the fiber running transferChildren, hence must save _children in a local val + val childs = _children + if (childs eq null) { + Exit.succeed(Chunk.empty) + } else { + //read by another fiber, must synchronize + zio.internal.Sync(childs) { + Exit.succeed(self.childrenChunk) + } } } } @@ -538,6 +546,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, * '''NOTE''': This method must be invoked by the fiber itself. */ private def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { + //executed by the fiber itself, no risk of racing with transferChildren if (_children eq null) { _children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) } From b8d2dbdca8de4eae271a259cc86f9b046ed78e0f Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 12 May 2024 22:16:32 +0300 Subject: [PATCH 29/29] strm_mapZioPar_opt: fiber runtime, back to synchronized child fibers set --- .../scala/zio/internal/FiberRuntime.scala | 62 ++++++------------- 1 file changed, 18 insertions(+), 44 deletions(-) diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 26129431dad..9171ed67005 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -101,23 +101,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] = - ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) => - if (fib eq self) { - //read by the fiber itself, no need to synchronize as only the fiber itself can mutate the children set - Exit.succeed(self.childrenChunk) - } else { - //may be racing with the fiber running transferChildren, hence must save _children in a local val - val childs = _children - if (childs eq null) { - Exit.succeed(Chunk.empty) - } else { - //read by another fiber, must synchronize - zio.internal.Sync(childs) { - Exit.succeed(self.childrenChunk) - } - } - } - } + ZIO.succeed(self.childrenChunk) def fiberRefs(implicit trace: Trace): UIO[FiberRefs] = ZIO.succeed(_fiberRefs) @@ -175,12 +159,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private[zio] def addChild(child: Fiber.Runtime[_, _]): Unit = if (child.isAlive()) { if 
(isAlive()) { - - val childs = getChildren() - //any mutation to the children set must be synchronized - zio.internal.Sync(childs) { - childs.add(child) - } + getChildren().add(child) if (isInterrupted()) child.tellInterrupt(getInterruptedCause()) @@ -193,24 +172,22 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, val iter = children.iterator if (isAlive()) { val childs = getChildren() - //any mutation to the children set must be synchronized - zio.internal.Sync(childs) { - if (isInterrupted()) { - val cause = getInterruptedCause() - while (iter.hasNext) { - val child = iter.next() - if (child.isAlive()) { - childs.add(child) - child.tellInterrupt(cause) - } - } - } else { - while (iter.hasNext) { - val child = iter.next() - if (child.isAlive()) - childs.add(child) + + if (isInterrupted()) { + val cause = getInterruptedCause() + while (iter.hasNext) { + val child = iter.next() + if (child.isAlive()) { + childs.add(child) + child.tellInterrupt(cause) } } + } else { + while (iter.hasNext) { + val child = iter.next() + if (child.isAlive()) + childs.add(child) + } } } else { val cause = getInterruptedCause() @@ -548,7 +525,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, private def getChildren(): JavaSet[Fiber.Runtime[_, _]] = { //executed by the fiber itself, no risk of racing with transferChildren if (_children eq null) { - _children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) + _children = Platform.newConcurrentWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe) } _children } @@ -919,10 +896,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, */ private def removeChild(child: FiberRuntime[_, _]): Unit = if (_children ne null) { - //any mutation to the children set must be synchronized - zio.internal.Sync(_children) { - _children.remove(child) - } + _children.remove(child) () }
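The last two patches lean on two concurrency patterns around the child-fiber set: first, snapshotting the racily-nulled `_children` field into a local val before dereferencing it, so `childrenChunk` cannot hit a null between its null check and the iteration when `transferChildren` drops the set concurrently; second, backing the set with a concurrent weak set (`Platform.newConcurrentWeakSet`) so `addChild`, `addAll`, `removeChild` and `children` no longer need `zio.internal.Sync` blocks. The following is a minimal, standalone sketch of those two patterns only, not the actual FiberRuntime code: `ChildRegistry`, `transferAll` and `snapshotList` are hypothetical names, `ConcurrentHashMap.newKeySet` stands in for the weak concurrent set, and `@volatile` is added purely to make cross-thread visibility explicit in the sketch.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the child-fiber bookkeeping; names are illustrative only.
final class ChildRegistry[A] {
  // The real code uses a concurrent *weak* set; a ConcurrentHashMap-backed key set keeps the sketch self-contained.
  @volatile private var children: java.util.Set[A] = null

  // Called only by the owner: lazily allocate the thread-safe set.
  private def getChildren(): java.util.Set[A] = {
    if (children eq null)
      children = ConcurrentHashMap.newKeySet[A]()
    children
  }

  // Owner-side mutation: no external lock needed once the set itself is thread-safe.
  def add(child: A): Unit = {
    getChildren().add(child)
    ()
  }

  def remove(child: A): Unit =
    if (children ne null) {
      children.remove(child)
      ()
    }

  // Owner-side hand-off, in the spirit of transferChildren: snapshot, drop the set, pass the snapshot on.
  def transferAll(handOff: List[A] => Unit): Unit =
    if ((children ne null) && !children.isEmpty) {
      val snapshot = snapshotList()
      children = null // cheaper to drop the set than to clear it
      handOff(snapshot)
    }

  // May run on a foreign thread: read the field once into a local val so a concurrent
  // transferAll that nulls `children` between our null check and the iteration cannot NPE us.
  def snapshotList(): List[A] = {
    val childs = children
    if (childs eq null) Nil
    else {
      val bldr = List.newBuilder[A]
      childs.forEach { c => bldr += c; () }
      bldr.result()
    }
  }
}

object ChildRegistryExample {
  def main(args: Array[String]): Unit = {
    val reg = new ChildRegistry[String]
    reg.add("fiber-1"); reg.add("fiber-2")
    reg.transferAll(snapshot => println(s"handed off: $snapshot"))
    println(s"after transfer: ${reg.snapshotList()}") // Nil, the set was dropped
  }
}

The trade-off matches what the patch comments state: a concurrent reader may observe a stale (or already-dropped) snapshot, which is acceptable because the main consumer is fiber dumping and is "racy by definition", while every individual mutation and read stays safe without a Sync block.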