diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 079791a42..79ebee035 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,7 +73,7 @@ jobs: run: sbt ++${{ matrix.scala }} docs/mdoc - name: Compress target directories - run: tar cf targets.tar modules/logging/interop/shapeless/target modules/kernel/target modules/logging/target examples/target modules/concurrent/target modules/logging/layout/target modules/optics/core/target modules/kernelCE2Interop/target modules/doobie/core/target modules/zio/target tofu-docs/target modules/zio/logging/target modules/core3/target modules/kernel/interop/cats-mtl/target modules/config/target modules/optics/macro/target modules/kernelCE3Interop/target modules/zio/core/target modules/streams/target modules/logging/structured/target modules/logging/interop/log4cats/target modules/doobie/logging/target modules/enums/target target modules/fs2/target modules/logging/derivation/target modules/logging/interop/refined/target modules/higherKindCore/target modules/derivation/target modules/core/target modules/observable/target modules/optics/interop/target modules/memo/target modules/env/target project/target + run: tar cf targets.tar modules/logging/interop/shapeless/target modules/kernel/target modules/logging/target examples/target modules/doobie/core-ce3/target modules/concurrent/target modules/logging/layout/target modules/optics/core/target modules/kernelCE2Interop/target modules/doobie/core/target modules/zio/target examples-ce3/target tofu-docs/target modules/zio/logging/target modules/core3/target modules/kernel/interop/cats-mtl/target modules/config/target modules/optics/macro/target modules/doobie/logging-ce3/target modules/kernelCE3Interop/target modules/zio/core/target modules/streams/target modules/logging/structured/target modules/logging/interop/log4cats/target modules/doobie/logging/target modules/enums/target target modules/fs2/target modules/logging/derivation/target modules/logging/interop/refined/target modules/higherKindCore/target modules/derivation/target modules/core/target modules/observable/target modules/optics/interop/target modules/memo/target modules/env/target project/target - name: Upload target directories uses: actions/upload-artifact@v2 diff --git a/.scalafix.conf b/.scalafix.conf index 84a0d28fa..73a2fac58 100644 --- a/.scalafix.conf +++ b/.scalafix.conf @@ -1,4 +1,5 @@ OrganizeImports { - groupedImports = Merge removeUnused = true + groups = ["re:javax?\\.", "scala.", "tofu." "*"] + groupedImports = AggressiveMerge } \ No newline at end of file diff --git a/build.sbt b/build.sbt index 4bcfc03c7..d462d3b6d 100644 --- a/build.sbt +++ b/build.sbt @@ -300,12 +300,28 @@ lazy val doobie = project lazy val doobieLogging = project .in(file("modules/doobie/logging")) .settings( - libraryDependencies ++= List(doobieCore), defaultSettings, name := "tofu-doobie-logging", ) .dependsOn(doobie, loggingStr) +lazy val doobieCE3 = project + .in(file("modules/doobie/core-ce3")) + .settings( + libraryDependencies ++= List(doobieCoreCE3, derevo), + defaultSettings, + name := "tofu-doobie-ce3", + ) + .dependsOn(core3, derivation) + +lazy val doobieLoggingCE3 = project + .in(file("modules/doobie/logging-ce3")) + .settings( + defaultSettings, + name := "tofu-doobie-logging-ce3", + ) + .dependsOn(doobieCE3, loggingStr) + lazy val examples = project .in(file("examples")) .settings( @@ -317,6 +333,16 @@ lazy val examples = project ) .dependsOn(mainModuleDeps: _*) +lazy val examplesCE3 = project + .in(file("examples-ce3")) + .settings( + libraryDependencies ++= List(doobieCoreCE3, doobieH2CE3, derevo, groovy), + defaultSettings, + name := "tofu-examples-ce3", + noPublishSettings, + ) + .dependsOn(ce3MainModuleDeps: _*) + lazy val streams = project .in(file("modules/streams")) .settings( @@ -350,9 +376,15 @@ lazy val ce3CoreModules = Vector( lazy val commonModules = Vector(observable, opticsInterop, logging, enums, config, zioInterop, fs2Interop, doobie, doobieLogging) +lazy val ce3CommonModules = + Vector(loggingStr, loggingDer, loggingLayout, doobieCE3, doobieLoggingCE3) + lazy val allModuleRefs = (coreModules ++ commonModules).map(x => x: ProjectReference) lazy val mainModuleDeps = (coreModules ++ commonModules).map(x => x: ClasspathDep[ProjectReference]) +lazy val ce3AllModuleRefs = (ce3CoreModules ++ ce3CommonModules).map(x => x: ProjectReference) +lazy val ce3MainModuleDeps = (ce3CoreModules ++ ce3CommonModules).map(x => x: ClasspathDep[ProjectReference]) + lazy val docs = project // new documentation project .in(file("tofu-docs")) .settings( @@ -375,7 +407,11 @@ lazy val tofu = project defaultSettings, name := "tofu" ) - .aggregate((coreModules ++ commonModules ++ ce3CoreModules :+ docs :+ examples).map(x => x: ProjectReference): _*) + .aggregate( + (coreModules ++ commonModules ++ ce3CoreModules ++ ce3CommonModules :+ docs :+ examples :+ examplesCE3).map(x => + x: ProjectReference + ): _* + ) .dependsOn(coreModules.map(x => x: ClasspathDep[ProjectReference]): _*) lazy val defaultScalacOptions = scalacOptions := { diff --git a/examples-ce3/src/main/resources/logback.groovy b/examples-ce3/src/main/resources/logback.groovy new file mode 100644 index 000000000..18ab22e3e --- /dev/null +++ b/examples-ce3/src/main/resources/logback.groovy @@ -0,0 +1,19 @@ +import tofu.logging.logback.ConsoleContextLayout +import tofu.logging.ELKLayout + +//puts messages as plain text +appender("PLAIN-COLORED", ConsoleAppender) { + encoder(LayoutWrappingEncoder) { + layout(ConsoleContextLayout) { + pattern = "%cyan(%d{HH:mm:ss} %-5level %logger{36} - %msg%n [%mdc]%n)" + } + } +} +//puts messages as JSONs +appender("STRUCTURED", ConsoleAppender) { + encoder(LayoutWrappingEncoder) { + layout(ELKLayout) + } +} + +root(DEBUG, ["PLAIN-COLORED", "STRUCTURED"]) \ No newline at end of file diff --git a/examples-ce3/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala b/examples-ce3/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala new file mode 100644 index 000000000..a3dd7a77b --- /dev/null +++ b/examples-ce3/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala @@ -0,0 +1,149 @@ +package tofu.example.doobie + +import cats.data.ReaderT +import cats.effect.std.Dispatcher +import cats.effect.{Async, IO, IOApp, Sync} +import cats.tagless.syntax.functorK._ +import cats.{Apply, Monad} +import derevo.derive +import doobie._ +import doobie.implicits._ +import doobie.util.log.LogHandler +import tofu.doobie.LiftConnectionIO +import tofu.doobie.log.{EmbeddableLogHandler, LogHandlerF} +import tofu.doobie.transactor.Txr +import tofu.higherKind.derived.representableK +import tofu.kernel.types.PerformThrow +import tofu.lift.Lift +import tofu.logging.derivation.{loggable, loggingMidTry} +import tofu.logging.{Logging, LoggingCompanion} +import tofu.syntax.context._ +import tofu.syntax.doobie.log.handler._ +import tofu.syntax.doobie.log.string._ +import tofu.syntax.monadic._ +import tofu.{Delay, Tries, WithContext, WithLocal, WithRun} + +import scala.annotation.unused + +// Simple context +@derive(loggable) +final case class Ctx(traceId: String) + +// Model +@derive(loggable) +final case class Person(id: Long, name: String, deptId: Long) + +@derive(loggable) +final case class Dept(id: Long, name: String) + +// Person SQL algebra +@derive(representableK, loggingMidTry) +trait PersonSql[F[_]] { + def init: F[Unit] + def create(person: Person): F[Unit] + def read(id: Long): F[Option[Person]] +} + +object PersonSql extends LoggingCompanion[PersonSql] { + def make[DB[_]: Monad: LiftConnectionIO: EmbeddableLogHandler]: PersonSql[DB] = { + EmbeddableLogHandler[DB].embedLift(implicit lh => new Impl) + } + + final class Impl(implicit lh: LogHandler) extends PersonSql[ConnectionIO] { + def init: ConnectionIO[Unit] = + lsql"create table if not exists person(id int8, name varchar(50), dept_id int8)".update.run.void + def create(p: Person): ConnectionIO[Unit] = + lsql"insert into person values(${p.id}, ${p.name}, ${p.deptId})".update.run.void + def read(id: Long): ConnectionIO[Option[Person]] = + lsql"select id, name, dept_id from person where id = $id" + .query[Person] + .option + } +} + +// Department SQL algebra +@derive(representableK, loggingMidTry) +trait DeptSql[F[_]] { + def init: F[Unit] + def create(dept: Dept): F[Unit] + def read(id: Long): F[Option[Dept]] +} + +object DeptSql extends LoggingCompanion[DeptSql] { + def make[DB[_]: Monad: LiftConnectionIO: EmbeddableLogHandler]: DeptSql[DB] = { + EmbeddableLogHandler[DB].embedLift(implicit lh => new Impl) + } + + final class Impl(implicit lh: LogHandler) extends DeptSql[ConnectionIO] { + def init: ConnectionIO[Unit] = + lsql"create table if not exists department(id int8, name varchar(50))".update.run.void + def create(d: Dept): ConnectionIO[Unit] = + lsql"insert into department values(${d.id}, ${d.name})".update.run.void + def read(id: Long): ConnectionIO[Option[Dept]] = + lsql"select id, name from department where id = $id" + .query[Dept] + .option + } +} + +// Storage algebra encapsulates database transactional logic +@derive(representableK, loggingMidTry) +trait PersonStorage[F[_]] { + def init: F[Unit] + def store(person: Person, dept: Dept): F[Unit] +} + +object PersonStorage extends LoggingCompanion[PersonStorage] { + def make[F[_]: Apply, DB[_]: Monad: Txr[F, *[_]]]( + persSql: PersonSql[DB], + deptSql: DeptSql[DB] + ): PersonStorage[F] = { + val impl = new Impl[DB](persSql, deptSql): PersonStorage[DB] + val tx = Txr[F, DB].trans + impl.mapK(tx) + } + + final class Impl[DB[_]: Monad](persSql: PersonSql[DB], deptSql: DeptSql[DB]) extends PersonStorage[DB] { + def init: DB[Unit] = + deptSql.init >> persSql.init + def store(person: Person, dept: Dept): DB[Unit] = + deptSql.create(dept) >> persSql.create(person) + } +} + +object TofuDoobieExample extends IOApp.Simple { + + val run: IO[Unit] = Dispatcher[IO].use { (disp: Dispatcher[IO]) => + @unused implicit val withDispatcher: WithContext[ReaderT[IO, Ctx, *], Dispatcher[IO]] = WithContext.const(disp) + runF[IO, ReaderT[IO, Ctx, *]] + } + + def runF[I[_]: Async, F[_]: Sync: PerformThrow: WithRun[*[_], I, Ctx]]: I[Unit] = { + // Simplified wiring below + implicit val loggingF = Logging.Make.contextual[F, Ctx] + + val transactor = Transactor.fromDriverManager[I]( + driver = "org.h2.Driver", + url = "jdbc:h2:./test" + ) + implicit val txr = Txr.continuational(transactor.mapK(Lift.trans[I, F])) + + def initStorage[ + DB[_]: Tries: Txr[F, *[_]]: Delay: Monad: LiftConnectionIO: WithLocal[*[_], Ctx]: PerformThrow + ]: PersonStorage[F] = { + implicit val loggingDB = Logging.Make.contextual[DB, Ctx] + + implicit val elh = EmbeddableLogHandler.async(LogHandlerF.loggable[DB](Logging.Debug)) + + val personSql = PersonSql.make[DB].attachErrLogs + val deptSql = DeptSql.make[DB].attachErrLogs + + PersonStorage.make[F, DB](personSql, deptSql).attachErrLogs + } + + val storage = initStorage[txr.DB] + val program = storage.init >> storage.store(Person(13L, "Alex", 42L), Dept(42L, "Marketing")) + val launch = runContext(program)(Ctx("715a-562a-4da5-a6e0")) + launch + } +} diff --git a/examples/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala b/examples/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala index fa11ea726..2bbec50bf 100644 --- a/examples/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala +++ b/examples/src/main/scala/tofu/example/doobie/TofuDoobieExample.scala @@ -1,24 +1,25 @@ package tofu.example.doobie import cats.data.ReaderT -import cats.effect.{ContextShift, Effect, IO, IOApp, Sync} -import cats.{Apply, Monad} +import cats.effect.{Async, ContextShift, IO, IOApp, Sync} import cats.tagless.syntax.functorK._ +import cats.{Apply, Monad} import derevo.derive import doobie._ import doobie.implicits._ import doobie.util.log.LogHandler +import tofu.doobie.LiftConnectionIO import tofu.doobie.log.{EmbeddableLogHandler, LogHandlerF} import tofu.doobie.transactor.Txr -import tofu.doobie.LiftConnectionIO import tofu.higherKind.derived.representableK -import tofu.lift.{Lift, UnliftIO} +import tofu.kernel.types.PerformThrow +import tofu.lift.Lift import tofu.logging.derivation.{loggable, loggingMidTry} -import tofu.logging.{Logging, LoggingCompanion, Logs} +import tofu.logging.{Logging, LoggingCompanion} import tofu.syntax.context._ -import tofu.syntax.monadic._ import tofu.syntax.doobie.log.handler._ import tofu.syntax.doobie.log.string._ +import tofu.syntax.monadic._ import tofu.{Delay, Tries, WithLocal, WithRun} // Simple context @@ -108,12 +109,11 @@ object PersonStorage extends LoggingCompanion[PersonStorage] { } object TofuDoobieExample extends IOApp.Simple { - val run: IO[Unit] = - runF[IO, ReaderT[IO, Ctx, *]] + val run: IO[Unit] = runF[IO, ReaderT[IO, Ctx, *]] - def runF[I[_]: Effect: ContextShift, F[_]: Sync: UnliftIO: WithRun[*[_], I, Ctx]]: I[Unit] = { + def runF[I[_]: Async: ContextShift, F[_]: Sync: PerformThrow: WithRun[*[_], I, Ctx]]: I[Unit] = { // Simplified wiring below - implicit val loggingF = Logs.contextual[F, Ctx] + implicit val loggingF = Logging.Make.contextual[F, Ctx] val transactor = Transactor.fromDriverManager[I]( driver = "org.h2.Driver", @@ -122,11 +122,11 @@ object TofuDoobieExample extends IOApp.Simple { implicit val txr = Txr.continuational(transactor.mapK(Lift.trans[I, F])) def initStorage[ - DB[_]: Tries: Txr[F, *[_]]: Delay: Monad: LiftConnectionIO: WithLocal[*[_], Ctx]: UnliftIO + DB[_]: Tries: Txr[F, *[_]]: Delay: Monad: LiftConnectionIO: WithLocal[*[_], Ctx]: PerformThrow ]: PersonStorage[F] = { - implicit val loggingDB = Logs.contextual[DB, Ctx] + implicit val loggingDB = Logging.Make.contextual[DB, Ctx] - implicit val elh = EmbeddableLogHandler.sync(LogHandlerF.loggable[DB](Logging.Debug)) + implicit val elh = EmbeddableLogHandler.async(LogHandlerF.loggable[DB](Logging.Debug)) val personSql = PersonSql.make[DB].attachErrLogs val deptSql = DeptSql.make[DB].attachErrLogs diff --git a/modules/core3/src/test/scala/tofu/concurrent/PerformSuite.scala b/modules/core3/src/test/scala/tofu/concurrent/PerformSuite.scala new file mode 100644 index 000000000..bb506fe29 --- /dev/null +++ b/modules/core3/src/test/scala/tofu/concurrent/PerformSuite.scala @@ -0,0 +1,67 @@ +package tofu.concurrent + +import scala.annotation.unchecked.uncheckedVariance +import scala.concurrent.duration._ + +import cats.Monad +import cats.data.ReaderT +import cats.effect.std.Dispatcher +import cats.effect.{IO, Resource} +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.must.Matchers +import tofu.generate.GenRandom +import tofu.syntax.monadic._ +import tofu.syntax.scoped._ +import tofu.syntax.time._ +import tofu.time.Sleep +import tofu.{Execute, PerformThrow, WithContext} + +case class MyContext(dispatcher: Dispatcher[IO], traceId: Long) +object MyContext { + type Reader[+A] = ReaderT[IO, MyContext, A @uncheckedVariance] + implicit val withDispatcher: WithContext[Reader, Dispatcher[IO]] = + WithContext[Reader, MyContext].extract(_.dispatcher) +} + +class PerformSuite extends AnyFunSuite with Matchers { + type Eff[+A] = MyContext.Reader[A] + + val p = PerformThrow[Eff] + + val genTraceId = for { + random <- GenRandom.instance[IO, IO]() + traceId <- random.nextLong + } yield traceId + + val init = for { + dispatcher <- Dispatcher[IO] + traceId <- Resource.eval(genTraceId) + } yield MyContext(dispatcher, traceId) + + def waitUpdate[F[_]: Sleep: Monad](atom: Atom[F, Int]): F[Int] = for { + v <- atom.get + _ <- sleep(100.millis) + _ <- atom.update(_ + 1) + } yield v + + def program[F[_]: PerformThrow: Monad: Execute, A](fa: F[A]) = for { + performer <- PerformThrow[F].performer + (fut1, _) = performer.toFuture(fa) + (fut2, _) = performer.toFuture(fa) + (fut3, _) = performer.toFuture(fa) + res1 <- deferFuture(_ => fut1) + res2 <- deferFuture(_ => fut2) + res3 <- deferFuture(_ => fut3) + } yield List(res1, res2, res3) + + val run = for { + atom <- MakeAtom[Eff, Eff].of(1) + ress <- program(waitUpdate(atom)) + } yield ress + + test("parallize over performed futures") { + import cats.effect.unsafe.implicits.global + val ts = init.use(run.run).unsafeRunSync() + all(ts) mustEqual 1 + } +} diff --git a/modules/doobie/core-ce3/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala b/modules/doobie/core-ce3/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala new file mode 100644 index 000000000..f873cc18e --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala @@ -0,0 +1,15 @@ +package tofu.doobie + +import cats.Functor + +import tofu.kernel.types.Perform +import tofu.PerformOf +import tofu.PerformVia + +trait ConnectionCIOCrossVersion { + // A hint to overcome 2.12 incompentence in implicit resolution + @inline final implicit def readerPerformer[F[_]: Functor, E](implicit + FP: Perform[F, E] + ): Perform[ConnectionCIO[F, *], E] = + PerformVia.performReader[F, PerformOf.ExitCont[E, *], ConnectionCIO.Cont[F], Unit] +} diff --git a/modules/doobie/core-ce3/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala b/modules/doobie/core-ce3/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala new file mode 100644 index 000000000..2f7ab8a9e --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala @@ -0,0 +1,4 @@ +package tofu.doobie + +// dummy trait, see also scala-2.12 version +trait ConnectionCIOCrossVersion diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/ConnectionCIO.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/ConnectionCIO.scala new file mode 100644 index 000000000..eb7963ad8 --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/ConnectionCIO.scala @@ -0,0 +1,18 @@ +package tofu.doobie + +import cats.data.Kleisli +import cats.~> +import doobie.ConnectionIO +import tofu.kernel.types.AnyK + +object ConnectionCIO extends ConnectionCIOCrossVersion { + trait Cont[F[_]] extends (ConnectionIO ~> F) + object Cont { + private val liftConnectionIOToConnectionCIOAny: LiftConnectionIO[ConnectionCIO[AnyK, *]] = + new LiftConnectionIO[ConnectionCIO[AnyK, *]] { + def lift[A](ca: ConnectionIO[A]): ConnectionCIO[AnyK, A] = Kleisli(k => k(ca)) + } + @inline final implicit def liftConnectionIOToConnectionCIO[F[_]]: LiftConnectionIO[ConnectionCIO[F, *]] = + liftConnectionIOToConnectionCIOAny.asInstanceOf[LiftConnectionIO[ConnectionCIO[F, *]]] + } +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/LiftConnectionIO.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/LiftConnectionIO.scala new file mode 100644 index 000000000..1ee22173d --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/LiftConnectionIO.scala @@ -0,0 +1,5 @@ +package tofu.doobie + +object LiftConnectionIO { + def apply[DB[_]](implicit ev: LiftConnectionIO[DB]): LiftConnectionIO[DB] = ev +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala new file mode 100644 index 000000000..52cad9ac0 --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala @@ -0,0 +1,80 @@ +package tofu.doobie.log + +import _root_.doobie.LogHandler +import cats.tagless.FunctorK +import cats.tagless.syntax.functorK._ +import cats.{Applicative, FlatMap, Functor, ~>} +import tofu.concurrent.Exit +import tofu.higherKind.Embed +import tofu.kernel.types.PerformThrow +import tofu.lift.Lift +import tofu.syntax.embed._ +import tofu.syntax.monadic._ + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Promise} + +/** A holder for a [[doobie.util.log.LogHandler]] instance wrapped in an effect `F[_]`. Only useful when `F[_]` is + * contextual. Allows one to create a context-aware `LogHandler` and embed it into a database algebra that requires it + * for logging SQL execution events. + */ +final class EmbeddableLogHandler[F[_]](val self: F[LogHandler]) extends AnyVal { + def embedMapK[A[_[_]]: Embed: FunctorK, G[_]](impl: LogHandler => A[G])(fk: G ~> F)(implicit F: FlatMap[F]): A[F] = + self.map(impl(_).mapK(fk)).embed + + def embedLift[A[_[_]]: Embed: FunctorK, G[_]]( + impl: LogHandler => A[G] + )(implicit F: FlatMap[F], L: Lift[G, F]): A[F] = embedMapK(impl)(L.liftF) + + def embed[A[_[_]]: Embed](impl: LogHandler => A[F])(implicit F: FlatMap[F]): A[F] = + self.map(impl).embed +} + +/** `EmbeddableLogHandler[F]` has two main constructors from `LogHandlerF[F]`: `async` and `sync`. Both require + * `UnliftIO[F]` and need to perform IO unsafely under the hood due to the impurity of `doobie.util.log.LogHandler`. + */ +object EmbeddableLogHandler { + def apply[F[_]](implicit elh: EmbeddableLogHandler[F]): EmbeddableLogHandler[F] = elh + + /** Preferably use `async` as its underlying unsafe run is potentially less harmful. + */ + def async[F[_]: Functor](logHandlerF: LogHandlerF[F])(implicit P: PerformThrow[F]): EmbeddableLogHandler[F] = + new EmbeddableLogHandler(P.performer.map { perf => + LogHandler { event => + val _ = perf.perform((exit: Exit[Throwable, Unit]) => + exit match { + case Exit.Canceled => throw Exit.CanceledException + case Exit.Error(e) => throw e + case Exit.Completed(_) => () + } + )(logHandlerF.run(event)) + } + }) + + /** Only use `sync` if you are sure your logging logic doesn't contain async computations and cannot block the + * execution thread. + */ + def sync[F[_]: Functor](logHandlerF: LogHandlerF[F])(implicit P: PerformThrow[F]): EmbeddableLogHandler[F] = + new EmbeddableLogHandler(P.performer.map { performer => + LogHandler { event => + val promise = Promise[Unit]() + val _ = performer.perform((exit: Exit[Throwable, Unit]) => + exit match { + case Exit.Canceled => promise.failure(Exit.CanceledException) + case Exit.Error(e) => promise.failure(e) + case Exit.Completed(_) => promise.success(()) + } + )(logHandlerF.run(event)) + Await.result(promise.future, Duration.Inf) + } + }) + + /** Use `nop` in tests. + */ + def nop[F[_]: Applicative]: EmbeddableLogHandler[F] = new EmbeddableLogHandler(LogHandler.nop.pure[F]) + + implicit val embeddableLogHandlerFunctorK: FunctorK[EmbeddableLogHandler] = new FunctorK[EmbeddableLogHandler] { + def mapK[F[_], G[_]](af: EmbeddableLogHandler[F])(fk: F ~> G): EmbeddableLogHandler[G] = + new EmbeddableLogHandler(fk(af.self)) + } +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/LogHandlerF.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/LogHandlerF.scala new file mode 100644 index 000000000..24bd45240 --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/log/LogHandlerF.scala @@ -0,0 +1,16 @@ +package tofu.doobie.log + +import derevo.derive +import doobie.util.log.LogEvent +import tofu.higherKind.derived.representableK + +/** A pure analog of [[doobie.util.log.LogHandler]] that logs SQL execution events in an effect `F[_]`. + */ +@derive(representableK) +trait LogHandlerF[F[_]] { + def run(event: LogEvent): F[Unit] +} + +object LogHandlerF { + def apply[F[_]](run: LogEvent => F[Unit]): LogHandlerF[F] = run(_) +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/package.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/package.scala new file mode 100644 index 000000000..b607bfc22 --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/package.scala @@ -0,0 +1,14 @@ +package tofu + +import _root_.doobie.ConnectionIO +import cats.data.Kleisli +import tofu.lift.Lift + +package object doobie { + + /** A continuational form of `ConnectionIO` equivalent to `[x] =>> ConnectionIO ~> F => F[x]`. */ + type ConnectionCIO[F[_], A] = Kleisli[F, ConnectionCIO.Cont[F], A] + + /** A typeclass for lifting `ConnectionIO` into extended database effects such as `ConnectionRIO`. */ + type LiftConnectionIO[DB[_]] = Lift[ConnectionIO, DB] +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/doobie/transactor/Txr.scala b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/transactor/Txr.scala new file mode 100644 index 000000000..7b31524b0 --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/doobie/transactor/Txr.scala @@ -0,0 +1,85 @@ +package tofu.doobie +package transactor + +import cats.data.Kleisli +import cats.effect.{MonadCancelThrow, Resource} +import cats.~> +import doobie.{ConnectionIO, Transactor} +import fs2.Stream +import tofu.syntax.funk._ +import tofu.syntax.monadic._ + +/** A simple facade for [[doobie.Transactor]] that holds an inner database effect type `DB[_]` and provides natural + * transformations from this effect to the target effect `F[_]`. + * + * The motivation for using this facade instead of `Transactor` is to: + * + * - initialize all its natural transformations early and remove the need for additional constraints on `F[_]` later + * (e.g. `Bracket`); + * + * - be able to use another `DB[_]` effect besides `ConnectionIO` and build a layer of transactional logic with it. + */ +trait Txr[F[_], DB0[_]] { + type DB[x] >: DB0[x] <: DB0[x] + + /** Interprets `DB` into `F`, applying the transactional strategy. */ + def trans: DB ~> F + + /** Interprets `DB` into `F`, no strategy applied. */ + def rawTrans: DB ~> F + + /** Translates the stream, applying the transactional strategy. */ + def transP: Stream[DB, *] ~> Stream[F, *] + + /** Translates the stream, no strategy applied. */ + def rawTransP: Stream[DB, *] ~> Stream[F, *] +} + +object Txr { + type Plain[F[_]] = Txr[F, ConnectionIO] + type Continuational[F[_]] = Txr[F, ConnectionCIO[F, *]] + + def apply[F[_], DB[_]](implicit ev: Txr[F, DB]): Txr[F, DB] = ev + def Plain[F[_]](implicit ev: Plain[F]): Plain[F] = ev + def Continuational[F[_]](implicit ev: Continuational[F]): Continuational[F] = ev + + /** Creates a plain facade that preserves the effect of `Transactor` with `ConnectionIO` as the database effect. + */ + def plain[F[_]: MonadCancelThrow](t: Transactor[F]): Txr.Plain[F] = + new Txr.Plain[F] { + val trans: ConnectionIO ~> F = t.trans + val rawTrans: ConnectionIO ~> F = t.rawTrans + + val transP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.transP + val rawTransP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.rawTransP + } + + /** Creates a facade that uses `ConnectionCIO` as the database effect. + */ + def continuational[F[_]: MonadCancelThrow](t: Transactor[F]): Txr.Continuational[F] = + new Txr.Continuational[F] { + val trans: ConnectionCIO[F, *] ~> F = makeTrans(true) + val rawTrans: ConnectionCIO[F, *] ~> F = makeTrans(false) + + val transP: Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] = makeTransP(true) + val rawTransP: Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] = makeTransP(false) + + private def interpret(withStrategy: Boolean): Resource[F, ConnectionCIO.Cont[F]] = for { + c <- t.connect(t.kernel) + f = new ConnectionCIO.Cont[F] { + def apply[A](ca: ConnectionIO[A]): F[A] = ca.foldMap(t.interpret).run(c) + } + _ <- withStrategy.when_(t.strategy.resource.mapK(f)) + } yield f + + private def makeTrans(withStrategy: Boolean): ConnectionCIO[F, *] ~> F = + funK(ccio => interpret(withStrategy).use(ccio.run)) + + private def makeTransP(withStrategy: Boolean): Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] = + funK(s => + Stream + .resource(interpret(withStrategy)) + .flatMap(fk => s.translate(Kleisli.applyK[F, ConnectionCIO.Cont[F]](fk))) + ) + } +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/package.scala b/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/package.scala new file mode 100644 index 000000000..7f3bf0c2b --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/package.scala @@ -0,0 +1,5 @@ +package tofu.syntax + +package object doobie { + object txr extends TxrSyntax +} diff --git a/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/txr.scala b/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/txr.scala new file mode 100644 index 000000000..bf4bbb35c --- /dev/null +++ b/modules/doobie/core-ce3/src/main/scala/tofu/syntax/doobie/txr.scala @@ -0,0 +1,19 @@ +package tofu.syntax.doobie + +import fs2.Stream +import tofu.doobie.transactor.Txr + +private[doobie] trait TxrSyntax { + implicit def toTxrOps[DB[_], A](dba: DB[A]): TxrOps[DB, A] = new TxrOps(dba) + implicit def toTxrStreamOps[DB[_], A](dba: Stream[DB, A]): TxrStreamOps[DB, A] = new TxrStreamOps(dba) +} + +private[doobie] final class TxrOps[DB[_], A](private val dba: DB[A]) extends AnyVal { + def trans[F[_]](implicit txr: Txr[F, DB]): F[A] = txr.trans(dba) + def rawTrans[F[_]](implicit txr: Txr[F, DB]): F[A] = txr.rawTrans(dba) +} + +private[doobie] final class TxrStreamOps[DB[_], A](private val sdba: Stream[DB, A]) { + def trans[F[_]](implicit txr: Txr[F, DB]): Stream[F, A] = txr.transP(sdba) + def rawTrans[F[_]](implicit txr: Txr[F, DB]): Stream[F, A] = txr.rawTransP(sdba) +} diff --git a/modules/doobie/core-ce3/src/test/scala/tofu/doobie/DoobieInstancesSuite.scala b/modules/doobie/core-ce3/src/test/scala/tofu/doobie/DoobieInstancesSuite.scala new file mode 100644 index 000000000..e84cd8a03 --- /dev/null +++ b/modules/doobie/core-ce3/src/test/scala/tofu/doobie/DoobieInstancesSuite.scala @@ -0,0 +1,17 @@ +package tofu.doobie + +import doobie.ConnectionIO +import tofu.lift.Lift + +object DoobieInstancesSuite { + + def summonLiftConnectionIO[R, F[_]](): Any = { + LiftConnectionIO[ConnectionIO] + LiftConnectionIO[ConnectionCIO[F, *]] + } + + def summonLiftToConnectionCIO[F[_]](): Any = { + Lift[F, ConnectionCIO[F, *]] + } + +} diff --git a/modules/doobie/core-ce3/src/test/scala/tofu/doobie/TxrSuite.scala b/modules/doobie/core-ce3/src/test/scala/tofu/doobie/TxrSuite.scala new file mode 100644 index 000000000..b3b7df5e0 --- /dev/null +++ b/modules/doobie/core-ce3/src/test/scala/tofu/doobie/TxrSuite.scala @@ -0,0 +1,18 @@ +package tofu.doobie + +import tofu.doobie.transactor.Txr +import tofu.syntax.doobie.txr._ + +object TxrSuite { + + def transSyntaxTest[F[_], DB[_]](dba: DB[Int])(implicit txr: Txr[F, DB]): Any = { + dba.trans + dba.rawTrans + } + + def transStreamSyntaxTest[F[_], DB[_]](sdba: fs2.Stream[DB, Int])(implicit txr: Txr[F, DB]): Any = { + sdba.trans + sdba.rawTrans + } + +} diff --git a/modules/doobie/core/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala b/modules/doobie/core/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala new file mode 100644 index 000000000..f873cc18e --- /dev/null +++ b/modules/doobie/core/src/main/scala-2.12/tofu/doobie/ConnectionCIOCrossVersion.scala @@ -0,0 +1,15 @@ +package tofu.doobie + +import cats.Functor + +import tofu.kernel.types.Perform +import tofu.PerformOf +import tofu.PerformVia + +trait ConnectionCIOCrossVersion { + // A hint to overcome 2.12 incompentence in implicit resolution + @inline final implicit def readerPerformer[F[_]: Functor, E](implicit + FP: Perform[F, E] + ): Perform[ConnectionCIO[F, *], E] = + PerformVia.performReader[F, PerformOf.ExitCont[E, *], ConnectionCIO.Cont[F], Unit] +} diff --git a/modules/doobie/core/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala b/modules/doobie/core/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala new file mode 100644 index 000000000..2f7ab8a9e --- /dev/null +++ b/modules/doobie/core/src/main/scala-2.13/tofu/doobie/ConnectionCIOCrossVersion.scala @@ -0,0 +1,4 @@ +package tofu.doobie + +// dummy trait, see also scala-2.12 version +trait ConnectionCIOCrossVersion diff --git a/modules/doobie/core/src/main/scala/tofu/doobie/ConnectionCIO.scala b/modules/doobie/core/src/main/scala/tofu/doobie/ConnectionCIO.scala index 87b147fd0..eb7963ad8 100644 --- a/modules/doobie/core/src/main/scala/tofu/doobie/ConnectionCIO.scala +++ b/modules/doobie/core/src/main/scala/tofu/doobie/ConnectionCIO.scala @@ -5,7 +5,7 @@ import cats.~> import doobie.ConnectionIO import tofu.kernel.types.AnyK -object ConnectionCIO { +object ConnectionCIO extends ConnectionCIOCrossVersion { trait Cont[F[_]] extends (ConnectionIO ~> F) object Cont { private val liftConnectionIOToConnectionCIOAny: LiftConnectionIO[ConnectionCIO[AnyK, *]] = diff --git a/modules/doobie/core/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala b/modules/doobie/core/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala index 340662bc4..52cad9ac0 100644 --- a/modules/doobie/core/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala +++ b/modules/doobie/core/src/main/scala/tofu/doobie/log/EmbeddableLogHandler.scala @@ -1,15 +1,19 @@ package tofu.doobie.log import _root_.doobie.LogHandler -import cats.effect.IO import cats.tagless.FunctorK import cats.tagless.syntax.functorK._ import cats.{Applicative, FlatMap, Functor, ~>} +import tofu.concurrent.Exit import tofu.higherKind.Embed -import tofu.lift.{Lift, UnliftIO} +import tofu.kernel.types.PerformThrow +import tofu.lift.Lift import tofu.syntax.embed._ import tofu.syntax.monadic._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Promise} + /** A holder for a [[doobie.util.log.LogHandler]] instance wrapped in an effect `F[_]`. Only useful when `F[_]` is * contextual. Allows one to create a context-aware `LogHandler` and embed it into a database algebra that requires it * for logging SQL execution events. @@ -34,27 +38,41 @@ object EmbeddableLogHandler { /** Preferably use `async` as its underlying unsafe run is potentially less harmful. */ - def async[F[_]: Functor: UnliftIO](logHandlerF: LogHandlerF[F]): EmbeddableLogHandler[F] = - fromLogHandlerF(logHandlerF)(_.unsafeRunAsyncAndForget()) + def async[F[_]: Functor](logHandlerF: LogHandlerF[F])(implicit P: PerformThrow[F]): EmbeddableLogHandler[F] = + new EmbeddableLogHandler(P.performer.map { perf => + LogHandler { event => + val _ = perf.perform((exit: Exit[Throwable, Unit]) => + exit match { + case Exit.Canceled => throw Exit.CanceledException + case Exit.Error(e) => throw e + case Exit.Completed(_) => () + } + )(logHandlerF.run(event)) + } + }) /** Only use `sync` if you are sure your logging logic doesn't contain async computations and cannot block the * execution thread. */ - def sync[F[_]: Functor: UnliftIO](logHandlerF: LogHandlerF[F]): EmbeddableLogHandler[F] = - fromLogHandlerF(logHandlerF) { io => - io.unsafeRunSync() - () - } + def sync[F[_]: Functor](logHandlerF: LogHandlerF[F])(implicit P: PerformThrow[F]): EmbeddableLogHandler[F] = + new EmbeddableLogHandler(P.performer.map { performer => + LogHandler { event => + val promise = Promise[Unit]() + val _ = performer.perform((exit: Exit[Throwable, Unit]) => + exit match { + case Exit.Canceled => promise.failure(Exit.CanceledException) + case Exit.Error(e) => promise.failure(e) + case Exit.Completed(_) => promise.success(()) + } + )(logHandlerF.run(event)) + Await.result(promise.future, Duration.Inf) + } + }) /** Use `nop` in tests. */ def nop[F[_]: Applicative]: EmbeddableLogHandler[F] = new EmbeddableLogHandler(LogHandler.nop.pure[F]) - private def fromLogHandlerF[F[_]: Functor]( - logHandlerF: LogHandlerF[F] - )(unsafeRunIO_ : IO[_] => Unit)(implicit U: UnliftIO[F]): EmbeddableLogHandler[F] = - new EmbeddableLogHandler(U.unlift.map(toIO => LogHandler(event => unsafeRunIO_(toIO(logHandlerF.run(event)))))) - implicit val embeddableLogHandlerFunctorK: FunctorK[EmbeddableLogHandler] = new FunctorK[EmbeddableLogHandler] { def mapK[F[_], G[_]](af: EmbeddableLogHandler[F])(fk: F ~> G): EmbeddableLogHandler[G] = new EmbeddableLogHandler(fk(af.self)) diff --git a/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSql.scala b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSql.scala new file mode 100644 index 000000000..5e1b33c11 --- /dev/null +++ b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSql.scala @@ -0,0 +1,6 @@ +package tofu.doobie.log + +trait LoggableSql { + implicit def toLoggableSqlInterpolator(ctx: StringContext): LoggableSqlInterpolator = + new LoggableSqlInterpolator(ctx) +} diff --git a/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSqlInterpolator.scala b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSqlInterpolator.scala new file mode 100644 index 000000000..4cd7bca3e --- /dev/null +++ b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/LoggableSqlInterpolator.scala @@ -0,0 +1,44 @@ +package tofu.doobie.log +import doobie.syntax.SqlInterpolator + +import scala.{specialized => sp} +import doobie.syntax.SqlInterpolator.SingleFragment +import doobie.util.Put +import doobie.util.fragment.Fragment +import doobie.util.pos.Pos +import tofu.doobie.log.LoggableSqlInterpolator.LoggableSingleFragment +import tofu.logging.{LogRenderer, Loggable, LoggedValue} + +final class LoggableSqlInterpolator(private val ctx: StringContext) extends AnyVal { + def lsql(parts: LoggableSingleFragment*)(implicit pos: Pos): Fragment = + new SqlInterpolator(ctx).sql(parts.map(_.toSingleFragment): _*) + def lfr(parts: LoggableSingleFragment*)(implicit pos: Pos): Fragment = + new SqlInterpolator(ctx).fr(parts.map(_.toSingleFragment): _*) + def lfr0(parts: LoggableSingleFragment*)(implicit pos: Pos): Fragment = + new SqlInterpolator(ctx).fr0(parts.map(_.toSingleFragment): _*) +} + +object LoggableSqlInterpolator { + final case class LoggableArg[A](value: A)(implicit val log: Loggable[A]) extends LoggedValue { + def logFields[I, V, @sp(Unit) R, @sp M](input: I)(implicit r: LogRenderer[I, V, R, M]): R = log.fields(value, input) + override def putValue[I, V, R, S](v: V)(implicit r: LogRenderer[I, V, R, S]): S = log.putValue(value, v) + override def toString: String = log.logShow(value) + } + object LoggableArg { + implicit def put[A: Put]: Put[LoggableArg[A]] = Put[A].contramap(_.value) + } + + final case class LoggableSingleFragment(fr: Fragment) extends AnyVal { + def toSingleFragment: SingleFragment[Nothing] = SingleFragment(fr) + } + object LoggableSingleFragment { + def of[A](sf: SingleFragment[A]): LoggableSingleFragment = new LoggableSingleFragment(sf.fr) + + implicit def fromPut[A: Loggable: Put](a: A): LoggableSingleFragment = + LoggableSingleFragment.of(LoggableArg(a)) + implicit def fromPutOption[A: Loggable: Put](oa: Option[A]): LoggableSingleFragment = + LoggableSingleFragment.of(oa.map(LoggableArg(_))) + implicit def fromFragment(fr: Fragment): LoggableSingleFragment = + LoggableSingleFragment(fr) + } +} diff --git a/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/instances.scala b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/instances.scala new file mode 100644 index 000000000..73b5357cf --- /dev/null +++ b/modules/doobie/logging-ce3/src/main/scala/tofu/doobie/log/instances.scala @@ -0,0 +1,71 @@ +package tofu.doobie.log + +import cats.syntax.monoid._ +import doobie.util.log.{ExecFailure, LogEvent, ProcessingFailure, Success} +import tofu.logging.{DictLoggable, LogRenderer, Loggable, LoggedValue} +import tofu.syntax.logRenderer._ + +object instances { + implicit val logEventLoggable: Loggable[LogEvent] = new DictLoggable[LogEvent] { + private def oneline(s: String): String = + s.linesIterator.map(_.trim).filterNot(_.isEmpty).mkString(" ") + private def multiline(s: String, indent: String = " "): String = + s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n" + indent) + + private def loggedArgs(args: List[Any]): LoggedValue = + args.map { + case x: LoggedValue => x + case _ => "...": LoggedValue // erase if passed to simple interpolator + } + + def logShow(ev: LogEvent): String = ev match { + case Success(s, a, e1, e2) => + s"""Successful Statement Execution: + | + | ${multiline(s)} + | + | arguments = ${loggedArgs(a)} + | elapsed = ${e1.toMillis} ms exec + ${e2.toMillis} ms processing (${(e1 + e2).toMillis} ms total) + """.stripMargin + case ProcessingFailure(s, a, e1, e2, _) => + s"""Failed Resultset Processing: + | + | ${multiline(s)} + | + | arguments = ${loggedArgs(a)} + | elapsed = ${e1.toMillis} ms exec + ${e2.toMillis} ms processing (failed) (${(e1 + e2).toMillis} ms total) + """.stripMargin + case ExecFailure(s, a, e1, _) => + s"""Failed Statement Execution: + | + | ${multiline(s)} + | + | arguments = ${loggedArgs(a)} + | elapsed = ${e1.toMillis} ms exec (failed) + """.stripMargin + } + + def fields[I, V, R, S](ev: LogEvent, i: I)(implicit r: LogRenderer[I, V, R, S]): R = + ev match { + case Success(s, a, e1, e2) => + i.field("sql-event-type", "Success") |+| + i.field("sql-statement", oneline(s)) |+| + i.field("sql-args", loggedArgs(a)) |+| + i.field("sql-exec-ms", e1.toMillis) |+| + i.field("sql-processing-ms", e2.toMillis) |+| + i.field("sql-total-ms", (e1 + e2).toMillis) + case ProcessingFailure(s, a, e1, e2, _) => + i.field("sql-event-type", "ProcessingFailure") |+| + i.field("sql-statement", oneline(s)) |+| + i.field("sql-args", loggedArgs(a)) |+| + i.field("sql-exec-ms", e1.toMillis) |+| + i.field("sql-processing-ms", e2.toMillis) |+| + i.field("sql-total-ms", (e1 + e2).toMillis) + case ExecFailure(s, a, e1, _) => + i.field("sql-event-type", "ExecFailure") |+| + i.field("sql-statement", oneline(s)) |+| + i.field("sql-args", loggedArgs(a)) |+| + i.field("sql-exec-ms", e1.toMillis) + } + } +} diff --git a/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/handler.scala b/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/handler.scala new file mode 100644 index 000000000..438f081b4 --- /dev/null +++ b/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/handler.scala @@ -0,0 +1,20 @@ +package tofu.syntax.doobie.log + +import doobie.util.log._ +import tofu.doobie.log.LogHandlerF +import tofu.doobie.log.instances._ +import tofu.kernel.types.AnyK +import tofu.logging.Logging + +object handler { + implicit class MkLogHandlerF(private val lhf: LogHandlerF.type) extends AnyVal { + def loggable[F[_]: Logging.Make](level: Logging.Level): LogHandlerF[F] = { evt => + val logging: Logging[F] = Logging.Make[F].forService[LogHandlerF[AnyK]] + evt match { + case _: Success => logging.write(level, "{}", evt) + case e: ProcessingFailure => logging.errorCause("{}", e.failure, evt) + case e: ExecFailure => logging.errorCause("{}", e.failure, evt) + } + } + } +} diff --git a/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/string.scala b/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/string.scala new file mode 100644 index 000000000..198db0f76 --- /dev/null +++ b/modules/doobie/logging-ce3/src/main/scala/tofu/syntax/doobie/log/string.scala @@ -0,0 +1,5 @@ +package tofu.syntax.doobie.log + +import tofu.doobie.log.LoggableSql + +object string extends LoggableSql diff --git a/modules/doobie/logging/src/main/scala/tofu/syntax/doobie/log/handler.scala b/modules/doobie/logging/src/main/scala/tofu/syntax/doobie/log/handler.scala index 6f53552ca..438f081b4 100644 --- a/modules/doobie/logging/src/main/scala/tofu/syntax/doobie/log/handler.scala +++ b/modules/doobie/logging/src/main/scala/tofu/syntax/doobie/log/handler.scala @@ -1,16 +1,15 @@ package tofu.syntax.doobie.log -import cats.Id import doobie.util.log._ import tofu.doobie.log.LogHandlerF import tofu.doobie.log.instances._ import tofu.kernel.types.AnyK -import tofu.logging.{Logging, Logs} +import tofu.logging.Logging object handler { implicit class MkLogHandlerF(private val lhf: LogHandlerF.type) extends AnyVal { - def loggable[F[_]: Logs.Universal](level: Logging.Level): LogHandlerF[F] = { evt => - val logging: Logging[F] = Logs[Id, F].forService[LogHandlerF[AnyK]] + def loggable[F[_]: Logging.Make](level: Logging.Level): LogHandlerF[F] = { evt => + val logging: Logging[F] = Logging.Make[F].forService[LogHandlerF[AnyK]] evt match { case _: Success => logging.write(level, "{}", evt) case e: ProcessingFailure => logging.errorCause("{}", e.failure, evt) diff --git a/modules/kernel/src/main/scala/tofu/Perform.scala b/modules/kernel/src/main/scala/tofu/Perform.scala new file mode 100644 index 000000000..a223b40bb --- /dev/null +++ b/modules/kernel/src/main/scala/tofu/Perform.scala @@ -0,0 +1,71 @@ +package tofu +import cats.Functor +import cats.data.ReaderT +import cats.tagless.ContravariantK +import tofu.internal.carriers.{PerformCarrier2, PerformCarrier2Context, PerformCarrier3} +import tofu.internal.instances._ +import tofu.internal.{Effect2Comp, EffectComp} +import tofu.kernel.types.{PerformCont, PerformExitCont, PerformOf, PerformThrow} +import scala.concurrent.Promise +import tofu.concurrent.Exit +import scala.concurrent.Future +import scala.annotation.implicitNotFound + +trait Performer[F[_], -Cont[_], Cancel] { + def perform[A](cont: Cont[A])(fa: F[A]): F[Cancel] + + def toFuture[A](fa: F[A])(implicit ev: (Exit[Throwable, A] => Unit) <:< Cont[A]): (Future[A], F[Cancel]) = { + val p = Promise[A]() + val cont: Exit[Throwable, A] => Unit = ex => p.complete(ex.toTry) + + (p.future, perform[A](cont)(fa)) + } +} + +object Performer { + type OfExit[F[_], E] = Performer[F, PerformOf.ExitCont[E, *], Unit] + + implicit def contravariantK[F[_], Cancel]: ContravariantK[Performer[F, *[_], Cancel]] = + new PerformerContravariantK[F, Cancel] +} + +@implicitNotFound("""can not find Perform instance for functor ${F} +with continuation ${Cont} and cancel result ${Cancel} +if you are using cats-effect 3.0 make sure you have an implicit instance of WithContext[F, Dispatcher[F]] +or in case of F[A] = ReaderT[G, C, A], you have an instance of WithContext[F, Dispatcher[G]]""") +trait PerformVia[F[_], Cont[_], Cancel] extends WithContext[F, Performer[F, Cont, Cancel]] { + def performer: F[Performer[F, Cont, Cancel]] + final def context: F[Performer[F, Cont, Cancel]] = performer +} + +object PerformVia extends PerformInterop { + def apply[F[_], Cont[_], Cancel](implicit instance: PerformVia[F, Cont, Cancel]): PerformVia[F, Cont, Cancel] = + instance + + implicit def contravariantK[F[_], Cancel]: ContravariantK[PerformVia[F, *[_], Cancel]] = + new PerformViaContravariantK[F, Cancel] + + implicit def performReader[F[_]: Functor, Cont[_], R, Cancel](implicit + RP: PerformVia[F, Cont, Cancel] + ): PerformVia[ReaderT[F, R, *], Cont, Cancel] = new PerformViaReader(RP) +} + +class PerformInterop extends PerformInterop1 { + final implicit def interopCE3[F[_]](implicit carrier: PerformCarrier3[F]): PerformThrow[F] = carrier +} +class PerformInterop1 extends PerformInterop2 { + final implicit def interopCE2[F[_]](implicit carrier: PerformCarrier2[F]): PerformThrow[F] = carrier +} + +class PerformInterop2 { + final implicit def interopCE2Contextual[F[_]](implicit + carrier: PerformCarrier2Context[F] + ): PerformThrow[F] = carrier +} + +object PerformOf extends Effect2Comp[PerformOf] { + type Cont[Ex[_], A] = PerformCont[Ex, A] + type ExitCont[E, A] = PerformExitCont[E, A] +} + +object PerformThrow extends EffectComp[PerformThrow] diff --git a/modules/concurrent/src/main/scala/tofu/concurrent/Exit.scala b/modules/kernel/src/main/scala/tofu/concurrent/Exit.scala similarity index 67% rename from modules/concurrent/src/main/scala/tofu/concurrent/Exit.scala rename to modules/kernel/src/main/scala/tofu/concurrent/Exit.scala index 93641311a..2773a49af 100644 --- a/modules/concurrent/src/main/scala/tofu/concurrent/Exit.scala +++ b/modules/kernel/src/main/scala/tofu/concurrent/Exit.scala @@ -1,26 +1,43 @@ package tofu.concurrent import cats.{Applicative, Eval, Traverse} -import cats.effect.ExitCase import tofu.control.ApplicativeZip import tofu.syntax.monadic._ +import scala.util.Try +import tofu.concurrent.Exit.Canceled +import tofu.concurrent.Exit.Completed +import tofu.concurrent.Exit.Error sealed trait Exit[+E, +A] { - def exitCase: ExitCase[E] + def toTry(implicit ev: E <:< Throwable): Try[A] = this match { + case Canceled => util.Failure(new InterruptedException) + case Error(e) => util.Failure(e) + case Completed(a) => util.Success(a) + } + + def toEither: Either[Option[E], A] = this match { + case Canceled => Left(None) + case Error(e) => Left(Some(e)) + case Completed(a) => Right(a) + } } object Exit { sealed trait Incomplete[+E] extends Exit[E, Nothing] - case object Canceled extends Incomplete[Nothing] { - def exitCase = ExitCase.Canceled - } - final case class Error[+E](e: E) extends Incomplete[E] { - def exitCase = ExitCase.Error(e) + case object Canceled extends Incomplete[Nothing] + final case class Error[+E](e: E) extends Incomplete[E] + final case class Completed[+A](a: A) extends Exit[Nothing, A] + + def fromEither[E, A](e: Either[E, A]): Exit[E, A] = e match { + case Left(err) => Error(err) + case Right(res) => Completed(res) } - final case class Completed[+A](a: A) extends Exit[Nothing, A] { - override def exitCase = ExitCase.Completed + + def fromTry[A](t: util.Try[A]): Exit[Throwable, A] = t match { + case util.Failure(ex) => Error(ex) + case util.Success(res) => Completed(res) } private[this] object exitInstanceAny extends Traverse[Exit[Any, *]] with ApplicativeZip[Exit[Any, *]] { @@ -63,4 +80,6 @@ object Exit { implicit def exitInstance[E]: Traverse[Exit[E, *]] with Applicative[Exit[E, *]] = exitInstanceAny.asInstanceOf[Traverse[Exit[E, *]] with Applicative[Exit[E, *]]] + + object CanceledException extends InterruptedException } diff --git a/modules/kernel/src/main/scala/tofu/internal/Interop.scala b/modules/kernel/src/main/scala/tofu/internal/Interop.scala index af5f84d00..b6aca704f 100644 --- a/modules/kernel/src/main/scala/tofu/internal/Interop.scala +++ b/modules/kernel/src/main/scala/tofu/internal/Interop.scala @@ -26,6 +26,7 @@ class Interop(val c: blackbox.Context) { c.Expr[R](delegateParamTree[N](Seq(ps))(ts)) def delegate[R: WTT, F[_]: WTTU, N: WTT]: c.Expr[R] = delegateImpl[R, N](tc[F]) + def delegate1[R: WTT, F[_]: WTTU, E: WTT, N: WTT]: c.Expr[R] = delegateImpl[R, N](tc[F], t[E]) def delegate2[R: WTT, I[_]: WTTU, F[_]: WTTU, N: WTT]: c.Expr[R] = delegateImpl[R, N](tc[I], tc[F]) def delegate1p1[R: WTT, T: WTT, F[_]: WTTU, N: WTT](p1: Tree): c.Expr[R] = delegateParamImpl[R, N](p1)(t[T], tc[F]) } diff --git a/modules/kernel/src/main/scala/tofu/internal/carriers/FibersCarrier2.scala b/modules/kernel/src/main/scala/tofu/internal/carriers/fibers.scala similarity index 100% rename from modules/kernel/src/main/scala/tofu/internal/carriers/FibersCarrier2.scala rename to modules/kernel/src/main/scala/tofu/internal/carriers/fibers.scala diff --git a/modules/kernel/src/main/scala/tofu/internal/carriers/FinallyCarrier2.scala b/modules/kernel/src/main/scala/tofu/internal/carriers/finally.scala similarity index 100% rename from modules/kernel/src/main/scala/tofu/internal/carriers/FinallyCarrier2.scala rename to modules/kernel/src/main/scala/tofu/internal/carriers/finally.scala diff --git a/modules/kernel/src/main/scala/tofu/internal/carriers/perform.scala b/modules/kernel/src/main/scala/tofu/internal/carriers/perform.scala new file mode 100644 index 000000000..0616e6b35 --- /dev/null +++ b/modules/kernel/src/main/scala/tofu/internal/carriers/perform.scala @@ -0,0 +1,30 @@ +package tofu.internal.carriers + +import tofu.kernel.types.Perform +import tofu.internal.Interop + +trait PerformCarrier2[F[_]] extends Perform[F, Throwable] + +object PerformCarrier2 { + final implicit def interop2Effect[F[_]]: PerformCarrier2[F] = + macro Interop.delegate[PerformCarrier2[F], F, { val `tofu.interop.CE2Kernel.performEffect`: Unit }] +} + +trait PerformCarrier2Context[F[_]] extends Perform[F, Throwable] + +object PerformCarrier2Context { + final implicit def interop2ContextEffect[F[_]]: PerformCarrier2Context[F] = + macro Interop.delegate[ + PerformCarrier2Context[F], + F, { + val `tofu.interop.CE2Kernel.performContextConcurrentEffect`: Unit + } + ] +} + +trait PerformCarrier3[F[_]] extends Perform[F, Throwable] + +object PerformCarrier3 { + final implicit def interop3IO[F[_]]: PerformCarrier3[F] = + macro Interop.delegate[PerformCarrier3[F], F, { val `tofu.interop.CE3Kernel.performDispatchContext`: Unit }] +} diff --git a/modules/kernel/src/main/scala/tofu/internal/carriers/UnliftCarrier2.scala b/modules/kernel/src/main/scala/tofu/internal/carriers/unlift.scala similarity index 100% rename from modules/kernel/src/main/scala/tofu/internal/carriers/UnliftCarrier2.scala rename to modules/kernel/src/main/scala/tofu/internal/carriers/unlift.scala diff --git a/modules/kernel/src/main/scala/tofu/internal/instances/perform.scala b/modules/kernel/src/main/scala/tofu/internal/instances/perform.scala new file mode 100644 index 000000000..067f2b0ab --- /dev/null +++ b/modules/kernel/src/main/scala/tofu/internal/instances/perform.scala @@ -0,0 +1,75 @@ +package tofu.internal.instances + +import cats.data.{Kleisli, ReaderT} +import cats.tagless.{ContravariantK, FunctorK} +import cats.{Apply, Functor, ~>} +import tofu.PerformOf.Cont +import tofu.lift.{Lift, Unlift} +import tofu.syntax.funk._ +import tofu.syntax.monadic._ +import tofu.{PerformVia, Performer} +import tofu.kernel.types.PerformOf + +final class PerformerContravariantK[F[_], Cancel] extends ContravariantK[Performer[F, *[_], Cancel]] { + def contramapK[C1[_], C2[_]](af: Performer[F, C1, Cancel])(fk: C2 ~> C1): Performer[F, C2, Cancel] = + new Performer[F, C2, Cancel] { + def perform[A](cont: C2[A])(f: F[A]): F[Cancel] = af.perform(fk(cont))(f) + } +} + +final class PerformViaContravariantK[F[_], Cancel] extends ContravariantK[PerformVia[F, *[_], Cancel]] { + def contramapK[C1[_], C2[_]](af: PerformVia[F, C1, Cancel])(fk: C2 ~> C1) = + new PerformViaMappedPerformer(af, fk) +} + +class PerformViaMappedPerformer[F[_], C1[_], C2[_], Cancel]( + af: PerformVia[F, C1, Cancel], + fk: C2 ~> C1, +) extends PerformVia[F, C2, Cancel] { + private[this] val pcontra = Performer.contravariantK[F, Cancel] + + def performer: F[Performer[F, C2, Cancel]] = af.performer.map(pcontra.contramapK(_)(fk)) + implicit def functor: Functor[F] = af.functor +} + +class PerformOfMappedPerformer[F[_], Ex1[_], Ex2[_]]( + af: PerformOf[F, Ex1], + fk: Ex1 ~> Ex2, +) extends PerformViaMappedPerformer[F, Cont[Ex1, *], Cont[Ex2, *], Unit]( + af, + funK[Cont[Ex2, *], Cont[Ex1, *]](c1 => ex1 => c1(fk(ex1))), + ) with PerformOf[F, Ex2] + +final class PerformOfFunctorK[F[_]] extends FunctorK[PerformOf[F, *[_]]] { + def mapK[Ex1[_], Ex2[_]](af: PerformOf[F, Ex1])(fk: Ex1 ~> Ex2): PerformOf[F, Ex2] = + new PerformOfMappedPerformer(af, fk) +} + +final class ReaderTPerformer[F[_], R, C[_], Cancel](p: Performer[F, C, Cancel], r: R) + extends Performer[ReaderT[F, R, *], C, Cancel] { + def perform[A](cont: C[A])(f: Kleisli[F, R, A]): Kleisli[F, R, Cancel] = + ReaderT.liftF(p.perform(cont)(f.run(r))) +} + +final class UnliftPerformer[F[_], B[_], C[_], Cancel](p: Performer[B, C, Cancel], unlifter: F ~> B, lift: Lift[B, F]) + extends Performer[F, C, Cancel] { + def perform[A](cont: C[A])(f: F[A]): F[Cancel] = lift.lift(p.perform(cont)(unlifter(f))) +} +class PerformViaReader[F[_]: Functor, R, C[_], Cancel]( + p: PerformVia[F, C, Cancel] +) extends PerformVia[ReaderT[F, R, *], C, Cancel] { + val functor: Functor[ReaderT[F, R, *]] = implicitly + + def performer: ReaderT[F, R, Performer[ReaderT[F, R, *], C, Cancel]] = + ReaderT(r => p.performer.map(new ReaderTPerformer(_, r))) +} + +class PerformViaUnlift[F[_], B[_], C[_], Cancel](implicit + p: PerformVia[B, C, Cancel], + unlift: Unlift[B, F], + val functor: Apply[F] +) extends PerformVia[F, C, Cancel] { + + def performer: F[Performer[F, C, Cancel]] = + unlift.lift(p.performer).map2(unlift.unlift)(new UnliftPerformer[F, B, C, Cancel](_, _, unlift)) +} diff --git a/modules/kernel/src/main/scala/tofu/kernel/types.scala b/modules/kernel/src/main/scala/tofu/kernel/types.scala index 3e1e5cd64..1034b48d7 100644 --- a/modules/kernel/src/main/scala/tofu/kernel/types.scala +++ b/modules/kernel/src/main/scala/tofu/kernel/types.scala @@ -3,6 +3,7 @@ package kernel import cats.ApplicativeError import cats.MonadError +import tofu.concurrent.Exit object types extends KernelTypes @@ -39,4 +40,10 @@ trait KernelTypes extends Any { type Calculates[F[_]] = Scoped[Scoped.Calculation, F] type CalcExec[F[_]] = ScopedExecute[Scoped.Calculation, F] + + type PerformCont[Ex[_], A] = Ex[A] => Unit + type PerformExitCont[E, A] = Exit[E, A] => Unit + type PerformOf[F[_], Ex[_]] = PerformVia[F, PerformCont[Ex, *], Unit] + type Perform[F[_], E] = PerformOf[F, Exit[E, *]] + type PerformThrow[F[_]] = Perform[F, Throwable] } diff --git a/modules/kernel/src/main/scala/tofu/lift/Unlift.scala b/modules/kernel/src/main/scala/tofu/lift/Unlift.scala index 9675012a0..9399c3abd 100644 --- a/modules/kernel/src/main/scala/tofu/lift/Unlift.scala +++ b/modules/kernel/src/main/scala/tofu/lift/Unlift.scala @@ -31,13 +31,16 @@ object Lift extends LiftInstances1 { } implicit def liftIdentity[F[_]]: Lift[F, F] = liftIdentityAny.asInstanceOf[Lift[F, F]] - private val liftReaderTAny: Lift[AnyK, ReaderT[Any, Any, *]] = { + private val unliftReaderTAny: Unlift[AnyK, ReaderT[Any, Any, *]] = { type RT[a] = ReaderT[Any, Any, a] - new Lift[Any, RT] { + new Unlift[Any, RT] { def lift[A](fa: Any): RT[A] = ReaderT.liftF(fa) + + val unlift: RT[RT ~> Any] = ReaderT[Any, Any, RT ~> Any](r => funK[RT, Any](f => f.run(r))) } } - implicit def liftReaderT[F[_], R]: Lift[F, ReaderT[F, R, *]] = liftReaderTAny.asInstanceOf[Lift[F, ReaderT[F, R, *]]] + implicit def liftReaderT[F[_], R]: Lift[F, ReaderT[F, R, *]] = + unliftReaderTAny.asInstanceOf[Unlift[F, ReaderT[F, R, *]]] } private[lift] trait LiftInstances1 extends LiftInstances2 { diff --git a/modules/kernelCE2Interop/src/main/scala/tofu/interop/CE2Kernel.scala b/modules/kernelCE2Interop/src/main/scala/tofu/interop/CE2Kernel.scala index c997ee73f..530f67e8a 100644 --- a/modules/kernelCE2Interop/src/main/scala/tofu/interop/CE2Kernel.scala +++ b/modules/kernelCE2Interop/src/main/scala/tofu/interop/CE2Kernel.scala @@ -1,8 +1,24 @@ package tofu package interop +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} import cats.effect.concurrent.{MVar, Ref} -import cats.effect.{Async, Blocker, Bracket, Concurrent, ContextShift, Effect, ExitCase, Fiber, IO, Sync, Timer} +import cats.effect.{ + Async, + Blocker, + Bracket, + Concurrent, + ConcurrentEffect, + ContextShift, + Effect, + ExitCase, + Fiber, + IO, + Sync, + Timer +} import cats.{Functor, Id, Monad, ~>} import tofu.compat.unused import tofu.concurrent._ @@ -10,10 +26,7 @@ import tofu.internal.NonTofu import tofu.internal.carriers._ import tofu.lift.Lift import tofu.syntax.monadic._ - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} +import tofu.internal.instances.PerformViaUnlift object CE2Kernel { // 2.12 sometimes gets mad on Const partial alias during implicit search @@ -110,10 +123,26 @@ object CE2Kernel { def sleep(duration: FiniteDuration): F[Unit] = T.sleep(duration) } + final def performConcurrentEffect[F[_]](implicit + F: ConcurrentEffect[F], + @unused _nt: NonTofu[F] + ): PerformCarrier2[F] = + new ConcurrentEffectPerformer[F] + + final def performContextConcurrentEffect[F[_]](implicit + F: ContextConcurrentEffect[F], + @unused _nt: NonTofu[F] + ): PerformCarrier2Context[F] = + new PerformViaUnlift[F, F.Base, PerformOf.ExitCont[Throwable, *], Unit]()( + performConcurrentEffect[F.Base](F.concurrentEffect, NonTofu.refute), + F.unlift, + F.apply + ) with PerformCarrier2Context[F] + final def agentByRefAndSemaphore[I[_]: Monad, F[_]: Fire: Monad](implicit makeRef: MakeRef[I, F], makeSemaphore: MakeSemaphore[I, F] - ): MkAgentCE2Carrier[I, F] = { + ): MkAgentCE2Carrier[I, F] = new MkAgentCE2Carrier[I, F] { def agentOf[A](a: A): I[Agent[F, A]] = for { @@ -121,7 +150,6 @@ object CE2Kernel { sem <- makeSemaphore.semaphore(1) } yield SemRef(ref, sem) } - } final def serialAgentByRefAndSemaphore[I[_]: Monad, F[_]: Monad](implicit makeRef: MakeRef[I, F], diff --git a/modules/kernelCE2Interop/src/main/scala/tofu/interop/ConcurrentEffectPerformer.scala b/modules/kernelCE2Interop/src/main/scala/tofu/interop/ConcurrentEffectPerformer.scala new file mode 100644 index 000000000..adcb8a969 --- /dev/null +++ b/modules/kernelCE2Interop/src/main/scala/tofu/interop/ConcurrentEffectPerformer.scala @@ -0,0 +1,14 @@ +package tofu.interop +import tofu.internal.carriers.PerformCarrier2 +import tofu.Performer +import tofu.concurrent.Exit +import cats.effect.{ConcurrentEffect, IO} + +final class ConcurrentEffectPerformer[F[_]](implicit val functor: ConcurrentEffect[F]) + extends PerformCarrier2[F] with Performer.OfExit[F, Throwable] { + + def performer: F[Performer.OfExit[F, Throwable]] = functor.pure(this) + + def perform[A](cont: Exit[Throwable, A] => Unit)(f: F[A]): F[Unit] = + functor.runCancelable(f)(e => IO(cont(Exit.fromEither(e)))).unsafeRunSync() +} diff --git a/modules/kernelCE2Interop/src/main/scala/tofu/interop/ContextConcurrentEffect.scala b/modules/kernelCE2Interop/src/main/scala/tofu/interop/ContextConcurrentEffect.scala new file mode 100644 index 000000000..d2de10b32 --- /dev/null +++ b/modules/kernelCE2Interop/src/main/scala/tofu/interop/ContextConcurrentEffect.scala @@ -0,0 +1,27 @@ +package tofu.interop + +import cats.Apply +import cats.data.ReaderT +import cats.effect.ConcurrentEffect +import tofu.lift.Unlift + +abstract class ContextConcurrentEffect[F[_]] { + type Base[_] + + implicit def concurrentEffect: ConcurrentEffect[Base] + implicit def unlift: Unlift[Base, F] + implicit def apply: Apply[F] +} + +object ContextConcurrentEffect { + final class Impl[F[_], B[_]](implicit + val concurrentEffect: ConcurrentEffect[B], + val unlift: Unlift[B, F], + val apply: Apply[F] + ) extends ContextConcurrentEffect[F] { + final type Base[A] = B[A] + } + + implicit def resolveReaderTConcurrentEffect[F[_]: ConcurrentEffect, C]: Impl[ReaderT[F, C, *], F] = + new Impl[ReaderT[F, C, *], F] +} diff --git a/modules/kernelCE3Interop/src/main/scala/tofu/interop/CE3Kernel.scala b/modules/kernelCE3Interop/src/main/scala/tofu/interop/CE3Kernel.scala index 1eab4b4cb..507b04198 100644 --- a/modules/kernelCE3Interop/src/main/scala/tofu/interop/CE3Kernel.scala +++ b/modules/kernelCE3Interop/src/main/scala/tofu/interop/CE3Kernel.scala @@ -1,23 +1,24 @@ package tofu.interop +import java.util.concurrent.TimeUnit + +import scala.annotation.unused +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} + import cats.effect.kernel._ import cats.effect.std.Dispatcher import cats.effect.unsafe.IORuntime import cats.effect.{Async, Fiber, IO, Sync} import cats.{Functor, Monad} -import tofu.concurrent.impl.QVarSM import tofu.concurrent._ +import tofu.concurrent.impl.QVarSM import tofu.internal.NonTofu import tofu.internal.carriers._ import tofu.lift.Lift import tofu.syntax.monadic._ import tofu.{Fire, Scoped, WithContext} -import java.util.concurrent.TimeUnit -import scala.annotation.unused -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} - object CE3Kernel { def delayViaSync[K[_]](implicit KS: Sync[K]): DelayCarrier3[K] = new DelayCarrier3[K] { @@ -80,9 +81,15 @@ object CE3Kernel { } final def asyncExecute[F[_]](implicit - ec: ExecutionContext, F: Async[F] - ): ScopedCarrier3[Scoped.Main, F] = makeExecute[Scoped.Main, F](ec) + ): ScopedCarrier3[Scoped.Main, F] = new ScopedCarrier3[Scoped.Main, F] { + def runScoped[A](fa: F[A]): F[A] = fa + + def executionContext: F[ExecutionContext] = F.executionContext + + def deferFutureAction[A](f: ExecutionContext => Future[A]): F[A] = + F.fromFuture(executionContext.flatMap(ec => F.delay(f(ec)))) + } final def blockerExecute[F[_]](implicit blocker: Blocker[F], @@ -116,10 +123,15 @@ object CE3Kernel { def sleep(duration: FiniteDuration): F[Unit] = T.sleep(duration) } + final def performDispatchContext[F[_]](implicit + F: ContextDispatch[F], + ): PerformCarrier3[F] = new DispatchPerform[F, F.Base]()(F.async, F.apply, F.dispatcher, F.unlift) + with PerformCarrier3[F] + final def agentByRefAndSemaphore[I[_]: Monad, F[_]: MonadCancelThrow: Fire](implicit makeRef: MakeRef[I, F], makeSemaphore: MakeSemaphore[I, F] - ): MkAgentCE3Carrier[I, F] = { + ): MkAgentCE3Carrier[I, F] = new MkAgentCE3Carrier[I, F] { def agentOf[A](a: A): I[Agent[F, A]] = for { @@ -127,7 +139,6 @@ object CE3Kernel { sem <- makeSemaphore.semaphore(1) } yield SemRef(ref, sem) } - } final def serialAgentByRefAndSemaphore[I[_]: Monad, F[_]: MonadCancelThrow](implicit makeRef: MakeRef[I, F], diff --git a/modules/kernelCE3Interop/src/main/scala/tofu/interop/ContextDispatch.scala b/modules/kernelCE3Interop/src/main/scala/tofu/interop/ContextDispatch.scala new file mode 100644 index 000000000..f4b3e4d61 --- /dev/null +++ b/modules/kernelCE3Interop/src/main/scala/tofu/interop/ContextDispatch.scala @@ -0,0 +1,33 @@ +package tofu.interop + +import tofu.lift.Unlift +import cats.effect.std.Dispatcher +import tofu.WithContext +import cats.effect.Async +import cats.Apply + +abstract class ContextDispatch[F[_]] { + type Base[_] + def unlift: Unlift[Base, F] + def dispatcher: WithContext[F, Dispatcher[Base]] + def async: Async[Base] + def apply: Apply[F] +} + +object ContextDispatch { + + final class Impl[F[_], B[_]](implicit + val dispatcher: WithContext[F, Dispatcher[B]], + val unlift: Unlift[B, F], + val async: Async[B], + val apply: Apply[F], + ) extends ContextDispatch[F] { + final type Base[A] = B[A] + } + + implicit def resolveContextDispatch[F[_]: Apply, B[_]](implicit + context: WithContext[F, Dispatcher[B]], + B: Async[B], + unlift: Unlift[B, F], + ): Impl[F, B] = new Impl[F, B] +} diff --git a/modules/kernelCE3Interop/src/main/scala/tofu/interop/DispatchPerformer.scala b/modules/kernelCE3Interop/src/main/scala/tofu/interop/DispatchPerformer.scala new file mode 100644 index 000000000..c4b21bb43 --- /dev/null +++ b/modules/kernelCE3Interop/src/main/scala/tofu/interop/DispatchPerformer.scala @@ -0,0 +1,42 @@ +package tofu.interop + +import cats.effect.std.Dispatcher +import tofu.Performer +import tofu.kernel.types.PerformThrow +import tofu.concurrent.Exit +import cats.effect.Async +import scala.concurrent.ExecutionContext +import cats.Apply +import tofu.WithContext +import tofu.lift.Unlift +import tofu.internal.instances.UnliftPerformer +import tofu.kernel.types.PerformExitCont + +class DispatchPerform[F[_], B[_]](implicit + async: Async[B], + val functor: Apply[F], + dispatcher: WithContext[F, Dispatcher[B]], + unlift: Unlift[B, F] +) extends PerformThrow[F] { + + def performer: F[Performer.OfExit[F, Throwable]] = + functor.map3( + unlift.lift(async.executionContext), + dispatcher.context, + unlift.unlift + ) { (ec, dispatcher, unlifter) => + val performer = new DispatchPerformer[B](dispatcher)(async, ec) + new UnliftPerformer[F, B, PerformExitCont[Throwable, *], Unit](performer, unlifter, unlift) + } +} + +class DispatchPerformer[F[_]](dispatcher: Dispatcher[F])(implicit F: Async[F], ec: ExecutionContext) + extends Performer.OfExit[F, Throwable] { + + def perform[A](cont: Exit[Throwable, A] => Unit)(fa: F[A]): F[Unit] = { + val (res, cancel) = dispatcher.unsafeToFutureCancelable(fa) + res.onComplete(t => cont(Exit.fromTry(t))) + F.fromFuture(F.delay(cancel())) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 81b771ccf..e64818fca 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -54,6 +54,8 @@ object Dependencies { val doobie = "0.13.4" + val doobieCE3 = "1.0.0-RC1" + // Compile time only val macroParadise = "2.1.1" @@ -109,6 +111,8 @@ object Dependencies { val refined = "eu.timepit" %% "refined" % Version.refined val doobieCore = "org.tpolecat" %% "doobie-core" % Version.doobie val doobieH2 = "org.tpolecat" %% "doobie-h2" % Version.doobie + val doobieCoreCE3 = "org.tpolecat" %% "doobie-core" % Version.doobieCE3 + val doobieH2CE3 = "org.tpolecat" %% "doobie-h2" % Version.doobieCE3 val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % Version.collectionCompat val log4Cats = "org.typelevel" %% "log4cats-core" % Version.log4Cats val groovy = "org.codehaus.groovy" % "groovy" % Version.groovy