From 5ae5aeddd8c13f830cfc381928a78d1b8cf69a57 Mon Sep 17 00:00:00 2001 From: Sam Pillsworth Date: Tue, 23 May 2023 12:43:37 -0400 Subject: [PATCH 1/3] naive attempt to have workers close their queues --- .../scala/cats/effect/std/Dispatcher.scala | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 524a2def1f..dbf349627d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -247,16 +247,24 @@ object Dispatcher { supervisor <- Supervisor[F](await, Some((_: Outcome[F, Throwable, _]) => true)) _ <- { - def step(state: Array[AtomicReference[List[Registration]]], await: F[Unit]): F[Unit] = + def step( + state: Array[AtomicReference[List[Registration]]], + await: F[Unit], + doneR: AtomicBoolean): F[Unit] = for { + done <- F.delay(doneR.get()) regs <- F delay { val buffer = mutable.ListBuffer.empty[Registration] var i = 0 while (i < workers) { val st = state(i) - if (st.get() ne Nil) { - val list = st.getAndSet(Nil) - buffer ++= list.reverse // FIFO order here is a form of fairness + if ((st.get() ne Nil) & (st.get() ne null)) { + val list = if (done) st.getAndSet(null) else st.getAndSet(Nil) + if (list ne null) { + buffer ++= list.reverse // FIFO order here is a form of fairness + } else { + println("SRP :: st.getAndSet returned null") // TODO + } } i += 1 } @@ -294,7 +302,7 @@ object Dispatcher { F.delay(latch.set(Noop)) *> // reset latch // if we're marked as done, yield immediately to give other fibers a chance to shut us down // we might loop on this a few times since we're marked as done before the supervisor is canceled - F.delay(doneR.get()).ifM(F.cede, step(state, await)) + F.delay(doneR.get()).ifM(F.cede, step(state, await, doneR)) } 0.until(workers).toList traverse_ { n => @@ -303,7 +311,7 @@ object Dispatcher { val worker = dispatcher(doneR, latch, states(n)) val release = F.delay(latch.getAndSet(Open)()) Resource.make(supervisor.supervise(worker)) { _ => - F.delay(doneR.set(true)) *> step(states(n), F.unit) *> release + F.delay(doneR.set(true)) *> step(states(n), F.unit, doneR) *> release } } } @@ -361,9 +369,12 @@ object Dispatcher { @tailrec def enqueue(state: AtomicReference[List[Registration]], reg: Registration): Unit = { val curr = state.get() - val next = reg :: curr - - if (!state.compareAndSet(curr, next)) enqueue(state, reg) + if (curr eq null) { + throw new IllegalStateException("dispatcher already shutdown") + } else { + val next = reg :: curr + if (!state.compareAndSet(curr, next)) enqueue(state, reg) + } } if (alive.get()) { @@ -410,13 +421,13 @@ object Dispatcher { } // double-check after we already put things in the structure - if (alive.get()) { - (promise.future, cancel) - } else { - // we were shutdown *during* the enqueue - cancel() - throw new IllegalStateException("dispatcher already shutdown") - } + // if (alive.get()) { + (promise.future, cancel) + // } else { + // // we were shutdown *during* the enqueue + // cancel() + // throw new IllegalStateException("dispatcher already shutdown") + // } } else { throw new IllegalStateException("dispatcher already shutdown") } From 34a44d77630df749ed7a430e3ad14a203d697672 Mon Sep 17 00:00:00 2001 From: Sam Pillsworth Date: Wed, 24 May 2023 14:08:57 -0400 Subject: [PATCH 2/3] set state to null if done, even if state is empty --- std/shared/src/main/scala/cats/effect/std/Dispatcher.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index dbf349627d..5f60fe6b1f 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -258,12 +258,10 @@ object Dispatcher { var i = 0 while (i < workers) { val st = state(i) - if ((st.get() ne Nil) & (st.get() ne null)) { + if (st.get() ne null) { val list = if (done) st.getAndSet(null) else st.getAndSet(Nil) - if (list ne null) { + if ((list ne null) && (list ne Nil)) { buffer ++= list.reverse // FIFO order here is a form of fairness - } else { - println("SRP :: st.getAndSet returned null") // TODO } } i += 1 From e3ea3aca2b4177c2922b2f7c2a986adad2fa53c0 Mon Sep 17 00:00:00 2001 From: Sam Pillsworth Date: Wed, 24 May 2023 14:13:35 -0400 Subject: [PATCH 3/3] remove `alive` --- .../scala/cats/effect/std/Dispatcher.scala | 83 ++++++++----------- 1 file changed, 34 insertions(+), 49 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 5f60fe6b1f..4d08927e33 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -314,10 +314,6 @@ object Dispatcher { } } } - - // Alive is the innermost resource so that when releasing - // the very first thing we do is set dispatcher to un-alive - alive <- Resource.make(F.delay(new AtomicBoolean(true)))(ref => F.delay(ref.set(false))) } yield { new Dispatcher[F] { override def unsafeRunAndForget[A](fa: F[A]): Unit = { @@ -375,60 +371,49 @@ object Dispatcher { } } - if (alive.get()) { - val (state, lt) = if (workers > 1) { - val rand = ThreadLocalRandom.current() - val dispatcher = rand.nextInt(workers) - val inner = rand.nextInt(workers) + val (state, lt) = if (workers > 1) { + val rand = ThreadLocalRandom.current() + val dispatcher = rand.nextInt(workers) + val inner = rand.nextInt(workers) - (states(dispatcher)(inner), latches(dispatcher)) - } else { - (states(0)(0), latches(0)) - } + (states(dispatcher)(inner), latches(dispatcher)) + } else { + (states(0)(0), latches(0)) + } - val reg = Registration(action, registerCancel _) - enqueue(state, reg) + val reg = Registration(action, registerCancel _) + enqueue(state, reg) - if (lt.get() ne Open) { - val f = lt.getAndSet(Open) - f() - } + if (lt.get() ne Open) { + val f = lt.getAndSet(Open) + f() + } + + val cancel = { () => + reg.lazySet(false) - val cancel = { () => - reg.lazySet(false) - - @tailrec - def loop(): Future[Unit] = { - val state = cancelState.get() - state match { - case CancelInit => - val promise = Promise[Unit]() - if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) { - loop() - } else { - promise.future - } - case CanceledNoToken(promise) => + @tailrec + def loop(): Future[Unit] = { + val state = cancelState.get() + state match { + case CancelInit => + val promise = Promise[Unit]() + if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) { + loop() + } else { promise.future - case CancelToken(cancelToken) => - cancelToken() - } + } + case CanceledNoToken(promise) => + promise.future + case CancelToken(cancelToken) => + cancelToken() } - - loop() } - // double-check after we already put things in the structure - // if (alive.get()) { - (promise.future, cancel) - // } else { - // // we were shutdown *during* the enqueue - // cancel() - // throw new IllegalStateException("dispatcher already shutdown") - // } - } else { - throw new IllegalStateException("dispatcher already shutdown") + loop() } + + (promise.future, cancel) } } }