Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove final fields startIO and startEC #1773

Merged
merged 9 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/js/src/main/scala/cats/effect/IOFiberConstants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ private[effect] object IOFiberConstants {

// resume ids
val ExecR: Byte = 0
val AsyncContinueR: Byte = 1
val BlockingR: Byte = 2
val AfterBlockingSuccessfulR: Byte = 3
val AfterBlockingFailedR: Byte = 4
val EvalOnR: Byte = 5
val CedeR: Byte = 6
val AutoCedeR: Byte = 7
val DoneR: Byte = 8
val AsyncContinueSuccessfulR: Byte = 1
val AsyncContinueFailedR: Byte = 2
val BlockingR: Byte = 3
val AfterBlockingSuccessfulR: Byte = 4
val AfterBlockingFailedR: Byte = 5
val EvalOnR: Byte = 6
val CedeR: Byte = 7
val AutoCedeR: Byte = 8
val DoneR: Byte = 9

// ContState tags
val ContStateInitial: Int = 0
Expand Down
17 changes: 9 additions & 8 deletions core/jvm/src/main/java/cats/effect/IOFiberConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ final class IOFiberConstants {

// resume ids
public static final byte ExecR = 0;
public static final byte AsyncContinueR = 1;
public static final byte BlockingR = 2;
public static final byte AfterBlockingSuccessfulR = 3;
public static final byte AfterBlockingFailedR = 4;
public static final byte EvalOnR = 5;
public static final byte CedeR = 6;
public static final byte AutoCedeR = 7;
public static final byte DoneR = 8;
public static final byte AsyncContinueSuccessfulR = 1;
public static final byte AsyncContinueFailedR = 2;
public static final byte BlockingR = 3;
public static final byte AfterBlockingSuccessfulR = 4;
public static final byte AfterBlockingFailedR = 5;
public static final byte EvalOnR = 6;
public static final byte CedeR = 7;
public static final byte AutoCedeR = 8;
public static final byte DoneR = 9;

// ContState tags
public static final int ContStateInitial = 0;
Expand Down
89 changes: 47 additions & 42 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private final class IOFiber[A](
private[this] val objectState = new ArrayStack[AnyRef](16)

/* fast-path to head */
private[this] var currentCtx: ExecutionContext = _
private[this] var currentCtx: ExecutionContext = startEC
private[this] var ctxs: ArrayStack[ExecutionContext] = _

private[this] var canceled: Boolean = false
Expand All @@ -113,6 +113,7 @@ private final class IOFiber[A](

/* mutable state for resuming the fiber in different states */
private[this] var resumeTag: Byte = ExecR
private[this] var resumeIO: IO[Any] = startIO

/* prefetch for Right(()) */
private[this] val RightUnit = IOFiber.RightUnit
Expand All @@ -132,14 +133,15 @@ private final class IOFiber[A](
try {
(resumeTag: @switch) match {
case 0 => execR()
case 1 => asyncContinueR()
case 2 => blockingR()
case 3 => afterBlockingSuccessfulR()
case 4 => afterBlockingFailedR()
case 5 => evalOnR()
case 6 => cedeR()
case 7 => autoCedeR()
case 8 => ()
case 1 => asyncContinueSuccessfulR()
case 2 => asyncContinueFailedR()
case 3 => blockingR()
case 4 => afterBlockingSuccessfulR()
case 5 => afterBlockingFailedR()
case 6 => evalOnR()
case 7 => cedeR()
case 8 => autoCedeR()
case 9 => ()
}
} catch {
case t: Throwable =>
Expand Down Expand Up @@ -235,8 +237,9 @@ private final class IOFiber[A](
if (shouldFinalize()) {
asyncCancel(null)
} else if (iteration >= autoYieldThreshold) {
objectState.push(cur0)
autoCede()
resumeIO = cur0
resumeTag = AutoCedeR
rescheduleFiber(currentCtx)(this)
} else {
// This is a modulo operation in disguise. `iteration` is reset every time
// the runloop yields automatically by the runloop always starting from
Expand Down Expand Up @@ -563,8 +566,14 @@ private final class IOFiber[A](
if (!shouldFinalize()) {
/* we weren't cancelled, so schedule the runloop for execution */
val ec = currentCtx
resumeTag = AsyncContinueR
objectState.push(e)
e match {
case Left(t) =>
resumeTag = AsyncContinueFailedR
objectState.push(t)
case Right(a) =>
resumeTag = AsyncContinueSuccessfulR
objectState.push(a.asInstanceOf[AnyRef])
}
execute(ec)(this)
} else {
/*
Expand Down Expand Up @@ -735,7 +744,8 @@ private final class IOFiber[A](

/* Cede */
case 16 =>
cede()
resumeTag = CedeR
rescheduleFiber(currentCtx)(this)

case 17 =>
val cur = cur0.asInstanceOf[Start[Any]]
Expand Down Expand Up @@ -781,7 +791,7 @@ private final class IOFiber[A](
conts.push(EvalOnK)

resumeTag = EvalOnR
objectState.push(cur.ioa)
resumeIO = cur.ioa
Comment on lines -784 to +794
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait… what? How? Doesn't this break down if you nest multiple evalOns?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each evalOn relinquishes the runloop immediately with execute(ec)(this), so the pushing to the objectState is immediately followed by a pop in evalOnR. It's the exact same logic.

execute(ec)(this)
}

Expand All @@ -791,7 +801,7 @@ private final class IOFiber[A](

if (cur.hint eq TypeBlocking) {
resumeTag = BlockingR
objectState.push(cur)
resumeIO = cur
runtime.blocking.execute(this)
} else {
runLoop(interruptibleImpl(cur, runtime.blocking), nextIteration)
Expand Down Expand Up @@ -824,6 +834,7 @@ private final class IOFiber[A](
masks = initMask

resumeTag = DoneR
resumeIO = null
/*
* Write barrier to publish masks. The thread which owns the runloop is
* effectively a single writer, so lazy set can be utilized for relaxed
Expand Down Expand Up @@ -870,16 +881,6 @@ private final class IOFiber[A](
}
}

private[this] def cede(): Unit = {
resumeTag = CedeR
rescheduleFiber(currentCtx)(this)
}

private[this] def autoCede(): Unit = {
resumeTag = AutoCedeR
rescheduleFiber(currentCtx)(this)
}

/*
* We should attempt finalization if all of the following are true:
* 1) We own the runloop
Expand Down Expand Up @@ -970,7 +971,7 @@ private final class IOFiber[A](
var k: Byte = -1

/*
* short circuit on error by dropping map, flatMap, and auto-cede continuations
* short circuit on error by dropping map and flatMap continuations
* until we hit a continuation that needs to deal with errors.
*/
while (i >= 0 && k < 0) {
Expand Down Expand Up @@ -1058,26 +1059,28 @@ private final class IOFiber[A](
conts.push(RunTerminusK)

ctxs = new ArrayStack[ExecutionContext](2)
currentCtx = startEC
ctxs.push(startEC)
ctxs.push(currentCtx)

runLoop(startIO, 0)
val io = resumeIO
resumeIO = null
runLoop(io, 0)
}
}

private[this] def asyncContinueR(): Unit = {
val e = objectState.pop().asInstanceOf[Either[Throwable, Any]]
val next = e match {
case Left(t) => failed(t, 0)
case Right(a) => succeeded(a, 0)
}
private[this] def asyncContinueSuccessfulR(): Unit = {
val a = objectState.pop().asInstanceOf[Any]
runLoop(succeeded(a, 0), 0)
}

runLoop(next, 0)
private[this] def asyncContinueFailedR(): Unit = {
val t = objectState.pop().asInstanceOf[Throwable]
runLoop(failed(t, 0), 0)
}

private[this] def blockingR(): Unit = {
var error: Throwable = null
val cur = objectState.pop().asInstanceOf[Blocking[Any]]
val cur = resumeIO.asInstanceOf[Blocking[Any]]
resumeIO = null
val r =
try cur.thunk()
catch {
Expand All @@ -1086,7 +1089,7 @@ private final class IOFiber[A](

if (error == null) {
resumeTag = AfterBlockingSuccessfulR
objectState.push(r.asInstanceOf[Object])
objectState.push(r.asInstanceOf[AnyRef])
} else {
resumeTag = AfterBlockingFailedR
objectState.push(error)
Expand All @@ -1105,7 +1108,8 @@ private final class IOFiber[A](
}

private[this] def evalOnR(): Unit = {
val ioa = objectState.pop().asInstanceOf[IO[Any]]
val ioa = resumeIO
resumeIO = null
runLoop(ioa, 0)
}

Expand All @@ -1114,7 +1118,8 @@ private final class IOFiber[A](
}

private[this] def autoCedeR(): Unit = {
val io = objectState.pop().asInstanceOf[IO[Any]]
val io = resumeIO
resumeIO = null
runLoop(io, 0)
}

Expand Down Expand Up @@ -1189,7 +1194,7 @@ private final class IOFiber[A](

if (!shouldFinalize()) {
resumeTag = AfterBlockingSuccessfulR
objectState.push(result.asInstanceOf[Object])
objectState.push(result.asInstanceOf[AnyRef])
execute(ec)(this)
} else {
asyncCancel(null)
Expand Down