Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize ZStream's mapZIOPar and mapZIOParUnordered #8819

Merged
merged 36 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
411f287
strm_mapZioPar_opt: introduce mapZioPar and mapZioParUnordered 'direc…
eyalfa Apr 26, 2024
6efc2d7
strm_mapZioPar_opt: stream.mapZIOParUnordered
eyalfa Apr 26, 2024
d911350
strm_mapZioPar_opt: avoid mapZIOPar atm, add tests for mapZIOParUnord…
eyalfa Apr 28, 2024
59533ad
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa Apr 28, 2024
277bf35
strm_mapZioPar_opt: mapZIOPar2
eyalfa Apr 28, 2024
58cabb4
strm_mapZioPar_opt: use queue of Exit rather than a queue of Take
eyalfa Apr 30, 2024
0c31b9d
strm_mapZioPar_opt: strm.mapZIOParUnordered, run upstream in a scoped…
eyalfa May 1, 2024
b7cb92e
strm_mapZioPar_opt__fiberChildren: make fiber's children collection f…
eyalfa May 1, 2024
6596142
strm_mapZioPar_opt: mapZIOParUnordered2, record counting impl
eyalfa May 3, 2024
560ebcd
strm_mapZioPar_opt: strm.mapZIOParUnordered, ditch counting, rely on …
eyalfa May 3, 2024
b9eb3ac
strm_mapZioPar_opt: make sure strm.mapZIOParUnordered doesn't break w…
eyalfa May 3, 2024
42dfe12
Merge branch 'strm_mapZioPar_opt' into strm_mapZioPar_opt__fiberChildren
eyalfa May 3, 2024
6ad59d8
strm_mapZioPar_opt__fiberChildren: strm.mapZIOPar, use queue of fiber…
eyalfa May 5, 2024
7ad415b
strm_mapZioPar_opt__fiberChildren: fix issues in mapZIOPar
eyalfa May 5, 2024
fe66a4d
strm_mapZioPar_opt__chhannel: introduce a channel.mapZIOPar implement…
eyalfa May 5, 2024
1e2bfd8
strm_mapZioPar_opt__chhannel: duplicate the impls to the channel level
eyalfa May 6, 2024
8118984
strm_mapZioPar_opt__chhannel: add stream+pl methods for mapZIOPar and…
eyalfa May 6, 2024
077898f
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa May 6, 2024
5bbfe03
strm_mapZioPar_opt: fmt
eyalfa May 6, 2024
b228693
strm_mapZioPar_opt: remove ead code, add default prm
eyalfa May 6, 2024
e8cc44f
strm_mapZioPar_opt: fix CI issues
eyalfa May 6, 2024
d6c9793
strm_mapZioPar_opt: fix scala3 compilation issue
eyalfa May 6, 2024
6cf8265
strm_mapZioPar_opt: address reviw comments
eyalfa May 7, 2024
918aa6c
strm_mapZioPar_opt: address one more review comment
eyalfa May 7, 2024
11e09e1
Update core/shared/src/main/scala/zio/internal/FiberRuntime.scala
eyalfa May 7, 2024
08d6510
strm_mapZioPar_opt: slight compilation fix
eyalfa May 7, 2024
a7246ad
strm_mapZioPar_opt: make QRes an AnyVal
eyalfa May 8, 2024
552a6b4
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa May 8, 2024
6f81f82
strm_mapZioPar_opt__childrenSet: directly expose the children set to …
eyalfa May 10, 2024
9fd7ff3
strm_mapZioPar_opt__childrenSet: optimize transferChildren by batchin…
eyalfa May 10, 2024
3bdea62
strm_mapZioPar_opt__childrenSet: fmt
eyalfa May 10, 2024
d921095
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa May 10, 2024
eb1ef45
strm_mapZioPar_opt: FiberRuntime.children, extra safety measure to pr…
eyalfa May 10, 2024
ba2e2ef
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa May 12, 2024
b8d2dbd
strm_mapZioPar_opt: fiber runtime, back to synchronized child fibers set
eyalfa May 12, 2024
6edd498
Merge branch 'series/2.x' into strm_mapZioPar_opt
eyalfa May 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 45 additions & 26 deletions core/shared/src/main/scala/zio/internal/FiberRuntime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,32 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs,
)
}

private def childrenChunk = if (_children eq null) Chunk.empty
else {
val bldr = Chunk.newBuilder[Fiber.Runtime[_, _]]
_children.forEach { child =>
if ((child ne null) && child.isAlive())
bldr.addOne(child)
}
bldr.result()
}
eyalfa marked this conversation as resolved.
Show resolved Hide resolved

def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]] =
ZIO.succeed {
val childs = _children
if (childs == null) Chunk.empty
else
zio.internal.Sync(childs) {
Chunk.fromJavaIterable(childs)
ZIO.withFiberRuntime[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { case (fib, _) =>
if (fib eq self)
ZIO.succeed(self.childrenChunk)
else {
ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k =>
ZIO.succeed {
self.tell {
FiberMessage.Stateful { case (fib, _) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this optimization will cause .children to block if the underlying fiber is blocked. This means that operations like fiber dump will not work properly on Loom.

I would like to eliminate the synchronization on the children set, too, but I'm afraid it is probably going to require a new lock-free set implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u please elaborate on this? under the current implementation the fiber is 'waking up' on queued messages, what's going to happen in loom? r u concerned about scenarios where the fiber is blocking on something like promises? does this mean FiberMessage.Stateful is going away? will FiberRuntime even have a queue like it does today?
furthermore, current implementation relies on FiberMessage.Stateful for common operations like tellAddChild (used by zio.internal.FiberScope.Local#add), so I don't think this specific change introduces any new blocking risk.

seems like going 'loom' will require substantial changes to zio fibers...

val childs = fib.childrenChunk
k(Exit.succeed(childs))
}
}
}
}
}
}

def fiberRefs(implicit trace: Trace): UIO[FiberRefs] = ZIO.succeed(_fiberRefs)
Expand Down Expand Up @@ -482,9 +500,9 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs,
*
* '''NOTE''': This method must be invoked by the fiber itself.
*/
private[zio] def getChildren(): JavaSet[Fiber.Runtime[_, _]] = {
private def getChildren(): JavaSet[Fiber.Runtime[_, _]] = {
if (_children eq null) {
_children = Platform.newConcurrentWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
_children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a big optimization potential here. Currently we're initializing the Set using the default underlying size (size 16, can accommodate 12 entries before resizing).

Since now we can add children in bulk, we therefore know how many entries the set is meant to hold (at least initially). So we can instantiate it with a size large enough to accommodate all the entries without the need to dynamically resize.

Note that to calculate the size based on the number of expected entries you need to use this equation: size = Math.ceil(nEntries / 0.75d)

I would also advice to use 16 as the minimum size as we don't get much benefit sizing it smaller, but can be really bad if we need to keep resizing later (since resizing happens in Pow2 increments)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also thought about this potential, I think it goes beyond that since some of the spawned fibers ae very short lived and may already e finished by the time they're added to the fiber scope.
another thought I had, when forking multiple 'very similar' fibers (foreachParXXX), is it possible to come up with some kind of 'template' for the inherited fiber refs? they are all forked from the same parent so all should have very similar fiber refs except from the few which have a specialized 'fork' logic which perhaps can be pre-identified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, is it mandatory for _children to be a set? the remove operation is currently not used, we also never add the same fiber twice so the collection is effectively guaranteed to be a set.
I suspect an array based impl with 'occasional' GC can be sufficient here

Copy link
Contributor Author

@eyalfa eyalfa May 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kyri-petrou I suspect I may have to restore the synchronized weak set, seems like the set's isEmpty method effectively modifies it.

if there was a way to get the 'naive' set's size we'd still be ok, but given the current situation I think we must restore the synchronized weak set and seek alternative (I still think this doesn't have to be a set at all).

another alternative is to restore the message based children impl and address @jdegoes comment once ZIO actually goes loom.

}
_children
}
Expand Down Expand Up @@ -651,18 +669,29 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs,
private def interruptAllChildren(): UIO[Any] =
if (sendInterruptSignalToAllChildren(_children)) {
val iterator = _children.iterator()

_children = null

val body = () => {
val next = iterator.next()
var curr: Fiber.Runtime[_, _] = null

if (next != null) next.await(id.location) else Exit.unit
//this finds the next operable child fiber and stores it in the `curr` variable
def skip() = {
var next: Fiber.Runtime[_, _] = null
while (iterator.hasNext && (next eq null)) {
next = iterator.next()
if ((next ne null) && !next.isAlive())
next = null
}
curr = next
}

// Now await all children to finish:
ZIO
.whileLoop(iterator.hasNext)(body())(_ => ())(id.location)
//find the first operable child fiber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

//if there isn't any we can simply return null and save ourselves an effect evaluation
skip()

if (null ne curr) {
ZIO
.whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location)
} else null
} else null

private[zio] def isAlive(): Boolean =
Expand Down Expand Up @@ -842,7 +871,7 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs,
*
* '''NOTE''': This method must be invoked by the fiber itself.
*/
private[zio] def removeChild(child: FiberRuntime[_, _]): Unit =
private def removeChild(child: FiberRuntime[_, _]): Unit =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

if (_children ne null) {
_children.remove(child)
()
Expand Down Expand Up @@ -1169,16 +1198,6 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs,
done
}

private def sendInterruptSignalToAllChildrenConcurrently(): Boolean = {
val childFibers = _children

if (childFibers ne null) {
internal.Sync(childFibers) {
sendInterruptSignalToAllChildren(childFibers)
}
} else false
}

private def sendInterruptSignalToAllChildren(
children: JavaSet[Fiber.Runtime[_, _]]
): Boolean =
Expand Down
53 changes: 52 additions & 1 deletion streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2677,7 +2677,58 @@ object ZStreamSpec extends ZIOBaseSpec {
val stream =
ZStream.fromIterable(0 to 3).mapZIOParUnordered(10)(_ => ZIO.fail("fail"))
assertZIO(stream.runDrain.exit)(fails(equalTo("fail")))
} @@ nonFlaky @@ TestAspect.diagnose(10.seconds)
} @@ nonFlaky @@ TestAspect.diagnose(10.seconds),
test("interruption propagation") {
for {
interrupted <- Ref.make(false)
latch <- Promise.make[Nothing, Unit]
fib <-
ZStream(())
.mapZIOParUnordered(1)(_ => (latch.succeed(()) *> ZIO.infinity).onInterrupt(interrupted.set(true)))
.runDrain
.fork
_ <- latch.await
_ <- fib.interrupt
result <- interrupted.get
} yield assert(result)(isTrue)
},
test("interrupts pending tasks when one of the tasks fails U") {
for {
interrupted <- Ref.make(0)
latch1 <- Promise.make[Nothing, Unit]
latch2 <- Promise.make[Nothing, Unit]
result <- ZStream(1, 2, 3)
.mapZIOParUnordered(3) {
case 1 => (latch1.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1))
case 2 => (latch2.succeed(()) *> ZIO.never).onInterrupt(interrupted.update(_ + 1))
case 3 => latch1.await *> latch2.await *> ZIO.fail("Boom")
}
.runDrain
.exit
count <- interrupted.get
} yield assert(count)(equalTo(2)) && assert(result)(fails(equalTo("Boom")))
} @@ nonFlaky,
test("awaits children fibers properly") {
assertZIO(
ZStream
.fromIterable((0 to 100))
.interruptWhen(ZIO.never)
.mapZIOParUnordered(8)(_ => ZIO.succeed(1).repeatN(2000))
.runDrain
.exit
.map(_.isInterrupted)
)(equalTo(false))
},
test("propagates error of original stream") {
for {
fiber <- (ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) ++ ZStream.fail(new Throwable("Boom")))
.mapZIOParUnordered(2)(_ => ZIO.sleep(1.second))
.runDrain
.fork
_ <- TestClock.adjust(5.seconds)
exit <- fiber.await
} yield assert(exit)(fails(hasMessage(equalTo("Boom"))))
}
),
suite("mergeLeft/Right")(
test("mergeLeft with HaltStrategy.Right terminates as soon as the right stream terminates") {
Expand Down