Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Stream#noInterruptScope #2843

Closed

Conversation

armanbilge
Copy link
Member

Opening for CI, let's see if I broke anything 😅

@armanbilge
Copy link
Member Author

armanbilge commented Mar 10, 2022

Woops?

==> X fs2.StreamCombinatorsSuite.evalFilterNotAsync - filters up to N items in parallel  2.021s java.util.NoSuchElementException: null
    at fs2.Stream$CompileOps.lastOrError$$anonfun$1$$anonfun$1(Stream.scala:4654)
    at scala.Option.fold(Option.scala:263)
    at fs2.Stream$CompileOps.lastOrError$$anonfun$1(Stream.scala:4654)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at modify @ fs2.internal.Scope.close(Scope.scala:268)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at rethrow$extension @ fs2.Compiler$Target.compile$$anonfun$1(Compiler.scala:156)
    at get @ fs2.internal.Scope.openScope(Scope.scala:287)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Pull$.goCloseScope$1$$anonfun$1$$anonfun$3(Pull.scala:1184)
    at update @ fs2.concurrent.SignallingRef$$anon$3.cleanup$1(Signal.scala:241)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)

Edit: can't reproduce locally, a flake? 😓

Edit 2: right, it also happened in #2831 (comment).

Comment on lines +373 to +382
test("issue #2842 - no interrupt scope") {
val s = for {
local <- Stream.eval(IOLocal(List.empty[Int]))
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber
_ <- Stream.eval(local.update(3 :: _)).noInterruptScope
result <- Stream.eval(local.get)
} yield result
s.interruptScope.compile.lastOrError.assertEquals(List(3, 1))
}
Copy link
Member Author

@armanbilge armanbilge Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I was concerned about—that the noInterruptScope would leak to other portions of the Stream. This test currently fails, because Stream.eval(local.update(2 :: _)) is not interruptible even though it should be.

@armanbilge
Copy link
Member Author

Ok, I'm afraid this particular approach may be a dead end. I didn't think hard enough about how scopes work and It suffers from exactly the problem @rossabaker asked about in armanbilge/bayou#1 (comment), which is that the no-interruptibility scope extends over the flatMaps.

Of course, a noInterruptScope concept may be valuable in of itself, but it doesn't solve #2842.

val s = for {
local <- Stream.eval(IOLocal(List.empty[Int]))
_ <- Stream.eval(local.update(1 :: _)).noInterruptScope
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know whether this is a good idea or not, but it makes the test pass.

Suggested change
_ <- Stream.eval(local.update(2 :: _)) // this one will be lost on a forked fiber
_ <- Stream.eval(local.update(2 :: _)).interruptScope // this one will be lost on a forked fiber

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that works. Going to your example in armanbilge/bayou#1 (comment) it would be like doing this I think:

- x  <- (spanS[IO]("stream") >> s2).interruptWhen(d).compile.foldMonoid
+ x  <- (spanS[IO]("stream") >> s2.interruptScope).interruptWhen(d).compile.foldMonoid

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually I'm not sure. Because even if s2 is interruptible I don't think it would respect the interruptibility of the outer Stream since it doesn't know about that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got called away to a meeting before I could share and again before I can refine, but I'm trying to define a spanS. This passes. (Both interrupt scopes aren't necessary, but the one in spanS is necessary when it doesn't know what it's spanning.)

  test("spanS -- interrupted") {
    IOLocal(0).flatMap { local =>
      def spanS: Stream[IO, Unit] =
        Stream.bracket(local.set(1))(_ => local.set(0))
          .noInterruptScope
          .interruptScope

      ((spanS >> Stream.eval(local.get)) ++ Stream.eval(local.get))
        .interruptScope.compile.toList
    }.assertEquals(List(1, 0))
  }

Copy link
Member

@rossabaker rossabaker Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand your comment on the outer stream. We don't want to go noInterrupt-interrupt. We want to go noInterrupt-reset. My spanS above would make an uninterruptible-by-inheritance stream interruptible to trace it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, actually it doesn't matter at all whether the Stream we want to trace is interruptible or not. Really, we shouldn't be messing with that. In fact, interruptibility is sort of convoluted with the real problem here.

All we want is a way to call local.set(...) on the fiber will parent every other forked fiber in that Stream. (And ideally a way to do it again, at the end of the Stream when we close the Span). Because interruptibility is using race is why it becomes important in this context, but these are brittle sort of implementation details. And there could be other similar gotchas throughout fs2.

Since IOLocal is specific to IO, I wonder if we just need to add some IO specific methods to Stream specifically for working with locals. Because currently compositionality is kind of broken for anything to do with IOLocal.

val s = Stream.eval(local.set(...)) >> Stream.eval(local.get)
s.compile.lastOrError =!= s.interruptScope.compile.lastOrError

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy question: does fs2 even depend on IO? Wouldn't it need to put some sort of hint into the algebra that an IO interpreter was aware of?

Copy link
Member Author

@armanbilge armanbilge Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fs2 depends on CE core because of SyncIO but there's no direct dependency to IO that I'm aware of anyway.

I don't think that the interpreter needs to be aware of IO. Roughly I was thinking we could have some (ugly) method like this:

object Stream {
  // edit: this signature needs to be re-thought, for sure ...
  def locally[A, B](local: IOLocal[A])(IOLocal[A] => IO[B]): Stream[IO, B]
}

that must satisfy something like

val s = Stream.locally(local)(_.set(...)) >> Stream.locally(local)(_.get)
s.compile.lastOrError === s.interruptScope.compile.lastOrError

The implementation can delegate to something like Stream.evalUninterruptible which can be a private[fs2] method. So that way we don't expose it, but also the algebra doesn't need to know about IO. The fact that the desired semantics are achieved by disabling interruption is remains an implementation detail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable, but it also means we can't derive spanS from the proposed TraceResource and would need a third type class:

classDiagram
    Trace <|-- TraceResource
    TraceResource <|-- TraceStream
      class Trace{
        +span(String name)(F~A~ fa) F~A~
      }
      class TraceResource{
        +spanR(String name) Resource~F~
      }
      class TraceStream{
        +spanS(String name) Stream~F~
      }
Loading

Sorry, I don't know enough Mermaid UML™ to render the output type of Resource and Stream, but you get the gist.

Or maybe TraceResource could have an abstract type parameter, and fs2 could have a Locally type class that relates IO and IOLocal, but now I'm feeling even dizzier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is what I was getting at in armanbilge/bayou#1 (comment) and typelevel/natchez#526 (comment): we will need a spanS method implemented specifically for Stream and thus would need to take on an fs2 dependency. This would be true even with my original noInterruptScope idea.

I don't know if TraceResource and TraceStream or more typeclasses are the right answer 😅 starts to get complicated. I'd rather have an enhanced Trace and take on the fs2 dependency which is kind of what Bayou purported anyway.

@rossabaker
Copy link
Member

I wonder whether something like this could help make this test pass, but I haven't made it click yet. I want the finalizer on s to run even if the stream is canceled. The use case is for http4s-0.23, where the response finalizer is embedded in response.body. Response should be in a resource, but until it is, we need to guarantee that the body is run if a response is produced.

    Deferred[IO, Boolean].flatMap { d =>
      val s = Stream.never[IO].onFinalize(d.complete(true).attempt.void)
      IO.uncancelable { poll =>
        IO.canceled *> poll(s.compile.drain)
      }.onCancel(d.complete(false).attempt.void).start *> d.get
    }.assert

@armanbilge
Copy link
Member Author

Ross found a better way to do tracing, which was the motivating use-case for this method.

@armanbilge armanbilge closed this Jun 5, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Counter-intuitive behavior with interruptScope and IOLocals
2 participants