Skip to content

Commit

Permalink
Spawn subprocess supervisor on this same node, as child component
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 17, 2023
1 parent 4320d63 commit 3e2decb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
13 changes: 2 additions & 11 deletions lib/membrane/core/parent/child_life_controller/startup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
log_metadata: log_metadata
}

server_module =
component_module =
case child.component_type do
:element ->
Core.Element
Expand All @@ -220,17 +220,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
Core.Bin
end

start_fun = fn supervisor, parent_supervisor ->
server_module.start(
Map.merge(params, %{
subprocess_supervisor: supervisor,
parent_supervisor: parent_supervisor
})
)
end

with {:ok, child_pid} <-
SubprocessSupervisor.start_component(supervisor, name, start_fun),
SubprocessSupervisor.start_component(supervisor, name, component_module, params),
{:ok, clock} <- receive_clock(name) do
%ChildEntry{
child
Expand Down
39 changes: 30 additions & 9 deletions lib/membrane/core/subprocess_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Core.SubprocessSupervisor do
use Bunch
use GenServer

alias Membrane.Core
alias Membrane.Core.Observability

require Membrane.Core.Message, as: Message
Expand All @@ -28,14 +29,16 @@ defmodule Membrane.Core.SubprocessSupervisor do
Starts a Membrane component under the supervisor
"""
@spec start_component(
supervisor_pid,
pid(),
name :: Membrane.Child.name(),
(supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()})
# (supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()})
component_module :: Core.Bin | Core.Element,
options :: Core.Bin.options() | Core.Element.options()
) ::
{:ok, child_pid} | {:error, reason :: any()}
when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid
def start_component(supervisor, name, start_fun) do
Message.call!(supervisor, :start_component, [name, start_fun])
{:ok, pid()} | {:error, reason :: any()}
# when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid
def start_component(supervisor, name, component_module, options) do
Message.call!(supervisor, :start_component, [name, component_module, options])
end

@doc """
Expand Down Expand Up @@ -103,10 +106,16 @@ defmodule Membrane.Core.SubprocessSupervisor do
end

@impl true
def handle_call(Message.new(:start_component, [name, start_fun]), _from, state) do
subprocess_supervisor = start_link!()
def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do
subprocess_supervisor = start_link_subprocess_supervisor!(options)

with {:ok, child_pid} <- start_fun.(subprocess_supervisor, self()) do
options =
Map.merge(options, %{
subprocess_supervisor: subprocess_supervisor,
parent_supervisor: self()
})

with {:ok, child_pid} <- component_module.start(options) do
state =
state
|> put_in([:children, child_pid], %{
Expand Down Expand Up @@ -211,6 +220,18 @@ defmodule Membrane.Core.SubprocessSupervisor do
end
end

defp start_link_subprocess_supervisor!(component_options) do
case component_options[:node] do
nil ->
start_link!()

node ->
{:ok, pid} = :rpc.call(node, GenServer, :start, [__MODULE__, self()])
Process.link(pid)
pid
end
end

defp handle_exit(%{role: :subprocess_supervisor} = data, reason, state) do
case Map.fetch(state.children, data.child_pid) do
{:ok, child_data} ->
Expand Down

0 comments on commit 3e2decb

Please sign in to comment.