Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

noproc errors - connecting finite GenStage to Flow.partition #91

Open
sunaku opened this issue Sep 11, 2019 · 0 comments

Comments

@sunaku
Copy link
Contributor

commented Sep 11, 2019

Hello,

I'm using GenStage 0.14.2 and Flow master (at 1ffac6a) under Elixir 1.9.0 and Erlang/OTP 22, where I'm encountering :noproc errors when I connect a short-lived GenStage producer to a Flow and then immediately partition that flow. Below is a minimal example for reproduction (see related issue #88).

In my actual use case, I'm connecting a large (but finite) GenStage to a Flow partition with 32 stages.

Thanks for your consideration.

Reproduction steps

  1. Sanity check: everything Just Works when it's Flow-only and GenStage isn't involved. ✔️
  2. GenStage has 3 items, from_stages() has 1 stage, and partition() has 1 stage. ✔️
  3. GenStage has 3 items, from_stages() has 1 stage, and partition() has 2 stages. ✔️
  4. GenStage has 3 items, from_stages() has 1 stage, and partition() has 3 stages. 💥
Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:32:32] [ds:32:32:10] [async-threads:1] [hipe]

Interactive Elixir (1.9.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Flow.from_enumerable(1..3) |> Flow.partition(stages: 3) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.213.0>: sleep 2"
"#PID<0.203.0>: sleep 1"
"#PID<0.196.0>: sleep 3"
[1000, 2000, 3000]
iex(2)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 1) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.231.0>: sleep 1"
"#PID<0.231.0>: sleep 2"
"#PID<0.231.0>: sleep 3"
[1000, 2000, 3000]
iex(3)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 2) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.238.0>: sleep 1"
"#PID<0.239.0>: sleep 3"
"#PID<0.238.0>: sleep 2"
[3000, 1000, 2000]
iex(4)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 3) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.246.0>: sleep 2"
"#PID<0.247.0>: sleep 1"

16:29:54.396 pid=<0.248.0> [info]  GenStage consumer #PID<0.248.0> is stopping after receiving cancel from producer #PID<0.245.0> with reason: :noproc


16:29:54.444 pid=<0.248.0> [error] GenServer #PID<0.248.0> terminating
** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.1394119603.2769551375.77567>, :process, #PID<0.245.0>, :noproc}
State: {%{}, %{done?: true, producers: %{}, trigger: #Function<2.127884580/3 in Flow.Window.Global.materialize/5>}, {2, 3}, [], #Function<32.81753312/4 in Flow.Materialize.mapper_ops/1>}
"#PID<0.247.0>: sleep 3"
** (exit) exited in: GenStage.close_stream(%{#Reference<0.1394119603.2769551372.77002> => {:subscribed, #PID<0.246.0>, :transient, 500, 1000, 1000}, #Reference<0.1394119603.2769551372.77003> => {:subscribed, #PID<0.247.0>, :transient, 500, 1000, 1000}})
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (gen_stage) lib/gen_stage/stream.ex:160: GenStage.Stream.close_stream/1
    (elixir) lib/stream.ex:1400: Stream.do_resource/5
    (elixir) lib/enum.ex:3023: Enum.reverse/1
    (elixir) lib/enum.ex:2668: Enum.to_list/1

Environment details

$ uname -a
Linux myhost 4.1.15.pnotify #18 SMP Thu May 18 15:50:05 PDT 2017 x86_64 GNU/Linux

$ elixir -v
Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [hipe]

Elixir 1.9.0 (compiled with Erlang/OTP 22)

$ cat mix.lock
%{
  "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm"},
  "flow": {:git, "https://github.com/plataformatec/flow.git", "1ffac6a801602bf8b02192488e58ce5728b581aa", []},
  "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},
  "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
  "mix_test_watch": {:hex, :mix_test_watch, "0.9.0", "c72132a6071261893518fa08e121e911c9358713f62794a90c95db59042af375", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"},
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
1 participant
You can’t perform that action at this time.