Compile on Pure Stream causes unexpected behavior in Pipe #1838

Closed
sloshy opened this issue Apr 9, 2020 · 3 comments · Fixed by #1839

@sloshy
Contributor

sloshy commented Apr 9, 2020

Assume we have the following defined:

val ioStream: Stream[IO, Any] = ???
val ntq: NoneTerminatedQueue[IO, Any] = ???

val badPurePipe: Pipe[Pure, Any, Any] = in => Stream.emits(in.compile.toList)

val enqueueThenTerminate = ioStream.map(_.some).through(ntq.enqueue).onFinalize(ntq.enqueue1(None))

val badDequeue = ntq.dequeue.through(badPurePipe)

val streamThatHangsForever = badDequeue.concurrently(enqueueThenTerminate)

In badPurePipe the input stream is compiled to a List, which triggers some strange behavior when the stream is evaluated. When run, the stream hangs forever.

For some reason, the following won't hang forever, and it will work as expected including printing to stdout:

val streamThatWorks = (Stream.eval(IO(println("something"))).drain ++ badDequeue).concurrently(enqueueThenTerminate)

The issue appears to be that compile can be called on a Stream inside a Pipe[Pure, A, B]. When that pipe is then applied to a stream whose effect is not Pure but something like IO, the result is undefined behavior. The correct alternative, in this case, is to use .fold, .chunkN, and other Stream operators that do not compile the stream but let it continue as expected, or to use a Pipe[F, A, B] instead, which guarantees that compile returns an F[_] value.
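
A minimal sketch of that alternative, assuming the goal of badPurePipe is to gather every element into a single List. The names PurePipeAlternative and collectAll are hypothetical and not part of fs2. The example uses SyncIO rather than IO only so that running it needs no execution context or runtime; the same pipe should apply to an IO stream unchanged.

```scala
import cats.effect.SyncIO
import fs2.{Pipe, Pure, Stream}

object PurePipeAlternative {
  // Hypothetical replacement for badPurePipe. It is polymorphic in F and
  // never calls compile: fold accumulates the elements inside the stream
  // and emits the finished List as a single element at the end.
  def collectAll[F[_], A]: Pipe[F, A, List[A]] =
    in => in.fold(List.empty[A])((acc, a) => a :: acc).map(_.reverse)

  def main(args: Array[String]): Unit = {
    // Applied to a pure stream.
    val pure = Stream(1, 2, 3).through(collectAll[Pure, Int]).toList
    println(pure) // List(List(1, 2, 3))

    // Applied to an effectful stream; the effect type stays SyncIO throughout.
    val effectful =
      (Stream.eval(SyncIO(1)) ++ Stream(2, 3))
        .through(collectAll[SyncIO, Int])
        .compile
        .toList
        .unsafeRunSync()
    println(effectful) // List(List(1, 2, 3))
  }
}
```

Because the pipe is written against an arbitrary F, the caller decides where compile happens, so the pipe itself cannot evaluate an effectful stream as if it were pure.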

Special thanks to @Daenyth for debugging this issue with me on gitter.

@mpilquist
Member

Ouch... is this the same as this unsoundness minimization?

@ val p: Pipe[Pure, Int, List[Int]] = in => Stream.emit(in.toList)
p: Stream[Pure, Int] => Stream[Pure, List[Int]] = <function1>

@ Stream.eval(IO(1)).through(p)
java.lang.ClassCastException: class cats.effect.IO$Delay cannot be cast to class cats.effect.SyncIO (cats.effect.IO$Delay and cats.effect.SyncIO are in unnamed module of loader ammonite.runtime.SpecialClassLoader @4c5ae43b)
  cats.effect.SyncIOSync.attempt(SyncIO.scala:428)
  fs2.internal.CompileScope.interruptibleEval(CompileScope.scala:374)
  fs2.internal.Algebra$.go$1(Algebra.scala:253)
  fs2.internal.Algebra$.compileLoop(Algebra.scala:343)
  fs2.internal.Algebra$.compile(Algebra.scala:133)
  fs2.Stream$Compiler$.$anonfun$compile$1(Stream.scala:4298)
  cats.effect.SyncIO.$anonfun$bracketCase$1(SyncIO.scala:182)
  cats.effect.SyncIO.$anonfun$flatMap$1(SyncIO.scala:74)
  cats.effect.internals.IORunLoop$.liftedTree3$1(IORunLoop.scala:238)
  cats.effect.internals.IORunLoop$.step(IORunLoop.scala:238)
  cats.effect.IO.unsafeRunTimed(IO.scala:321)
  cats.effect.IO.unsafeRunSync(IO.scala:240)
  cats.effect.SyncIO.unsafeRunSync(SyncIO.scala:51)
  fs2.Stream$PureOps$.to_$extension(Stream.scala:3834)
  fs2.Stream$PureOps$.toList$extension(Stream.scala:3844)
  ammonite.$sess.cmd3$$anonfun$1.apply(cmd3.sc:1)
  ammonite.$sess.cmd3$$anonfun$1.apply(cmd3.sc:1)
  fs2.Stream$.through$extension(Stream.scala:2726)
  ammonite.$sess.cmd4$.<init>(cmd4.sc:1)
  ammonite.$sess.cmd4$.<clinit>(cmd4.sc)

@sloshy
Contributor Author

sloshy commented Apr 9, 2020

@mpilquist sure looks related, though I didn't see any exceptions when I ran my less-minimized example, which obscured the problem. I'm confused as to why the stream that works ends up working in spite of all that... Likely doesn't matter, but it was really confusing and interesting to stumble on it in that way.

EDIT: It's possible that it doesn't actually "work", but it just doesn't hang, and it still doesn't do what I expect in the end. The full example this is pulled from is a bit more complex.

mpilquist added a commit to mpilquist/fs2 that referenced this issue Apr 9, 2020
@mpilquist
Member

OK just pushed a fix. Thanks for finding this!

SystemFw added a commit that referenced this issue Apr 13, 2020
Fix #1838 soundness bug that allowed a pure pull to be used on effectful streams