-
Notifications
You must be signed in to change notification settings - Fork 597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Errors are not always caught with .handleErrorWith #3045
Comments
Here's the minimizer with scala-cli. //> using scala "3.2.1"
//> using lib "co.fs2::fs2-core::3.3.0"
package ru.delimobil
import cats.MonadThrow
import cats.effect.IO
import cats.effect.IOApp
import cats.syntax.applicative._
import cats.syntax.option._
import fs2.Stream
import scala.concurrent.duration._
object FS2Example extends IOApp.Simple {
case class MaybeInt(int: Option[Int])
private val empty: MaybeInt = MaybeInt(none)
private val big = 2
private val small = 1
private val track = empty.copy(int = big.some)
def run: IO[Unit] = {
val singleRun =
Stream(track, track)
.repeat
.through(assemble)
.evalTap(checkDateValidity)
.parEvalMap(10)(_ => IO.unit)
.handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
.groupWithin(512, 1.second)
.compile
.drain
Stream.repeatEval(singleRun).compile.drain.guaranteeCase(IO.println)
}
private def assemble(tracks: Stream[IO, MaybeInt]): Stream[IO, MaybeInt] =
(Stream(empty) ++ tracks)
.sliding(2)
.map { chunk => MaybeInt(chunk.head.get.int) }
.chunks
.flatMap(chunk => Stream.fromOption(chunk.last))
private def checkDateValidity(updateAction: MaybeInt): IO[Unit] = {
val action = MonadThrow[IO].raiseError(new IllegalArgumentException("TheException"))
action.whenA(updateAction.int.exists(_ > small))
}
} |
If no one else is currently having a look at this, can I try and take this on? |
Go for it! I played with a little bit this morning. All I discovered was that removing the |
I think I excluded
|
The exception persists in 3.3.0, 3.2.4, 3.0.0, 2.5.4, 2.5.0, 2.1.0. But doesn't appear in 2.0.0. UPD: I also checked 2.2.0, 2.3.0, 2.4.0, 3.1.0; the error persists in that versions |
This is CE2 compliant code
|
I bisected 2.1.0 ... 2.0.0
That's the link to the commit: 659791b |
Sorry this is taking so long, I haven't had much luck on the little time I have had available to look at this. For just now I will unassign myself so other people can have a go as well. |
@nikiforo Some small minimisations: the (this was based on the |
@diesalbla would you mind posting a complete reproducer? I tried as you said and it does not reproducer for me: the error is caught. //> using lib "co.fs2::fs2-core::3.4.0"
import cats.effect._
import fs2._
object App extends IOApp.Simple {
def run = Stream(())
.concurrently(Stream.raiseError[IO](new Exception("ruh roh")))
.handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
.compile
.drain
} |
@armanbilge Here is the modified code: object err extends IllegalArgumentException("TheException") with NoStackTrace
val bullet: Stream[IO, Unit] = Stream.raiseError[IO](err)
def handler(ex: Throwable) = Stream.exec(IO.println(s"Caught TheException"))
def run: IO[Unit] = {
val singleRun =
Stream(()).concurrently(bullet)
.handleErrorWith(handler)
.groupWithin(100, 100.millis)
def aux: IO[Unit] = singleRun.compile.drain >> aux
aux.guaranteeCase(IO.println)
}
This was done using fs2 version 3.4.0. Here are the details of the JVM I used:
EDIT As a further minimisation, we can write outer loop without streams, using recursion in |
Take a look at https://github.com/nikiforo/fs21107
The project can be run with
sbt "test:runMain ru.delimobil.FS2Example"
I expect the execution to continue inifinitely. However it ends with
Errored(java.lang.IllegalArgumentException: TheException)
in tens seconds.I can further minimize the example. I've checked fs2 3.2.4, 3.3.0, both versions end with
Errrored
messageThe text was updated successfully, but these errors were encountered: