Premature resource finalization in Stream.bracket #3461

Open
hemirime opened this issue Aug 8, 2024 · 3 comments

@hemirime
hemirime commented Aug 8, 2024

Version: co.fs2::fs2-core::3.10.2

When running the code at https://scastie.scala-lang.org/btTjBsN3QWGSP8S0AMPz6A, the Stream.bracket resource is finalized before the inner process and emit streams have completed.
It seems that the Stream.bracket scope somehow becomes tied to the incoming stream argument of the processData function.

The .chunkN(1).unchunks calls are there only to keep the reproduction small; the real code looks more like this: https://scastie.scala-lang.org/pFqksNB6TIeVl9emDlInjA

  val input = Stream
    .force {
      Queue
        .unbounded[IO, Option[Int]]
        .flatTap { q =>
          Vector(3, 1, 2, 3).map(Some.apply).traverse(q.offer) *> q.offer(None)
        }
        // .map(q => Stream.fromQueueNoneTerminated(q, 1)) // [1] works
        .map(q => Stream.repeatEval(q.take).unNoneTerminate) // [2] doesn't
    }
    .evalTap(x => IO.println(s"> $x"))

In the actual code, line [2] is used inside a third-party library.
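
As a side check, the two queue-draining variants marked [1] and [2] can be compared in isolation. The sketch below is not part of the original reproduction: the object and method names are made up for illustration, and it only assumes fs2-core 3.x with cats-effect 3 on the classpath. It suggests that both variants yield the same elements on their own, so the difference reported here would come from how they interact with the downstream pull-based processing rather than from the elements they produce.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Stream

// Hypothetical names; only the two Stream expressions come from the issue.
object QueueVariants extends IOApp.Simple {
  // Builds a queue pre-filled with 3, 1, 2, 3 followed by a None terminator.
  private def filledQueue: IO[Queue[IO, Option[Int]]] =
    Queue.unbounded[IO, Option[Int]].flatTap { q =>
      Vector(3, 1, 2, 3).traverse_(i => q.offer(Some(i))) *> q.offer(None)
    }

  // Variant [1]: drain the queue with fromQueueNoneTerminated.
  def viaFromQueue: IO[List[Int]] =
    Stream
      .force(filledQueue.map(q => Stream.fromQueueNoneTerminated(q, 1)))
      .compile
      .toList

  // Variant [2]: drain the queue with repeatEval + unNoneTerminate.
  def viaRepeatEval: IO[List[Int]] =
    Stream
      .force(filledQueue.map(q => Stream.repeatEval(q.take).unNoneTerminate))
      .compile
      .toList

  val run: IO[Unit] =
    for {
      a <- viaFromQueue
      b <- viaRepeatEval
      _ <- IO.println(s"[1] $a, [2] $b")
    } yield ()
}
```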

Output:

> 3
=== START PROCESS === with 3 elems
> 1
> 2
>>> Got Chunk(1, 2)
> 3
=== END PROCESS ===
>>> Got Chunk(3)
< Emitting element #0
< Emitting element #1
< Emitting element #2
Vector(element #0, element #1, element #2)

Expected output:

> 3
=== START PROCESS === with 3 elems
> 1
> 2
>>> Got Chunk(1, 2)
> 3
>>> Got Chunk(3)
< Emitting element #0
< Emitting element #1
< Emitting element #2
=== END PROCESS ===
Vector(element #0, element #1, element #2)
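
For reference, the ordering expected above matches what Stream.bracket does in the ordinary case, where the inner stream is consumed inside the bracket's own flatMap. The following is a minimal sketch of that baseline, not the code from the Scastie links; the object name and the event log are illustrative assumptions.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

// Hypothetical baseline: records acquire/use/release events in order.
object BracketOrder extends IOApp.Simple {
  def events: IO[List[String]] =
    IO.ref(Vector.empty[String]).flatMap { log =>
      def record(msg: String): IO[Unit] = log.update(_ :+ msg)

      Stream
        .bracket(record("acquire"))(_ => record("release"))
        // The inner stream runs entirely inside the bracket's scope.
        .flatMap(_ => Stream(1, 2, 3).evalMap(i => record(s"use $i")))
        .compile
        .drain *> log.get.map(_.toList)
    }

  // Prints: acquire, use 1, use 2, use 3, release
  val run: IO[Unit] = events.flatMap(es => IO.println(es.mkString(", ")))
}
```

Here the release step runs only after the inner stream has finished, which is the behaviour the expected output relies on.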

hemirime added the bug label Aug 8, 2024

@mpilquist
Member

This issue is caused by the re-threading of the tail stream here:

    input.pull.uncons1
      .flatMap {
        case None => Pull.done
        case Some(head -> tail) =>
          Pull.extendScopeTo(tail)
            .evalMap(t => IO(Data(head, t)))           // <--- tail is returned here
            .flatMap(Pull.output1)
      }
      .stream
      .flatMap { data =>
        processData(data.size, data.stream)        // <--- and then later evaluated here
      }

The tail of the result from uncons1 is returned outside of the normal flow of the pull, resulting in finalizers running at an unexpected time. This seems like the same type of scope rethreading issue that requires StepLeg to be a thing when pulling from different streams. Is this re-threading essential to the use case in question?

@hemirime
Author

Yes, this re-threading is essential because the head of the stream always contains the input parameters needed for the processData function and the rest of the stream is a data stream for this function.
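
One way to picture the "head carries the parameters, tail carries the data" shape without handing the tail to a later flatMap is to consume the tail inside the same pull. The sketch below is only an illustration under assumptions: processData here is a hypothetical stand-in (the real one lives in the linked Scastie), the object and method names are invented, and nothing in this thread confirms that this shape fits the actual code or the third-party library involved.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pull, Stream}

object HeadThenTail extends IOApp.Simple {
  // Hypothetical stand-in for processData: holds a resource while reading `data`.
  def processData(size: Int, data: Stream[IO, Int]): Stream[IO, String] =
    Stream
      .bracket(IO.println(s"=== START PROCESS === with $size elems"))(_ =>
        IO.println("=== END PROCESS ===")
      )
      .flatMap(_ => data.take(size.toLong).map(i => s"element $i"))

  // The tail never leaves the pull: it is consumed where uncons1 produced it.
  def headThenTail(input: Stream[IO, Int]): Stream[IO, String] =
    input.pull.uncons1.flatMap {
      case None               => Pull.done
      case Some((head, tail)) => processData(head, tail).pull.echo
    }.stream

  val run: IO[Unit] =
    headThenTail(Stream(3, 1, 2, 3).covary[IO])
      .evalMap(s => IO.println(s))
      .compile
      .drain
}
```

In this arrangement the bracket's finalizer is expected to run after the tail has been consumed, because the tail stays in the normal flow of the pull.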

@onionyuyixi

> Yes, this re-threading is essential because the head of the stream always contains the input parameters needed for the processData function and the rest of the stream is a data stream for this function.

I have a question: after removing the chunkN(1) and unchunks calls, the program ran as expected and produced the correct result, but I do not understand why. I would sincerely appreciate it if you could point out the key detail I am missing. (A screenshot was attached to the original comment.)
