Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in Supervisors when pipeline is distributed #556

Merged
merged 9 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions lib/membrane/core/parent/child_life_controller/startup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
log_metadata: log_metadata
}

server_module =
component_module =
case child.component_type do
:element ->
Core.Element
Expand All @@ -220,17 +220,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
Core.Bin
end

start_fun = fn supervisor, parent_supervisor ->
server_module.start(
Map.merge(params, %{
subprocess_supervisor: supervisor,
parent_supervisor: parent_supervisor
})
)
end

with {:ok, child_pid} <-
SubprocessSupervisor.start_component(supervisor, name, start_fun),
SubprocessSupervisor.start_component(supervisor, name, component_module, params),
{:ok, clock} <- receive_clock(name) do
%ChildEntry{
child
Expand Down
37 changes: 28 additions & 9 deletions lib/membrane/core/subprocess_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Core.SubprocessSupervisor do
use Bunch
use GenServer

alias Membrane.Core
alias Membrane.Core.Observability

require Membrane.Core.Message, as: Message
Expand All @@ -28,14 +29,14 @@ defmodule Membrane.Core.SubprocessSupervisor do
Starts a Membrane component under the supervisor
"""
@spec start_component(
supervisor_pid,
supervisor_pid :: pid(),
name :: Membrane.Child.name(),
(supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()})
component_module :: Core.Bin | Core.Element,
options :: map()
) ::
{:ok, child_pid} | {:error, reason :: any()}
when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid
def start_component(supervisor, name, start_fun) do
Message.call!(supervisor, :start_component, [name, start_fun])
{:ok, child_pid :: pid()} | {:error, reason :: any()}
def start_component(supervisor, name, component_module, options) do
Message.call!(supervisor, :start_component, [name, component_module, options])
end

@doc """
Expand Down Expand Up @@ -103,10 +104,16 @@ defmodule Membrane.Core.SubprocessSupervisor do
end

@impl true
def handle_call(Message.new(:start_component, [name, start_fun]), _from, state) do
subprocess_supervisor = start_link!()
def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do
subprocess_supervisor = start_link_subprocess_supervisor!(options)

with {:ok, child_pid} <- start_fun.(subprocess_supervisor, self()) do
options =
Map.merge(options, %{
subprocess_supervisor: subprocess_supervisor,
parent_supervisor: self()
})

with {:ok, child_pid} <- component_module.start(options) do
state =
state
|> put_in([:children, child_pid], %{
Expand Down Expand Up @@ -211,6 +218,18 @@ defmodule Membrane.Core.SubprocessSupervisor do
end
end

defp start_link_subprocess_supervisor!(component_options) do
case component_options[:node] do
nil ->
start_link!()

node ->
{:ok, pid} = :rpc.call(node, GenServer, :start, [__MODULE__, self()])
Process.link(pid)
pid
end
end

defp handle_exit(%{role: :subprocess_supervisor} = data, reason, state) do
case Map.fetch(state.children, data.child_pid) do
{:ok, child_data} ->
Expand Down
57 changes: 32 additions & 25 deletions test/membrane/integration/distributed_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,44 @@ defmodule Membrane.Integration.DistributedPipelineTest do

import Membrane.Testing.Assertions

alias Membrane.Support.Distributed
alias Membrane.Testing

setup do
hostname = start_nodes()
on_exit(fn -> kill_node(hostname) end)
another_node = start_another_node()
on_exit(fn -> kill_node(another_node) end)
[first_node: node(self()), second_node: another_node]
end

test "if distributed pipeline works properly" do
defmodule Pipeline do
use Membrane.Pipeline
alias Membrane.Support.Distributed.{Sink, Source}

@impl true
def handle_init(_ctx, _opts) do
{[
spec: [
{child(:source, %Source{output: [1, 2, 3, 4, 5]}), node: :"first@127.0.0.1"},
{get_child(:source)
|> via_in(:input, toilet_capacity: 100, throttling_factor: 50)
|> child(:sink, Sink), node: :"second@127.0.0.1"}
]
], %{}}
end
end

pipeline = Membrane.Testing.Pipeline.start_link_supervised!(module: Pipeline)

assert_end_of_stream(pipeline, :sink)
test "if distributed pipeline works properly", context do
pipeline =
Testing.Pipeline.start_link_supervised!(
module: Distributed.Pipeline,
custom_args: context
)

assert_pipeline_notified(pipeline, :sink_bin, :end_of_stream)

assert context.first_node == node(pipeline)

assert context.first_node ==
Testing.Pipeline.get_child_pid!(pipeline, :source)
|> node()

assert context.second_node ==
Testing.Pipeline.get_child_pid!(pipeline, :sink_bin)
|> node()

assert context.second_node ==
Testing.Pipeline.get_child_pid!(pipeline, [:sink_bin, :sink])
|> node()

Testing.Pipeline.terminate(pipeline)
end

defp start_nodes() do
defp start_another_node() do
System.cmd("epmd", ["-daemon"])
{:ok, _pid} = Node.start(:"first@127.0.0.1", :longnames)
_start_result = Node.start(:"first@127.0.0.1", :longnames)
{:ok, _pid, hostname} = :peer.start(%{host: ~c"127.0.0.1", name: :second})
:rpc.block_call(hostname, :code, :add_paths, [:code.get_path()])
hostname
Expand Down
40 changes: 39 additions & 1 deletion test/support/distributed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ defmodule Membrane.Support.Distributed do

defmodule Sink do
@moduledoc false

use Membrane.Sink

def_input_pad :input, flow_control: :manual, accepted_format: _any, demand_unit: :buffers
Expand All @@ -56,4 +55,43 @@ defmodule Membrane.Support.Distributed do
{[demand: {:input, 1}], state}
end
end

defmodule SinkBin do
@moduledoc false
use Membrane.Bin

def_input_pad :input, accepted_format: _any

@impl true
def handle_init(_ctx, _opts) do
spec = bin_input() |> child(:sink, Sink)
{[spec: spec], %{}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
{[notify_parent: :end_of_stream], state}
end
end

defmodule Pipeline do
@moduledoc false
use Membrane.Pipeline

@impl true
def handle_init(_ctx, opts) do
first_node = opts.first_node
second_node = opts.second_node

{[
spec: [
{child(:source, %Source{output: [1, 2, 3, 4, 5]}), node: first_node},
{
get_child(:source) |> child(:sink_bin, SinkBin),
node: second_node
}
]
], %{}}
end
end
end