Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combinator inference #935 #981

Merged
merged 10 commits into from Jul 26, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Expand Up @@ -249,7 +249,7 @@ lazy val sharedSourcesSettings = Seq(
lazy val root = project
.in(file("."))
.disablePlugins(MimaPlugin)
.aggregate(coreJVM, coreJS, lawsJVM, lawsJS, tracingTests)
.aggregate(coreJVM, coreJS, lawsJVM, lawsJS, runtimeTests)
.settings(skipOnPublishSettings)

lazy val core = crossProject(JSPlatform, JVMPlatform)
Expand Down Expand Up @@ -310,8 +310,8 @@ lazy val lawsJS = laws.js

lazy val FullTracingTest = config("fulltracing").extend(Test)

lazy val tracingTests = project
.in(file("tracing-tests"))
lazy val runtimeTests = project
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runtimeTests or systemTests or something else was what I had in mind

.in(file("runtime-tests"))
.dependsOn(coreJVM)
.settings(commonSettings ++ skipOnPublishSettings)
.settings(
Expand Down
61 changes: 17 additions & 44 deletions core/shared/src/main/scala/cats/effect/IO.scala
Expand Up @@ -103,9 +103,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] {
*/
final def map[B](f: A => B): IO[B] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(4, f.getClass)
IOTracing.cached(f.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached(4)
IOTracing.uncached()
} else {
null
}
Expand All @@ -130,9 +130,9 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] {
*/
final def flatMap[B](f: A => IO[B]): IO[B] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(3, f.getClass)
IOTracing.cached(f.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached(3)
IOTracing.uncached()
} else {
null
}
Expand Down Expand Up @@ -1143,7 +1143,7 @@ object IO extends IOInstances {
def delay[A](body: => A): IO[A] = {
val nextIo = Delay(() => body)
if (isFullStackTracing) {
IOTracing.decorated(nextIo, 1)
IOTracing.decorated(nextIo)
} else {
nextIo
}
Expand All @@ -1160,7 +1160,7 @@ object IO extends IOInstances {
def suspend[A](thunk: => IO[A]): IO[A] = {
val nextIo = Suspend(() => thunk)
if (isFullStackTracing) {
IOTracing.decorated(nextIo, 2)
IOTracing.decorated(nextIo)
} else {
nextIo
}
Expand All @@ -1179,7 +1179,7 @@ object IO extends IOInstances {
def pure[A](a: A): IO[A] = {
val nextIo = Pure(a)
if (isFullStackTracing) {
IOTracing.decorated(nextIo, 0)
IOTracing.decorated(nextIo)
} else {
nextIo
}
Expand Down Expand Up @@ -1248,21 +1248,12 @@ object IO extends IOInstances {
*
* @see [[asyncF]] and [[cancelable]]
*/
def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(5, k.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached(5)
} else {
null
}

Async[A]((_, _, cb) => {
def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] =
IOAsync[A]((_, _, cb) => {
val cb2 = Callback.asyncIdempotent(null, cb)
try k(cb2)
catch { case NonFatal(t) => cb2(Left(t)) }
}, trace = trace)
}
}, traceKey = k)

/**
* Suspends an asynchronous side effect in `IO`, this being a variant
Expand All @@ -1288,16 +1279,8 @@ object IO extends IOInstances {
*
* @see [[async]] and [[cancelable]]
*/
def asyncF[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(6, k.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached(6)
} else {
null
}

Async[A](
def asyncF[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] =
IOAsync[A](
(conn, _, cb) => {
// Must create new connection, otherwise we can have a race
// condition b/t the bind continuation and `startCancelable` below
Expand All @@ -1310,9 +1293,8 @@ object IO extends IOInstances {
catch { case NonFatal(t) => IO(cb2(Left(t))) }
IORunLoop.startCancelable(fa, conn2, Callback.report)
},
trace = trace
traceKey = k
)
}

/**
* Builds a cancelable `IO`.
Expand Down Expand Up @@ -1353,16 +1335,8 @@ object IO extends IOInstances {
* @see [[asyncF]] for a more potent version that does hook into
* the underlying cancelation model
*/
def cancelable[A](k: (Either[Throwable, A] => Unit) => CancelToken[IO]): IO[A] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(7, k.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached(7)
} else {
null
}

Async[A](
def cancelable[A](k: (Either[Throwable, A] => Unit) => CancelToken[IO]): IO[A] =
IOAsync[A](
(conn, _, cb) => {
val cb2 = Callback.asyncIdempotent(conn, cb)
val ref = ForwardCancelable()
Expand All @@ -1382,9 +1356,8 @@ object IO extends IOInstances {
else
ref.complete(IO.unit)
},
trace = trace
traceKey = k
)
}

/**
* Constructs an `IO` which sequences the specified exception.
Expand All @@ -1399,7 +1372,7 @@ object IO extends IOInstances {
def raiseError[A](e: Throwable): IO[A] = {
val nextIo = RaiseError(e)
if (isFullStackTracing) {
IOTracing.decorated(nextIo, 8)
IOTracing.decorated(nextIo)
} else {
nextIo
}
Expand Down
40 changes: 40 additions & 0 deletions core/shared/src/main/scala/cats/effect/internals/IOAsync.scala
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.internals

import cats.effect.IO
import cats.effect.internals.TracingPlatform.{isCachedStackTracing, isFullStackTracing}

private[effect] object IOAsync {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably move this into IO companion object, but I'm not sure what to name it


// Conveniennce function for internal Async calls that intend
// to opt into tracing so the following code isn't repeated.
def apply[A](k: (IOConnection, IOContext, Either[Throwable, A] => Unit) => Unit,
trampolineAfter: Boolean = false,
traceKey: AnyRef = null): IO[A] = {
val trace = if (isCachedStackTracing) {
IOTracing.cached(traceKey.getClass)
} else if (isFullStackTracing) {
IOTracing.uncached()
} else {
null
}

IO.Async((conn, ctx, cb) => k(conn, ctx, cb), trampolineAfter, trace)
}

}
56 changes: 29 additions & 27 deletions core/shared/src/main/scala/cats/effect/internals/IOBracket.scala
Expand Up @@ -30,25 +30,28 @@ private[effect] object IOBracket {
* Implementation for `IO.bracketCase`.
*/
def apply[A, B](acquire: IO[A])(use: A => IO[B])(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] =
IO.Async[B] { (conn, ctx, cb) =>
// Placeholder for the future finalizer
val deferredRelease = ForwardCancelable()
conn.push(deferredRelease.cancel)
// Race-condition check, avoiding starting the bracket if the connection
// was cancelled already, to ensure that `cancel` really blocks if we
// start `acquire` — n.b. `isCanceled` is visible here due to `push`
if (!conn.isCanceled) {
// Note `acquire` is uncancelable due to usage of `IORunLoop.restart`
// (in other words it is disconnected from our IOConnection)
// We don't need to explicitly pass back a reference to `ctx` because
// it is held in `RestartCallback` and `BracketStart`.
// Updates to it in the run-loop will be visible when the callback is
// invoked, even across asynchronous boundaries.
IORunLoop.restart(acquire, ctx, new BracketStart(use, release, ctx, conn, deferredRelease, cb))
} else {
deferredRelease.complete(IO.unit)
}
}
IOAsync[B](
{ (conn, ctx, cb) =>
// Placeholder for the future finalizer
val deferredRelease = ForwardCancelable()
conn.push(deferredRelease.cancel)
// Race-condition check, avoiding starting the bracket if the connection
// was cancelled already, to ensure that `cancel` really blocks if we
// start `acquire` — n.b. `isCanceled` is visible here due to `push`
if (!conn.isCanceled) {
// Note `acquire` is uncancelable due to usage of `IORunLoop.restart`
// (in other words it is disconnected from our IOConnection)
// We don't need to explicitly pass back a reference to `ctx` because
// it is held in `RestartCallback` and `BracketStart`.
// Updates to it in the run-loop will be visible when the callback is
// invoked, even across asynchronous boundaries.
IORunLoop.restart(acquire, ctx, new BracketStart(use, release, ctx, conn, deferredRelease, cb))
} else {
deferredRelease.complete(IO.unit)
}
},
traceKey = use
)

// Internals of `IO.bracketCase`.
final private class BracketStart[A, B](
Expand Down Expand Up @@ -106,10 +109,10 @@ private[effect] object IOBracket {
* Implementation for `IO.guaranteeCase`.
*/
def guaranteeCase[A](source: IO[A], release: ExitCase[Throwable] => IO[Unit]): IO[A] =
IO.Async { (conn, ctx, cb) =>
// Light async boundary, otherwise this will trigger a StackOverflowException
ec.execute(new Runnable {
def run(): Unit = {
IOAsync[A](
(conn, ctx, cb) =>
// Light async boundary, otherwise this will trigger a StackOverflowException
ec.execute { () =>
val frame = new EnsureReleaseFrame[A](release)
val onNext = source.flatMap(frame)
// Registering our cancelable token ensures that in case
Expand All @@ -121,10 +124,9 @@ private[effect] object IOBracket {
if (!conn.isCanceled) {
IORunLoop.restartCancelable(onNext, conn, ctx, cb)
}
}
})
// TODO: Trace here?
}
},
traceKey = release
)

final private class BracketReleaseFrame[A, B](a: A, releaseFn: (A, ExitCase[Throwable]) => IO[Unit])
extends BaseReleaseFrame[A, B] {
Expand Down
20 changes: 10 additions & 10 deletions core/shared/src/main/scala/cats/effect/internals/IOTracing.scala
Expand Up @@ -24,28 +24,28 @@ import cats.effect.tracing.IOEvent

private[effect] object IOTracing {

def decorated[A](source: IO[A], tag: Int): IO[A] =
Trace(source, buildFrame(tag))
def decorated[A](source: IO[A]): IO[A] =
Trace(source, buildFrame())

def uncached(tag: Int): IOEvent =
buildFrame(tag)
def uncached(): IOEvent =
buildFrame()

def cached(tag: Int, clazz: Class[_]): IOEvent =
buildCachedFrame(tag, clazz)
def cached(clazz: Class[_]): IOEvent =
buildCachedFrame(clazz)

private def buildCachedFrame(tag: Int, clazz: Class[_]): IOEvent = {
private def buildCachedFrame(clazz: Class[_]): IOEvent = {
val currentFrame = frameCache.get(clazz)
if (currentFrame eq null) {
val newFrame = buildFrame(tag)
val newFrame = buildFrame()
frameCache.put(clazz, newFrame)
newFrame
} else {
currentFrame
}
}

private def buildFrame(tag: Int): IOEvent =
IOEvent.StackTrace(tag, new Throwable())
private def buildFrame(): IOEvent =
IOEvent.StackTrace(new Throwable())

/**
* Global cache for trace frames. Keys are references to lambda classes.
Expand Down
Expand Up @@ -20,7 +20,7 @@ sealed abstract class IOEvent

object IOEvent {

final case class StackTrace(tag: Int, throwable: Throwable) extends IOEvent {
final case class StackTrace(throwable: Throwable) extends IOEvent {
def stackTrace: List[StackTraceElement] =
throwable.getStackTrace().toList
}
Expand Down