
WIP: CE 3 #273

Merged · 12 commits merged into valskalla:master on May 14, 2021
Conversation

@iRevive (Contributor) commented Apr 4, 2021

Temporarily disabled modules: odin-zio, odin-monix
Missing CE3 dependencies: zio-cats, monix

odin-core depends on cats-effect-std.

Changes due to missing Monix dependency:

  • Replaced ConcurrentQueue from Monix with Queue from cats.effect.std
  • Replaced monix.Task with cats.effect.IO in tests
  • Removed monix.execution.Scheduler in favor of IORuntime

Once a compatible version of Monix is released, I can revert these changes.

Benchmarks

I observed a performance degradation after the upgrade to CE3. Evaluating a task via .unsafeRunSync() in a for-loop is 3x slower compared to CE2. Using traverse instead of a for-loop produces clearer results.

for-loop

The results below represent the evaluation of a logging effect in a for-loop. Example:

@Benchmark
@OperationsPerInvocation(1000)
def msg(): Unit = for (_ <- 1 to 1000) logger.info(message).unsafeRunSync()
Benchmark                             Mode  Cnt      Score      Error  Units
FileLoggerBenchmarks.msg              avgt   25  21488.981 ± 1462.785  ns/op
FileLoggerBenchmarks.msgAndCtx        avgt   25  20273.076 ±  695.874  ns/op
FileLoggerBenchmarks.msgCtxThrowable  avgt   25  30110.791 ± 1010.558  ns/op
AsyncLoggerBenchmark.msg              avgt   25  18097.440 ± 4621.083  ns/op
AsyncLoggerBenchmark.msgAndCtx        avgt   25  14337.669 ± 1383.534  ns/op
AsyncLoggerBenchmark.msgCtxThrowable  avgt   25  17652.329 ± 1098.268  ns/op
ScribeBenchmark.asyncMsg              avgt   25    114.641 ±    3.568  ns/op
ScribeBenchmark.asyncMsgCtx           avgt   25    131.083 ±    2.123  ns/op
ScribeBenchmark.msg                   avgt   25   1443.887 ±   34.406  ns/op
ScribeBenchmark.msgAndCtx             avgt   25   1717.407 ±   50.303  ns/op

traverse

@Benchmark
@OperationsPerInvocation(1000)
def msg(): Unit = (1 to 1000).toList.traverse(_ => logger.info(message)).unsafeRunSync()
Benchmark                             Mode  Cnt      Score      Error  Units
FileLoggerBenchmarks.msg              avgt   25   7750.887 ±  456.193  ns/op
FileLoggerBenchmarks.msgAndCtx        avgt   25   8385.711 ±  585.243  ns/op
FileLoggerBenchmarks.msgCtxThrowable  avgt   25  21720.537 ± 4569.168  ns/op
AsyncLoggerBenchmark.msg              avgt   25   1486.737 ±  271.336  ns/op
AsyncLoggerBenchmark.msgAndCtx        avgt   25   1523.111 ±  211.884  ns/op
AsyncLoggerBenchmark.msgCtxThrowable  avgt   25   1624.252 ±  170.380  ns/op

AsyncLoggerBenchmark issue

From my point of view, the async logger benchmark is implemented in a slightly wrong way, and it does not measure the real throughput.

The key element of the AsyncLogger is a Queue. Logging a message is, basically, an enqueue operation:

def submit(msg: LoggerMessage): F[Unit] = {
  queue.tryOffer(msg).void
}

In benchmarks, the size of the queue is 1_000_000 elements and the flush period is 1 millisecond. Since JMH executes the code thousands of times, the queue is populated up to the limit almost immediately. Hence the tryOffer method does nothing during evaluation:

def tryOffer(a: A): F[Boolean] =
  state
    .modify {
      case State(queue, size, takers, offerers) if takers.nonEmpty =>
        val (taker, rest) = takers.dequeue
        State(queue, size, rest, offerers) -> taker.complete(a).as(true)
      case State(queue, size, takers, offerers) if size < capacity =>
        State(queue.enqueue(a), size + 1, takers, offerers) -> F.pure(true)
      case s =>
        s -> F.pure(false) // this branch is evaluated when the queue is full
    }
    .flatten
    .uncancelable
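The full-queue no-op behavior described above can be reproduced directly with cats.effect.std.Queue. A minimal sketch (capacity 2 instead of 1_000_000, for brevity; TryOfferDemo is an illustrative name):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

object TryOfferDemo {
  def main(args: Array[String]): Unit = {
    val program = for {
      q <- Queue.bounded[IO, Int](2)
      a <- q.tryOffer(1) // true: capacity available
      b <- q.tryOffer(2) // true: the queue is now full
      c <- q.tryOffer(3) // false: full queue, the message is silently dropped
    } yield (a, b, c)
    println(program.unsafeRunSync()) // (true,true,false)
  }
}
```

Once the benchmark's queue is saturated, every subsequent submit is the `false` branch, so the measurement no longer reflects enqueue throughput.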

To prove my assumption I changed the logic of the background fiber:

def runF: F[Fiber[F, Throwable, Unit]] = {
-  def drainLoop: F[Unit] = drain >> F.sleep(timeWindow) >> F.cede >> drainLoop
+  def drainLoop: F[Unit] = F.unit

  F.start(drainLoop).map { fiber =>
    new Fiber[F, Throwable, Unit] {
      override def cancel: F[Unit] = drain >> fiber.cancel
      override def join: F[Outcome[F, Throwable, Unit]] = fiber.join
    }
  }
}

The queue is never drained and tryOffer does nothing. The measurements become similar to the CE2 version:

Benchmark                             Mode  Cnt    Score    Error  Units
AsyncLoggerBenchmark.msg              avgt   25  996.862 ± 393.928  ns/op
AsyncLoggerBenchmark.msgAndCtx        avgt   25  710.134 ± 134.316  ns/op
AsyncLoggerBenchmark.msgCtxThrowable  avgt   25  741.075 ± 195.111  ns/op

@iRevive changed the title from "CE 3" to "WIP: CE 3" on Apr 4, 2021
@codecov bot commented Apr 4, 2021

Codecov Report

Merging #273 (be287ec) into master (19c38e4) will decrease coverage by 1.70%.
The diff coverage is 76.40%.


@@            Coverage Diff             @@
##           master     #273      +/-   ##
==========================================
- Coverage   93.02%   91.32%   -1.71%     
==========================================
  Files          33       33              
  Lines         502      530      +28     
  Branches        9       14       +5     
==========================================
+ Hits          467      484      +17     
- Misses         35       46      +11     
Flag Coverage Δ
unittests 91.32% <76.40%> (-1.71%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
benchmarks/src/main/scala/io/odin/Test.scala 0.00% <0.00%> (ø)
...src/main/scala/io/odin/config/DefaultBuilder.scala 100.00% <ø> (ø)
...c/main/scala/io/odin/config/EnclosureRouting.scala 100.00% <ø> (ø)
core/src/main/scala/io/odin/config/package.scala 100.00% <ø> (ø)
...src/main/scala/io/odin/loggers/ConsoleLogger.scala 100.00% <ø> (ø)
...ain/scala/io/odin/loggers/ConstContextLogger.scala 100.00% <ø> (ø)
.../main/scala/io/odin/loggers/ContextualLogger.scala 100.00% <ø> (ø)
...c/main/scala/io/odin/loggers/ContramapLogger.scala 100.00% <ø> (ø)
.../src/main/scala/io/odin/loggers/FilterLogger.scala 100.00% <ø> (ø)
...src/main/scala/io/odin/loggers/WriterTLogger.scala 100.00% <ø> (ø)
... and 16 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 19c38e4...be287ec.

@iRevive (Contributor, Author) commented Apr 4, 2021

Outstanding items:

  • Fix flaky test: RollingFileLoggerSpec
  • Wait for compatible dependencies:
    • monix
    • zio-cats

@iRevive (Contributor, Author) commented Apr 4, 2021

Evaluating a task via .unsafeRunSync() in a for-loop is 3x slower compared to CE2. Using traverse instead of a for-loop produces clearer results.

I haven't investigated this issue yet.
@kubukoz perhaps you've observed similar behavior in other libraries during the upgrade to CE 3?

@kubukoz commented Apr 4, 2021

I haven't done any benchmarking with CE3.

@vasilmkd you might be interested

@vasilmkd commented Apr 4, 2021

Can confirm. unsafeRunSync on IO initially shifts to the IO compute execution context, and there are some other bookkeeping actions taken to ensure safe execution and propagation of errors, which have a non-trivial overhead that is especially visible in benchmarks. It's basically not an apples-to-apples comparison anymore. Not sure if this helps...
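The overhead described here is paid per unsafeRunSync call, which suggests batching the work into one composed program before crossing the runtime boundary. A deterministic sketch of the difference (counting executions rather than timing them; BatchingDemo and the counter are illustrative only):

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.traverse._

object BatchingDemo {
  def main(args: Array[String]): Unit = {
    val count = new AtomicInteger(0)
    val log: IO[Unit] = IO(count.incrementAndGet()).void

    // slow path: 1000 runtime crossings, each paying the unsafeRunSync entry cost
    (1 to 1000).foreach(_ => log.unsafeRunSync())

    // fast path: one runtime crossing for a single composed program
    (1 to 1000).toList.traverse(_ => log).void.unsafeRunSync()

    // both paths perform the same work; they differ only in crossings
    println(count.get()) // 2000
  }
}
```

This is consistent with the benchmark results above: the traverse variants pay the runtime-entry cost once per 1000 operations rather than once per operation.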

import cats.effect.{ContextShift, IO}
import cats.effect.Clock
import cats.effect.IO
import cats.effect.unsafe.IORuntime

Suggested change
import cats.effect.unsafe.IORuntime
import cats.effect.unsafe.implicits._

and you won't need to define the implicit val for IORuntime.

implicit val F: Effect[IO] = IO.ioEffect
implicit val clock: Clock[IO] = Clock.create
implicit val F: Sync[IO] = IO.asyncForIO
implicit val dispatcher: Dispatcher[IO] = Dispatcher[IO].allocated.unsafeRunSync()._1

bad idea to do allocated on Dispatcher, can you use an IORuntime instead?


I guess it's not really possible since the interface is supposed to work on any F[_]...

@@ -97,7 +98,7 @@ lazy val sharedSettings = Seq(
lazy val `odin-core` = (project in file("core"))
.settings(sharedSettings)
.settings(
libraryDependencies ++= (monix % Test) :: catsMtl :: sourcecode :: monixCatnap :: perfolation :: catsEffect :: cats
libraryDependencies ++= (catsEffect % Test) :: catsMtl :: sourcecode :: perfolation :: catsEffectStd :: cats

You don't need to add catsEffectStd if you have core already.

@iRevive (Contributor, Author):

@kubukoz I want to have IO only in tests, since cats-effect-std is enough for the core.

timer: Timer[F],
contextShift: ContextShift[F]
case class AsyncLogger[F[_]](queue: Queue[F, LoggerMessage], timeWindow: FiniteDuration, inner: Logger[F])(
implicit F: Async[F]

Not sure if you need the full Async here

@iRevive (Contributor, Author):

The runF method uses F.start under the hood. If we move the event consumer loop outside of the class, the constraints can be relaxed to Monad and Clock.
Related comment: #273 (comment)


start is from Spawn, Async is way more powerful than that ;)
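As a sketch of how far the constraint could be relaxed: the drain loop needs start (from Spawn) plus sleep, so Temporal, which subsumes Spawn, would be enough. The standalone runF below mirrors the PR's drain/timeWindow shape but is hypothetical, not odin's actual signature:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import cats.effect.{Fiber, IO, Temporal}
import cats.effect.unsafe.implicits.global

object TemporalSketch {
  // Temporal provides both Spawn's start and sleep, so the drain loop
  // itself does not require the full Async constraint
  def runF[F[_]](drain: F[Unit], timeWindow: FiniteDuration)(
      implicit F: Temporal[F]
  ): F[Fiber[F, Throwable, Unit]] =
    F.start(F.foreverM(F.andWait(drain, timeWindow)))

  def main(args: Array[String]): Unit = {
    val drained = new AtomicInteger(0)
    val program = for {
      fiber <- runF[IO](IO(drained.incrementAndGet()).void, 10.millis)
      _     <- IO.sleep(100.millis) // let the loop run a few times
      _     <- fiber.cancel
    } yield drained.get()
    println(program.unsafeRunSync() >= 2)
  }
}
```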

implicit F: ConcurrentEffect[F]
): Logger[F] = F.toIO(withAsync(inner, timeWindow, maxBufferSize).allocated).unsafeRunSync()._1
implicit F: Async[F],
dispatcher: Dispatcher[F]

It's not recommended to pass Dispatcher implicitly, you might be better off creating one here and using allocated here.

@iRevive (Contributor, Author) commented Apr 5, 2021

Due to the semantics of the withAsyncUnsafe method, the Dispatcher cannot be instantiated:

def withAsyncUnsafe[F[_]](
    inner: Logger[F],
    timeWindow: FiniteDuration,
    maxBufferSize: Option[Int]
)(
    implicit F: Async[F]
): Logger[F] = {
  val dispatcher: F[(Dispatcher[F], F[Unit])] = Dispatcher[F].allocated // still cannot run an effect to access the dispatcher
}

It could work with a different signature:

def withAsync[F[_]]: Resource[F, Logger[F]] = ???

- def withAsyncUnsafe[F[_]](...): Logger[F] = ???
+ def withAsyncUnsafe[F[_]](...): F[Logger[F]] = ???
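A rough sketch of the Resource-based signature: the Dispatcher is acquired as part of the same Resource, so no unsafe allocation is needed. The Logger trait and AsyncLoggerSketch here are hypothetical minimal stand-ins, not odin's actual interfaces:

```scala
import cats.effect.{Async, IO, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global

// hypothetical minimal Logger, for illustration only
trait Logger[F[_]] { def info(msg: String): F[Unit] }

final class AsyncLoggerSketch[F[_]](inner: Logger[F], dispatcher: Dispatcher[F])
    extends Logger[F] {
  def info(msg: String): F[Unit] = inner.info(msg)
  // the dispatcher enables running effects from non-F code (e.g. shutdown hooks);
  // beware that it can block the calling thread
  def unsafeInfo(msg: String): Unit = dispatcher.unsafeRunSync(inner.info(msg))
}

object DispatcherSketch {
  // the Dispatcher's lifecycle is tied to the logger's
  def withAsync[F[_]: Async](inner: Logger[F]): Resource[F, AsyncLoggerSketch[F]] =
    Dispatcher[F].map(d => new AsyncLoggerSketch(inner, d))

  def main(args: Array[String]): Unit = {
    val console: Logger[IO] = new Logger[IO] {
      def info(msg: String): IO[Unit] = IO(println(msg))
    }
    withAsync[IO](console).use(_.info("hello")).unsafeRunSync()
  }
}
```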

@sergeykolbasov what do you think?

Contributor:

I'd say if someone is feeling adventurous enough to deal with unsafe API, let them do it on their own by providing a custom dispatcher implicitly. If the users want to deal with unsafety, they should know better how to deal with it on their side.


I'd take it explicitly, but I agree about having to manage it on the user's side :)

@@ -28,7 +26,7 @@ abstract class DefaultLogger[F[_]](val minLevel: Level)(implicit clock: Clock[F]
exception = t,
position = position,
threadName = Thread.currentThread().getName,

random thought, this should be suspended

_ <- timer.sleep(100.millis)
_ <- cs.shift
_ <- F.sleep(100.millis)
_ <- F.cede

There's no need to cede after a sleep

@iRevive (Contributor, Author):

Didn't know this, thanks!


checkAll(
"ContextualLogger",
LoggerTests[F](
new WriterTLogger[IO].withConstContext(Map.empty),
_.written.unsafeRunSync()
_.written.evalOn(singleThreadCtx).unsafeRunSync()

Why do you need these evalOns?

@iRevive (Contributor, Author):

IO executes effects on different threads more often compared to CE2. Therefore loggerMessageEq returns false, since message.threadName differs between runs. Executing effects on a single thread prevents this issue.

On the other hand, evalOn feels more like a band-aid than a proper fix. Perhaps I should ignore the threadName field in the Eq logic.
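Ignoring threadName in the Eq logic could look roughly like this (a hypothetical, simplified LoggerMessage; odin's real one has more fields that would also go into the comparison):

```scala
import cats.Eq
import cats.syntax.eq._

// hypothetical, simplified LoggerMessage for illustration
final case class LoggerMessage(message: String, threadName: String)

object EqSketch {
  // compare only the stable fields, ignoring threadName,
  // which varies with the executing thread under CE3
  implicit val loggerMessageEq: Eq[LoggerMessage] =
    Eq.by(_.message)

  def main(args: Array[String]): Unit = {
    val a = LoggerMessage("hello", "io-compute-1")
    val b = LoggerMessage("hello", "io-compute-2")
    println(a === b) // true: thread names differ but messages match
  }
}
```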

def runF: F[Fiber[F, Throwable, Unit]] = {
def drainOnce: F[Unit] = drain >> F.sleep(timeWindow) >> F.cede

F.start(drainOnce.foreverM[Unit]).map { fiber =>

@iRevive (Contributor, Author) commented Apr 5, 2021

I'm not sure, to be honest. runF is already a part of the AsyncLogger lifecycle.

Btw, should runF even be public or part of the class? The event consumer loop should be started only once.

In that case, the definition of the AsyncLogger can be simplified:

case class AsyncLogger[F: Monad: Clock] private (...) extends DefaultLogger(...) {
  def submit(msg: LoggerMessage): F[Unit] = ...
  private def drain: F[Unit] = ...
}

object AsyncLogger {
  def withAsync[F[_]: Async](inner: Logger[F], timeWindow: FiniteDuration, maxBufferSize: Option[Int]): Resource[F, Logger[F]] = {
    val createQueue = ...
    
    def backgroundConsumer(logger: AsyncLogger[F]): Resource[F, Unit] = {
      def drainLoop: F[Unit] = F.andWait(logger.drain, timeWindow).foreverM[Unit]
    
      // cannot use F.background due to a custom cancellation logic
      Resource.make(F.start(drainLoop))(fiber => logger.drain >> fiber.cancel).void
    }

    for {
      queue  <- Resource.eval(createQueue)
      logger <- Resource.pure(AsyncLogger(queue, timeWindow, inner))
      _      <- backgroundConsumer(logger)
    } yield logger
  }  
}

Contributor:

sounds good to me

@sergeykolbasov (Contributor) commented:
Thanks for the effort @iRevive

That RollingFileLogger spec is indeed annoying; I couldn't manage the timer mock for that specific case back in the day. I guess it's related to the internal rolling loop, but I gave up tracing it down to the root cause.

@sergeykolbasov (Contributor) commented:

@iRevive do you think using Hotswap would make the rolling file logger cleaner, and thus fix the test?

@kubukoz commented Apr 6, 2021

fs2-io is using Hotswap internally to implement something similar, for reference: https://github.com/typelevel/fs2/blob/24370abb527147da78b93d59a5be60e1079fdfbe/io/src/main/scala/fs2/io/file/Files.scala#L507-L555

@iRevive (Contributor, Author) commented Apr 7, 2021

@iRevive do you think using Hotswap would make the rolling file logger cleaner, and thus fix the test?

It could be useful. I will give Hotswap a try.

@iRevive (Contributor, Author) commented Apr 7, 2021

Switched to Hotswap. RollingFileLoggerSpec no longer fails locally or on CI.
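For reference, the Hotswap rolling pattern can be sketched with a StringBuilder standing in for a file resource (the names and the toy resource are illustrative only, not odin's actual rolling implementation):

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Hotswap
import cats.effect.unsafe.implicits.global

object HotswapSketch {
  // toy "log file" resource: a StringBuilder stands in for a real file handle
  def file(name: String): Resource[IO, StringBuilder] =
    Resource.make(IO(new StringBuilder(s"open:$name;")))(sb => IO(sb.append("closed;")).void)

  def main(args: Array[String]): Unit = {
    val program = Hotswap(file("log-1")).use { case (hotswap, first) =>
      for {
        _    <- IO(first.append("write;"))
        // rolling: swap closes log-1 and opens log-2 as one operation
        next <- hotswap.swap(file("log-2"))
        _    <- IO(next.append("write;"))
      } yield next.toString
    }
    println(program.unsafeRunSync())
  }
}
```

The swap call is what makes the rolling loop clean: the old file's finalizer runs as part of acquiring the new one, so no manual open/close bookkeeping is needed.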

* Run internal loop of consuming events from the queue and push them down the chain
*/
def backgroundConsumer(logger: AsyncLogger[F]): Resource[F, Unit] = {
def drainLoop: F[Unit] = F.andWait(logger.drain, timeWindow).foreverM[Unit]

Seeing andWait being actually used makes me happy.

@kubukoz commented Apr 7, 2021

I can't promise I'll make it before the weekend, but I'm planning to go through this again and search for any potential fiber leaks etc. :)

@kubukoz mentioned this pull request Apr 21, 2021
@@ -28,13 +30,18 @@ package object zio {
fileName: String,
formatter: Formatter = Formatter.default,
minLevel: Level = Level.Trace
): Managed[LoggerError, Logger[IO[LoggerError, *]]] =
): ZManaged[Clock & CBlocking, LoggerError, Logger[IO[LoggerError, *]]] =
ZManaged
@iRevive (Contributor, Author):

@kubukoz A gut feeling says there should be a more elegant combinator. Something similar to Resource.suspend.


If there is one, I'm not aware of it.

@kubukoz commented May 13, 2021

Can we get this merged? :)

@sergeykolbasov sergeykolbasov merged commit 276ba00 into valskalla:master May 14, 2021
@sergeykolbasov (Contributor) commented:
Thanks, @iRevive & @kubukoz

I'll drop a release soon

@vasilmkd commented:
@iRevive Do you mind repeating the unsafeRunSync benchmarks against the latest Cats Effect 3.1.1 release? We released several performance optimizations right on this code path that should be quite noticeable in benchmarks. Thank you and sorry for the trouble.

@iRevive (Contributor, Author) commented May 16, 2021

@vasilmkd sure, I will give it a try.

@kubukoz commented Jun 11, 2021

hey @sergeykolbasov, I don't see a release in maven, did it fail or did you just not have a chance to do it yet?

Just FYI, I checked a local snapshot of this and it seems to work :) a little bummer about having to do Dispatcher[IO].allocated.unsafeRunSync()._1 but maybe it'll be better if we get typelevel/cats-effect#1791.
