diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index 524a2def1f..4d08927e33 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -247,16 +247,22 @@ object Dispatcher { supervisor <- Supervisor[F](await, Some((_: Outcome[F, Throwable, _]) => true)) _ <- { - def step(state: Array[AtomicReference[List[Registration]]], await: F[Unit]): F[Unit] = + def step( + state: Array[AtomicReference[List[Registration]]], + await: F[Unit], + doneR: AtomicBoolean): F[Unit] = for { + done <- F.delay(doneR.get()) regs <- F delay { val buffer = mutable.ListBuffer.empty[Registration] var i = 0 while (i < workers) { val st = state(i) - if (st.get() ne Nil) { - val list = st.getAndSet(Nil) - buffer ++= list.reverse // FIFO order here is a form of fairness + if (st.get() ne null) { + val list = if (done) st.getAndSet(null) else st.getAndSet(Nil) + if ((list ne null) && (list ne Nil)) { + buffer ++= list.reverse // FIFO order here is a form of fairness + } } i += 1 } @@ -294,7 +300,7 @@ object Dispatcher { F.delay(latch.set(Noop)) *> // reset latch // if we're marked as done, yield immediately to give other fibers a chance to shut us down // we might loop on this a few times since we're marked as done before the supervisor is canceled - F.delay(doneR.get()).ifM(F.cede, step(state, await)) + F.delay(doneR.get()).ifM(F.cede, step(state, await, doneR)) } 0.until(workers).toList traverse_ { n => @@ -303,15 +309,11 @@ object Dispatcher { val worker = dispatcher(doneR, latch, states(n)) val release = F.delay(latch.getAndSet(Open)()) Resource.make(supervisor.supervise(worker)) { _ => - F.delay(doneR.set(true)) *> step(states(n), F.unit) *> release + F.delay(doneR.set(true)) *> step(states(n), F.unit, doneR) *> release } } } } - - // Alive is the innermost resource so that when releasing - // the very first thing we do is set dispatcher to un-alive - alive <- Resource.make(F.delay(new AtomicBoolean(true)))(ref => F.delay(ref.set(false))) } yield { new Dispatcher[F] { override def unsafeRunAndForget[A](fa: F[A]): Unit = { @@ -361,65 +363,57 @@ object Dispatcher { @tailrec def enqueue(state: AtomicReference[List[Registration]], reg: Registration): Unit = { val curr = state.get() - val next = reg :: curr - - if (!state.compareAndSet(curr, next)) enqueue(state, reg) + if (curr eq null) { + throw new IllegalStateException("dispatcher already shutdown") + } else { + val next = reg :: curr + if (!state.compareAndSet(curr, next)) enqueue(state, reg) + } } - if (alive.get()) { - val (state, lt) = if (workers > 1) { - val rand = ThreadLocalRandom.current() - val dispatcher = rand.nextInt(workers) - val inner = rand.nextInt(workers) + val (state, lt) = if (workers > 1) { + val rand = ThreadLocalRandom.current() + val dispatcher = rand.nextInt(workers) + val inner = rand.nextInt(workers) - (states(dispatcher)(inner), latches(dispatcher)) - } else { - (states(0)(0), latches(0)) - } + (states(dispatcher)(inner), latches(dispatcher)) + } else { + (states(0)(0), latches(0)) + } - val reg = Registration(action, registerCancel _) - enqueue(state, reg) + val reg = Registration(action, registerCancel _) + enqueue(state, reg) - if (lt.get() ne Open) { - val f = lt.getAndSet(Open) - f() - } + if (lt.get() ne Open) { + val f = lt.getAndSet(Open) + f() + } - val cancel = { () => - reg.lazySet(false) - - @tailrec - def loop(): Future[Unit] = { - val state = cancelState.get() - state match { - case CancelInit => - val promise = Promise[Unit]() - if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) { - loop() - } else { - promise.future - } - case CanceledNoToken(promise) => + val cancel = { () => + reg.lazySet(false) + + @tailrec + def loop(): Future[Unit] = { + val state = cancelState.get() + state match { + case CancelInit => + val promise = Promise[Unit]() + if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) { + loop() + } else { promise.future - case CancelToken(cancelToken) => - cancelToken() - } + } + case CanceledNoToken(promise) => + promise.future + case CancelToken(cancelToken) => + cancelToken() } - - loop() } - // double-check after we already put things in the structure - if (alive.get()) { - (promise.future, cancel) - } else { - // we were shutdown *during* the enqueue - cancel() - throw new IllegalStateException("dispatcher already shutdown") - } - } else { - throw new IllegalStateException("dispatcher already shutdown") + loop() } + + (promise.future, cancel) } } }