Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from_stages - noproc errors when GenStage producer finishes early #88

Closed
sunaku opened this issue Jul 24, 2019 · 4 comments

Comments

@sunaku
Copy link
Contributor

commented Jul 24, 2019

Hello,

I'm getting :noproc errors when I connect a short-lived GenStage producer to a Flow using Flow.from_stages(). Below is a minimal example that reproduces the problem I'm seeing:

  1. Passing through Flow using multiple stages works correctly. ✔️
  2. Mapping through Flow using a single stage works correctly. ✔️
  3. Mapping through Flow using multiple stages triggers the error. 💥
Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:32:32] [ds:32:32:10] [async-threads:1] [hipe]

Interactive Elixir (1.9.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 2) |> Enum.to_list()
[1, 2, 3]
iex(2)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms);
ms end) |> Enum.to_list()
"#PID<0.194.0>: sleep 1"
"#PID<0.194.0>: sleep 2"
"#PID<0.194.0>: sleep 3"
[1000, 2000, 3000]
iex(3)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 2) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms);
ms end) |> Enum.to_list()
"#PID<0.200.0>: sleep 1"

12:14:48.166 [info]  GenStage consumer #PID<0.201.0> is stopping after receiving cancel from producer #PID<0.197.0> with reason: :noproc


12:14:48.180 [error] GenServer #PID<0.201.0> terminating
** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.3205571650.1190133762.109920>, :process, #PID<0.197.0>, :noproc}
State: {%{}, %{done?: true, producers: %{}, trigger: #Function<2.127884580/3 in Flow.Window.Global.materialize/5>}, {1, 2}, [], #Function<33.87744541/4 in Flow.Materialize.mapper_ops/1>}
"#PID<0.200.0>: sleep 2"
"#PID<0.200.0>: sleep 3"
** (exit) exited in: GenStage.close_stream(%{#Reference<0.3205571650.1190133762.109930> => {:subscribed, #PID<0.200.0>, :transient, 500, 1000, 1000}})
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (gen_stage) lib/gen_stage/stream.ex:160: GenStage.Stream.close_stream/1
    (elixir) lib/stream.ex:1400: Stream.do_resource/5
    (elixir) lib/enum.ex:3023: Enum.reverse/1
    (elixir) lib/enum.ex:2668: Enum.to_list/1
iex(3)>

How can I safely connect a short-lived GenStage producer to a Flow using multiple stages? 😕 See also GenStage: How to cancel a Flow from the producer? whose answer relies on an outdated GenStage API.

Thanks for your consideration.

@josevalim

This comment has been minimized.

Copy link
Member

commented Jul 24, 2019

What is your flow version? Have you tried flow master?

@josevalim

This comment has been minimized.

Copy link
Member

commented Jul 24, 2019

Also please report your gen_stage version and make sure you are running on latest gen_stage too.

@sunaku

This comment has been minimized.

Copy link
Contributor Author

commented Jul 24, 2019

Sorry for omitting such basic information 😅 Here are the versions I'm using, according to mix.lock: 💁‍♂

  "flow": {:hex, :flow, "0.14.3", "0d92991fe53035894d24aa8dec10dcfccf0ae00c4ed436ace3efa9813a646902", [:mix], [{:gen_stage, "~> 0.14.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
  "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},

I will try to reproduce the issue using the git master versions of these packages and report back shortly.

@sunaku

This comment has been minimized.

Copy link
Contributor Author

commented Jul 24, 2019

Thanks! 🎉 Upgrading to the git master version of Flow (although GenStage hasn't changed) has solved it:

-  "flow": {:hex, :flow, "0.14.3", "0d92991fe53035894d24aa8dec10dcfccf0ae00c4ed436ace3efa9813a646902", [:mix], [{:gen_stage, "~> 0.14.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
+  "flow": {:git, "https://github.com/plataformatec/flow.git", "1ffac6a801602bf8b02192488e58ce5728b581aa", []},
   "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},

@josevalim josevalim closed this Jul 24, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants
You can’t perform that action at this time.