Skip to content

Commit

Permalink
Start AtomicDemand Worker under supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 19, 2023
1 parent 812cf9c commit 0821fb5
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 48 deletions.
51 changes: 30 additions & 21 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Membrane.Core.Element.AtomicDemand do
@moduledoc false
alias Membrane.Core.Element.AtomicDemand.AtomicFlowStatus

alias Membrane.Core.Element.EffectiveFlowController

alias __MODULE__.{
Expand Down Expand Up @@ -46,40 +46,49 @@ defmodule Membrane.Core.Element.AtomicDemand do

defstruct @enforce_keys ++ [buffered_decrementation: 0, toilet_overflowed?: false]

@spec new(
receiver_effective_flow_control :: EffectiveFlowController.effective_flow_control(),
receiver_process :: Process.dest(),
receiver_demand_unit :: Membrane.Buffer.Metric.unit(),
sender_process :: Process.dest(),
sender_pad_ref :: Pad.ref(),
toilet_capacity :: non_neg_integer() | nil,
throttling_factor :: pos_integer() | nil
) :: t
@spec new(%{
:receiver_effective_flow_control => EffectiveFlowController.effective_flow_control(),
:receiver_process => Process.dest(),
:receiver_demand_unit => Membrane.Buffer.Metric.unit(),
:sender_process => Process.dest(),
:sender_pad_ref => Pad.ref(),
:supervisor => pid(),
optional(:toilet_capacity) => non_neg_integer() | nil,
optional(:throttling_factor) => pos_integer() | nil
}) :: t
def new(
receiver_effective_flow_control,
receiver_process,
receiver_demand_unit,
sender_process,
sender_pad_ref,
toilet_capacity \\ nil,
throttling_factor \\ nil
%{
receiver_effective_flow_control: receiver_effective_flow_control,
receiver_process: receiver_process,
receiver_demand_unit: receiver_demand_unit,
sender_process: sender_process,
sender_pad_ref: sender_pad_ref,
supervisor: supervisor
} = options
) do
%DistributedAtomic{worker: worker} = counter = DistributedAtomic.new()
toilet_capacity = options[:toilet_capacity]
throttling_factor = options[:throttling_factor]

counter = DistributedAtomic.new(supervisor: supervisor)

throttling_factor =
cond do
throttling_factor != nil -> throttling_factor
node(sender_process) == node(worker) -> @default_throttling_factor
node(sender_process) == node(counter.worker) -> @default_throttling_factor
true -> @distributed_default_throttling_factor
end

receiver_status = AtomicFlowStatus.new({:resolved, receiver_effective_flow_control})
receiver_status =
AtomicFlowStatus.new(
{:resolved, receiver_effective_flow_control},
supervisor: supervisor
)

%__MODULE__{
counter: counter,
receiver_status: receiver_status,
receiver_process: receiver_process,
sender_status: AtomicFlowStatus.new(:to_be_resolved),
sender_status: AtomicFlowStatus.new(:to_be_resolved, supervisor: supervisor),
sender_process: sender_process,
sender_pad_ref: sender_pad_ref,
toilet_capacity: toilet_capacity || default_toilet_capacity(receiver_demand_unit),
Expand Down
6 changes: 3 additions & 3 deletions lib/membrane/core/element/atomic_demand/atomic_flow_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ defmodule Membrane.Core.Element.AtomicDemand.AtomicFlowStatus do
@type t :: DistributedAtomic.t()
@type value :: {:resolved, EffectiveFlowController.effective_flow_control()} | :to_be_resolved

@spec new(value) :: t
def new(initial_value) do
@spec new(value, supervisor: pid()) :: t
def new(initial_value, supervisor: supervisor) do
initial_value
|> flow_status_to_int()
|> DistributedAtomic.new()
|> DistributedAtomic.new(supervisor: supervisor)
end

@spec get(t) :: value()
Expand Down
7 changes: 4 additions & 3 deletions lib/membrane/core/element/atomic_demand/distributed_atomic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic do
# The module allows to create and modify the value of a counter in the same manner both when the counter is about to be accessed
# from the same node, and from different nodes.

alias Membrane.Core.SubprocessSupervisor
alias __MODULE__.Worker

@enforce_keys [:worker, :atomic_ref]
Expand All @@ -16,10 +17,10 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic do
defguardp on_the_same_node_as_self(distributed_atomic)
when distributed_atomic.worker |> node() == self() |> node()

@spec new(integer() | nil) :: t
def new(initial_value \\ nil) do
@spec new(integer() | nil, supervisor: pid()) :: t
def new(initial_value \\ nil, supervisor: supervisor) do
atomic_ref = :atomics.new(1, [])
{:ok, worker} = Worker.start_link(self())
{:ok, worker} = SubprocessSupervisor.start_utility(supervisor, Worker)

distributed_atomic = %__MODULE__{
atomic_ref: atomic_ref,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do

@type t :: pid()

@spec start_link(pid()) :: {:ok, t}
def start_link(owner_pid), do: GenServer.start_link(__MODULE__, owner_pid)
@spec start_link(any()) :: {:ok, t}
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

@impl true
def init(owner_pid) do
ref = Process.monitor(owner_pid)
{:ok, %{ref: ref}, :hibernate}
def init(_opts) do
{:ok, nil, :hibernate}
end

@impl true
Expand All @@ -40,9 +39,4 @@ defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
end

@impl true
def handle_info({:DOWN, ref, _process, _pid, _reason}, %{ref: ref} = state) do
{:stop, :normal, state}
end
end
2 changes: 1 addition & 1 deletion lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do

# If element A is linked via its input auto pads only to the :push output pads of other elements, then effective flow
# control of element A will be set to :push. Otherwise, if element A is linked via its input auto pads to at least one
# :pull output pad, element A will set itss effective flow control to :pull and will forward this information
# :pull output pad, element A will set its effective flow control to :pull and will forward this information
# via its output auto pads.

# Resolving effective flow control is performed on
Expand Down
21 changes: 11 additions & 10 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Membrane.Core.Element.PadController do

alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
AtomicDemand,
CallbackContext,
EffectiveFlowController,
EventController,
InputQueue,
Expand Down Expand Up @@ -160,15 +160,16 @@ defmodule Membrane.Core.Element.PadController do
EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state)

atomic_demand =
AtomicDemand.new(
pad_effective_flow_control,
self(),
input_demand_unit || :buffers,
other_endpoint.pid,
other_endpoint.pad_ref,
endpoint.pad_props[:toilet_capacity],
endpoint.pad_props[:throttling_factor]
)
AtomicDemand.new(%{
receiver_effective_flow_control: pad_effective_flow_control,
receiver_process: self(),
receiver_demand_unit: input_demand_unit || :buffers,
sender_process: other_endpoint.pid,
sender_pad_ref: other_endpoint.pad_ref,
supervisor: state.subprocess_supervisor,
toilet_capacity: endpoint.pad_props[:toilet_capacity],
throttling_factor: endpoint.pad_props[:throttling_factor]
})

# The sibiling was an initiator, we don't need to use the pid of a task spawned for observability
_metadata = Observability.setup_link(endpoint.pad_ref, link_metadata.observability_metadata)
Expand Down

0 comments on commit 0821fb5

Please sign in to comment.