Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zip + uncons + concurrency = scope lookup failure #3478

Open
Jasper-M opened this issue Sep 23, 2024 · 4 comments
Open

zip + uncons + concurrency = scope lookup failure #3478

Jasper-M opened this issue Sep 23, 2024 · 4 comments
Labels

Comments

@Jasper-M
Copy link
Contributor

Jasper-M commented Sep 23, 2024

Quoting @armanbilge:

The problem seems to be running the tail of a zipped Stream .concurrently.

I ran into this issue before here: #3081 (comment)

IIUC Arman worked around that problem for hold1 by moving the .pull.uncons into the concurrent process. However the core issue still remains: if you do stream.pull.uncons and then concurrently process the tail, things will blow up if stream happens to contain any kind of zipping. Possibly other operators that are implemented with stepLeg have the same issue?

I ran into this again while trying to use Pull.extendScopeTo for extending resource lifetimes across async boundaries. That actually seems to work, but requires stream.pull.peek or any other unconsing variation, which means it's incompatible with zipped streams:

input1.pull.peek.flatMap{
  case Some((_, stream)) =>
    Pull.extendScopeTo(asyncBufferThing(stream)).flatMap(s => Pull.output1(s))
  case None =>
    Pull.done
}
.stream
.flatten

https://scastie.scala-lang.org/56CQGK8uTzecITHSwmO0AQ

@Jasper-M Jasper-M added the bug label Sep 23, 2024
@armanbilge
Copy link
Member

possible dupe of #3123? In any case, I liked the idea I had there

when compiling the background stream in concurrently, instead of giving it a new root scope it should derive its scope from the foreground stream.

but I couldn't quite make that work.

@Jasper-M
Copy link
Contributor Author

Oh sorry, I didn't see that other issue.
You can run into the issue via stream.compile.drain.start as well, which is what groupWithin was doing. Though if you could at least avoid it by staying in Stream, that would already be a big improvement.

@Jasper-M
Copy link
Contributor Author

And to underscore how little I understand of the scope system, if you replace Pull.extendScopeTo(asyncBufferThing(stream)).flatMap(s => Pull.output1(s)) with just Pull.output1(asyncBufferThing(stream)) it still seems to "extend" the scope to the output stream... Though extendScopeTo still seems to be required in more complicated cases.

@ValdemarGr
Copy link
Contributor

ValdemarGr commented Oct 11, 2024

When unconsing, the tail of the stream is (effectively) prefixed with an eval effect that closes the previous scope.

Here is an example:
https://scastie.scala-lang.org/k8hcN1noTzCEA7VQzWtLcQ

When you do Stream#evalMap and Stream#flatMap a stack of transformations are composed, once a Pull.output is performed, the outputs are pushed through this stack of transformations and finally through whatever .compile operation chose such as drain. The Scope reasoning lies in the tail.

I've experimented a bit with a non-cps based Pull; I think the composition is a bit more explicit here.

What does scope lookup mean?

In fs2 scope's are referenced by scopeId when doing things like StepLeg or closing a scope (see the CloseScope node in Pull). If we were to omit scopeId from a hypothetical fs2 implementation and reference scopes directly by reference (Scope[F]) then we'd either open a bunch of soundness holes or make the API unwieldy.

The F in Scope is not necessarily the same F as in Pull, remember, Pull is covariant in F and the Translate node allows F ~> G, so the real effect type G may be either a supertype (G[x] >: F[x]) or an embedding (hopefully) F ~> G.

For instance, for Stream[Id, A] the actual effect used to evaluate the stream (G) is SyncIO, where Id ~> SyncIO (trivially).

Say we'd like to compile a unconsed tail without getting scope lookup failures, and our plan of attack was swapping out the scopeIds with Scope references. But now a problem arises; Pull is defined in F, but Scope is defined in G. With the current structure there is no way of embedding Scope[G] into Pull[F, O, R].

Say this tail Pull defined in F was instead compiled in H where H and G both do not occur in any partial orderings, or more specifically, H[x] !<: G[x] and G[x] !<: H[x].

// A diamond structure of effect structures
trait Effect1[A]
trait Effect2[A]
trait SubEffect[A] extends Effect1[A] with Effect2[A]

val myResource: Resource[SubEffect, Int] = ???

Stream
  .resource(myResource)
  .repeatN(3)
  .pull
  .uncons1
  .flatMap{
    case None => Pull.done
    case Some((hd, tl)) =>
      // notice that tl is now evaluated in Effect1, but the origin stream still remains in SubEffect
      tl.covary[Effect1].compile.drain
  }
  .stream
  // the  stream will be compiled in Effect2 which does not share any partial ordering with Effect1
  .covary[Effect2]
  .compile.drain

In my experiment I have explicitly encoded this issue here. In fs2, instead an exception is (justifiably) thrown when Pull's Scope cannot find the scopeId in the tree.

By taking the liberty to introduce G ~> F, there is a composition of G ~> F and then F ~> H such that G ~> H, although, such composition is almost certainly a lossy one and may introduce other issues related to covariance. More than what already may exist for translation.

So what if we just make sure to only open scopes in F? What if we never reason with the G and the relation F ~> G?
In an earlier draft of my experiment I tried something like that here. This implies that any translation F ~> F2 must also guarantee that F2 has all the required tools to handle scopes (Compiler.Target). I have not explored this solution as elaborately yet and I am unsure how well it works with the current model for interruption, nevertheless I think it is an interesting avenue to explore since it allows invariant operators (GetScope) to be public.

It may seem strange that you are able to control future consumers, so to speak, but that's exactly the ability that makes Stream more powerful than Resource, for example it's what makes concurrently capable of surfacing background errors to the foreground, whereas Resource.background can only do the reverse.

I spent a while trying to grok the whys in fs2 and want to contribute to making a sound and expressive streaming model. So any discussion is much appreciated!

Disregarding soundness issues of moving scopes between effects, @armanbilge, what issues did you hit when passing the active scope to the background stream in concurrently #3112 (comment)? In particular, this commit 779a170

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants