diff --git a/lib/membrane/core/parent/child_life_controller/startup_utils.ex b/lib/membrane/core/parent/child_life_controller/startup_utils.ex index 3269c41d4..283dfc723 100644 --- a/lib/membrane/core/parent/child_life_controller/startup_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/startup_utils.ex @@ -205,7 +205,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do log_metadata: log_metadata } - server_module = + component_module = case child.component_type do :element -> Core.Element @@ -220,17 +220,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do Core.Bin end - start_fun = fn supervisor, parent_supervisor -> - server_module.start( - Map.merge(params, %{ - subprocess_supervisor: supervisor, - parent_supervisor: parent_supervisor - }) - ) - end - with {:ok, child_pid} <- - SubprocessSupervisor.start_component(supervisor, name, start_fun), + SubprocessSupervisor.start_component(supervisor, name, component_module, params), {:ok, clock} <- receive_clock(name) do %ChildEntry{ child diff --git a/lib/membrane/core/subprocess_supervisor.ex b/lib/membrane/core/subprocess_supervisor.ex index 5a3d152ab..a1cff8552 100644 --- a/lib/membrane/core/subprocess_supervisor.ex +++ b/lib/membrane/core/subprocess_supervisor.ex @@ -8,6 +8,7 @@ defmodule Membrane.Core.SubprocessSupervisor do use Bunch use GenServer + alias Membrane.Core alias Membrane.Core.Observability require Membrane.Core.Message, as: Message @@ -28,14 +29,16 @@ defmodule Membrane.Core.SubprocessSupervisor do Starts a Membrane component under the supervisor """ @spec start_component( - supervisor_pid, + pid(), name :: Membrane.Child.name(), - (supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()}) + # (supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()}) + component_module :: Core.Bin | Core.Element, + options :: Core.Bin.options() | Core.Element.options() ) :: - {:ok, child_pid} | {:error, reason :: any()} - when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid - def start_component(supervisor, name, start_fun) do - Message.call!(supervisor, :start_component, [name, start_fun]) + {:ok, pid()} | {:error, reason :: any()} + # when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid + def start_component(supervisor, name, component_module, options) do + Message.call!(supervisor, :start_component, [name, component_module, options]) end @doc """ @@ -103,10 +106,16 @@ defmodule Membrane.Core.SubprocessSupervisor do end @impl true - def handle_call(Message.new(:start_component, [name, start_fun]), _from, state) do - subprocess_supervisor = start_link!() + def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do + subprocess_supervisor = start_link_subprocess_supervisor!(options) - with {:ok, child_pid} <- start_fun.(subprocess_supervisor, self()) do + options = + Map.merge(options, %{ + subprocess_supervisor: subprocess_supervisor, + parent_supervisor: self() + }) + + with {:ok, child_pid} <- component_module.start(options) do state = state |> put_in([:children, child_pid], %{ @@ -211,6 +220,29 @@ defmodule Membrane.Core.SubprocessSupervisor do end end + defp start_link_subprocess_supervisor!(component_options) do + case component_options[:node] do + nil -> + start_link!() + + node -> + {:ok, pid} = :rpc.call(node, GenServer, :start, [__MODULE__, self()]) + + try do + Process.link(pid) + rescue + e in ErlangError -> + with %ErlangError{original: :noproc} <- e do + send(self(), {:EXIT, pid, e.reason}) + else + e -> reraise e, __STACKTRACE__ + end + end + + pid + end + end + defp handle_exit(%{role: :subprocess_supervisor} = data, reason, state) do case Map.fetch(state.children, data.child_pid) do {:ok, child_data} ->