From 0821fb519c159556b8ad9ae1adbd5f025e13d594 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 19 May 2023 16:36:58 +0200 Subject: [PATCH] Start AtomicDemand Worker under supervisor --- lib/membrane/core/element/atomic_demand.ex | 51 +++++++++++-------- .../atomic_demand/atomic_flow_status.ex | 6 +-- .../atomic_demand/distributed_atomic.ex | 7 +-- .../distributed_atomic/worker.ex | 14 ++--- .../core/element/effective_flow_controller.ex | 2 +- lib/membrane/core/element/pad_controller.ex | 21 ++++---- 6 files changed, 53 insertions(+), 48 deletions(-) diff --git a/lib/membrane/core/element/atomic_demand.ex b/lib/membrane/core/element/atomic_demand.ex index a82212926..4e555c237 100644 --- a/lib/membrane/core/element/atomic_demand.ex +++ b/lib/membrane/core/element/atomic_demand.ex @@ -1,6 +1,6 @@ defmodule Membrane.Core.Element.AtomicDemand do @moduledoc false - alias Membrane.Core.Element.AtomicDemand.AtomicFlowStatus + alias Membrane.Core.Element.EffectiveFlowController alias __MODULE__.{ @@ -46,40 +46,49 @@ defmodule Membrane.Core.Element.AtomicDemand do defstruct @enforce_keys ++ [buffered_decrementation: 0, toilet_overflowed?: false] - @spec new( - receiver_effective_flow_control :: EffectiveFlowController.effective_flow_control(), - receiver_process :: Process.dest(), - receiver_demand_unit :: Membrane.Buffer.Metric.unit(), - sender_process :: Process.dest(), - sender_pad_ref :: Pad.ref(), - toilet_capacity :: non_neg_integer() | nil, - throttling_factor :: pos_integer() | nil - ) :: t + @spec new(%{ + :receiver_effective_flow_control => EffectiveFlowController.effective_flow_control(), + :receiver_process => Process.dest(), + :receiver_demand_unit => Membrane.Buffer.Metric.unit(), + :sender_process => Process.dest(), + :sender_pad_ref => Pad.ref(), + :supervisor => pid(), + optional(:toilet_capacity) => non_neg_integer() | nil, + optional(:throttling_factor) => pos_integer() | nil + }) :: t def new( - receiver_effective_flow_control, - receiver_process, - receiver_demand_unit, - sender_process, - sender_pad_ref, - toilet_capacity \\ nil, - throttling_factor \\ nil + %{ + receiver_effective_flow_control: receiver_effective_flow_control, + receiver_process: receiver_process, + receiver_demand_unit: receiver_demand_unit, + sender_process: sender_process, + sender_pad_ref: sender_pad_ref, + supervisor: supervisor + } = options ) do - %DistributedAtomic{worker: worker} = counter = DistributedAtomic.new() + toilet_capacity = options[:toilet_capacity] + throttling_factor = options[:throttling_factor] + + counter = DistributedAtomic.new(supervisor: supervisor) throttling_factor = cond do throttling_factor != nil -> throttling_factor - node(sender_process) == node(worker) -> @default_throttling_factor + node(sender_process) == node(counter.worker) -> @default_throttling_factor true -> @distributed_default_throttling_factor end - receiver_status = AtomicFlowStatus.new({:resolved, receiver_effective_flow_control}) + receiver_status = + AtomicFlowStatus.new( + {:resolved, receiver_effective_flow_control}, + supervisor: supervisor + ) %__MODULE__{ counter: counter, receiver_status: receiver_status, receiver_process: receiver_process, - sender_status: AtomicFlowStatus.new(:to_be_resolved), + sender_status: AtomicFlowStatus.new(:to_be_resolved, supervisor: supervisor), sender_process: sender_process, sender_pad_ref: sender_pad_ref, toilet_capacity: toilet_capacity || default_toilet_capacity(receiver_demand_unit), diff --git a/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex b/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex index ae672a3da..c9ca3b5a3 100644 --- a/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex +++ b/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex @@ -7,11 +7,11 @@ defmodule Membrane.Core.Element.AtomicDemand.AtomicFlowStatus do @type t :: DistributedAtomic.t() @type value :: {:resolved, EffectiveFlowController.effective_flow_control()} | :to_be_resolved - @spec new(value) :: t - def new(initial_value) do + @spec new(value, supervisor: pid()) :: t + def new(initial_value, supervisor: supervisor) do initial_value |> flow_status_to_int() - |> DistributedAtomic.new() + |> DistributedAtomic.new(supervisor: supervisor) end @spec get(t) :: value() diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic.ex index 86e730a24..7a7fddf1e 100644 --- a/lib/membrane/core/element/atomic_demand/distributed_atomic.ex +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic do # The module allows to create and modify the value of a counter in the same manner both when the counter is about to be accessed # from the same node, and from different nodes. + alias Membrane.Core.SubprocessSupervisor alias __MODULE__.Worker @enforce_keys [:worker, :atomic_ref] @@ -16,10 +17,10 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic do defguardp on_the_same_node_as_self(distributed_atomic) when distributed_atomic.worker |> node() == self() |> node() - @spec new(integer() | nil) :: t - def new(initial_value \\ nil) do + @spec new(integer() | nil, supervisor: pid()) :: t + def new(initial_value \\ nil, supervisor: supervisor) do atomic_ref = :atomics.new(1, []) - {:ok, worker} = Worker.start_link(self()) + {:ok, worker} = SubprocessSupervisor.start_utility(supervisor, Worker) distributed_atomic = %__MODULE__{ atomic_ref: atomic_ref, diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex index 296e6e6d0..5f60450d7 100644 --- a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex @@ -8,13 +8,12 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do @type t :: pid() - @spec start_link(pid()) :: {:ok, t} - def start_link(owner_pid), do: GenServer.start_link(__MODULE__, owner_pid) + @spec start_link(any()) :: {:ok, t} + def start_link(opts), do: GenServer.start_link(__MODULE__, opts) @impl true - def init(owner_pid) do - ref = Process.monitor(owner_pid) - {:ok, %{ref: ref}, :hibernate} + def init(_opts) do + {:ok, nil, :hibernate} end @impl true @@ -40,9 +39,4 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do :atomics.put(atomic_ref, 1, value) {:noreply, nil} end - - @impl true - def handle_info({:DOWN, ref, _process, _pid, _reason}, %{ref: ref} = state) do - {:stop, :normal, state} - end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 9d74a81c7..8db26fb79 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -8,7 +8,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # If element A is linked via its input auto pads only to the :push output pads of other elements, then effective flow # control of element A will be set to :push. Otherwise, if element A is linked via its input auto pads to at least one - # :pull output pad, element A will set itss effective flow control to :pull and will forward this information + # :pull output pad, element A will set its effective flow control to :pull and will forward this information # via its output auto pads. # Resolving effective flow control is performed on diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 388b58106..d7729af4e 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -10,8 +10,8 @@ defmodule Membrane.Core.Element.PadController do alias Membrane.Core.Element.{ ActionHandler, - CallbackContext, AtomicDemand, + CallbackContext, EffectiveFlowController, EventController, InputQueue, @@ -160,15 +160,16 @@ defmodule Membrane.Core.Element.PadController do EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state) atomic_demand = - AtomicDemand.new( - pad_effective_flow_control, - self(), - input_demand_unit || :buffers, - other_endpoint.pid, - other_endpoint.pad_ref, - endpoint.pad_props[:toilet_capacity], - endpoint.pad_props[:throttling_factor] - ) + AtomicDemand.new(%{ + receiver_effective_flow_control: pad_effective_flow_control, + receiver_process: self(), + receiver_demand_unit: input_demand_unit || :buffers, + sender_process: other_endpoint.pid, + sender_pad_ref: other_endpoint.pad_ref, + supervisor: state.subprocess_supervisor, + toilet_capacity: endpoint.pad_props[:toilet_capacity], + throttling_factor: endpoint.pad_props[:throttling_factor] + }) # The sibiling was an initiator, we don't need to use the pid of a task spawned for observability _metadata = Observability.setup_link(endpoint.pad_ref, link_metadata.observability_metadata)