Skip to content

Commit

Permalink
Implement soft corcking (#586)
Browse files Browse the repository at this point in the history
* Add :pause_auto_demands and :resume_auto_demands actions

Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>
  • Loading branch information
FelonEkonom and varsill committed Aug 25, 2023
1 parent 8fcb2f0 commit eb8e8ef
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@
* Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528)
* Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct.
* Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552)
* Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)

## 0.11.0
Expand Down
25 changes: 19 additions & 6 deletions lib/membrane/core/element/action_handler.ex
Expand Up @@ -70,6 +70,8 @@ defmodule Membrane.Core.Element.ActionHandler do
:stream_format,
:demand,
:redemand,
:pause_auto_demand,
:resume_auto_demand,
:forward,
:end_of_stream
] do
Expand Down Expand Up @@ -135,20 +137,31 @@ defmodule Membrane.Core.Element.ActionHandler do
end

@impl CallbackHandler
def handle_action({:redemand, out_refs}, cb, params, state)
when is_list(out_refs) do
Enum.reduce(out_refs, state, fn out_ref, state ->
handle_action({:redemand, out_ref}, cb, params, state)
def handle_action({action, pads_refs}, cb, params, state)
when action in [:redemand, :pause_auto_demand, :resume_auto_demand] and is_list(pads_refs) do
Enum.reduce(pads_refs, state, fn pad_ref, state ->
handle_action({action, pad_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:redemand, out_ref}, cb, _params, %State{type: type} = state)
when type in [:source, :filter, :endpoint] and is_pad_ref(out_ref) and
{type, cb} != {:filter, :handle_demand} do
when type in [:source, :filter, :endpoint] and {type, cb} != {:filter, :handle_demand} do
handle_redemand(out_ref, state)
end

@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:forward, data}, cb, params, %State{type: :filter} = state)
when cb in [
Expand Down
46 changes: 46 additions & 0 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Expand Up @@ -7,13 +7,58 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
}

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Logger
require Membrane.Pad, as: Pad

defguardp is_input_auto_pad_data(pad_data)
when is_map(pad_data) and is_map_key(pad_data, :flow_control) and
pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and
pad_data.direction == :input

@spec pause_demands(Pad.ref(), State.t()) :: State.t()
def pause_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :pause_auto_demand, state)
set_auto_demand_paused_flag(pad_ref, true, state)
end

@spec resume_demands(Pad.ref(), State.t()) :: State.t()
def resume_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :resume_auto_demand, state)
state = set_auto_demand_paused_flag(pad_ref, false, state)
auto_adjust_atomic_demand(pad_ref, state)
end

defp ensure_auto_input_pad!(pad_ref, action, state) do
case PadModel.get_data!(state, pad_ref) do
%{direction: :input, flow_control: :auto} ->
:ok

%{direction: :output} ->
raise Membrane.ElementError,
"Action #{inspect(action)} can only be returned for input pads, but #{inspect(pad_ref)} is an output pad"

%{flow_control: flow_control} ->
raise Membrane.ElementError,
"Action #{inspect(action)} can only be returned for pads with :auto flow control, but #{inspect(pad_ref)} pad flow control is #{inspect(flow_control)}"
end
end

@spec set_auto_demand_paused_flag(Pad.ref(), boolean(), State.t()) :: State.t()
defp set_auto_demand_paused_flag(pad_ref, paused?, state) do
{old_value, state} =
PadModel.get_and_update_data!(state, pad_ref, :auto_demand_paused?, &{&1, paused?})

if old_value == paused? do
operation = if paused?, do: "pause", else: "resume"

Membrane.Logger.debug_verbose(
"Tried to #{operation} auto demand on pad #{inspect(pad_ref)}, while it has been already #{operation}d"
)
end

state
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2)
Expand Down Expand Up @@ -50,6 +95,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp increase_atomic_demand?(pad_data, state) do
state.effective_flow_control == :pull and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
end
Expand Down
27 changes: 27 additions & 0 deletions lib/membrane/element/action.ex
Expand Up @@ -91,6 +91,31 @@ defmodule Membrane.Element.Action do
@type demand :: {:demand, {Pad.ref(), demand_size}}
@type demand_size :: pos_integer | (pos_integer() -> non_neg_integer())

@typedoc """
Pauses auto-demanding on the specific pad.
The pad must have input direction and work in `:auto` flow control mode.
This action does not guarantee that no more buffers will arrive on the specific pad,
but ensures, that demand on this pad will not increase until returning
`#{inspect(__MODULE__)}.resume_auto_demand()` action. Number of buffers, that will
arrive on the pad, depends on the behaviour of the elements earlier in the pipeline.
When auto-demanding is already paused, this action has no effect.
"""
@type pause_auto_demand :: {:pause_auto_demand, Pad.ref() | [Pad.ref()]}

@typedoc """
Resumes auto-demanding on the specific pad.
The pad must have input direction and work in `:auto` flow control mode.
This action reverts the effects of `#{inspect(__MODULE__)}.pause_auto_demand()` action.
When auto demanding is not paused, this action has no effect.
"""
@type resume_auto_demand :: {:resume_auto_demand, Pad.ref() | [Pad.ref()]}

@typedoc """
Executes `c:Membrane.Element.WithOutputPads.handle_demand/5` callback
for the given pad (or pads), that have demand greater than 0.
Expand Down Expand Up @@ -237,6 +262,8 @@ defmodule Membrane.Element.Action do
| buffer
| demand
| redemand
| pause_auto_demand
| resume_auto_demand
| forward
| start_timer
| timer_interval
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/element/pad_data.ex
Expand Up @@ -13,6 +13,7 @@ defmodule Membrane.Element.PadData do
- `:options` - options passed in `Membrane.ParentSpec` when linking pad
- `:ref` - see `t:Membrane.Pad.ref/0`
- `:start_of_stream?` - flag determining whether the stream processing via the pad has been started
- `auto_demand_paused?` - flag determining if auto-demanding on the pad is paused or no
Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand All @@ -34,6 +35,7 @@ defmodule Membrane.Element.PadData do
name: Pad.name(),
ref: Pad.ref(),
options: %{optional(atom) => any},
auto_demand_paused?: boolean(),
stream_format_validation_params: private_field,
pid: private_field,
other_ref: private_field,
Expand Down Expand Up @@ -92,6 +94,7 @@ defmodule Membrane.Element.PadData do
stream_format_validation_params: [],
other_demand_unit: nil,
other_effective_flow_control: :push,
stalker_metrics: %{}
stalker_metrics: %{},
auto_demand_paused?: false
]
end
160 changes: 160 additions & 0 deletions test/membrane/integration/demands_test.exs
Expand Up @@ -11,6 +11,8 @@ defmodule Membrane.Integration.DemandsTest do
alias Membrane.Testing
alias Membrane.Testing.{Pipeline, Sink, Source}

require Membrane.Pad, as: Pad

defp assert_buffers_received(range, pid) do
Enum.each(range, fn i ->
assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>})
Expand Down Expand Up @@ -133,4 +135,162 @@ defmodule Membrane.Integration.DemandsTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule RedemandingSource do
use Membrane.Source

@sleep_time 10

def_output_pad :output, accepted_format: _any, flow_control: :manual

defmodule StreamFormat do
defstruct []
end

@spec sleep_time() :: pos_integer()
def sleep_time(), do: @sleep_time

@impl true
def handle_playing(_ctx, _default_state) do
{[stream_format: {:output, %StreamFormat{}}], %{counter: 0}}
end

@impl true
def handle_demand(:output, _size, _unit, _ctx, state) do
Process.sleep(@sleep_time)

actions = [buffer: {:output, %Membrane.Buffer{payload: state.counter}}, redemand: :output]
{actions, Map.update!(state, :counter, &(&1 + 1))}
end
end

defmodule PausingSink do
use Membrane.Sink

def_input_pad :input, accepted_format: _any, flow_control: :auto

@impl true
def handle_init(_ctx, _opts), do: {[], %{counter: 0}}

@impl true
def handle_buffer(:input, _buffer, _ctx, state) do
{[], Map.update!(state, :counter, &(&1 + 1))}
end

@impl true
def handle_parent_notification(action, _ctx, state)
when action in [:pause_auto_demand, :resume_auto_demand] do
actions = [
{action, :input},
notify_parent: {:buff_no, state.counter}
]

{actions, %{state | counter: 0}}
end
end

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

# time for pipeline to start playing
Process.sleep(500)

for _i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg.
# slowdown of the source when running all tests in the project asynchronously
assert buff_no > 70

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end

defmodule Funnel do
use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options pads_upperbounds: [spec: map()]

@impl true
def handle_init(_ctx, %{pads_upperbounds: pads_upperbounds}) do
{[], %{pads_upperbounds: pads_upperbounds, pads_counters: %{}}}
end

@impl true
def handle_pad_added(pad, _ctx, state) do
{[], put_in(state, [:pads_counters, pad], 0)}
end

@impl true
def handle_buffer(pad, buffer, ctx, state) do
buffer = %Membrane.Buffer{buffer | metadata: pad}

{pad_counter, state} = get_and_update_in(state, [:pads_counters, pad], &{&1, &1 + 1})

actions =
with {:ok, upperbound} when pad_counter > upperbound <-
Map.fetch(state.pads_upperbounds, pad),
%{auto_demand_paused?: false} <- ctx.pads[pad] do
[pause_auto_demand: pad, buffer: {:output, buffer}]
else
_other -> [buffer: {:output, buffer}]
end

{actions, state}
end

@impl true
def handle_end_of_stream(_pad, ctx, state) do
if ctx.pads.output.end_of_stream? do
{[], state}
else
{[end_of_stream: :output], state}
end
end
end

test "funnel pausing auto demands on one of its pads" do
spec = [
child({:source, :a}, RedemandingSource)
|> via_in(Pad.ref(:input, :a), auto_demand_size: 10)
|> child(:funnel, %Funnel{pads_upperbounds: %{Pad.ref(:input, :a) => 100}})
|> child(:sink, %Testing.Sink{autodemand: true}),
child({:source, :b}, RedemandingSource)
|> via_in(Pad.ref(:input, :b), auto_demand_size: 10)
|> get_child(:funnel)
]

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :a)})
assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :b)})
assert_sink_buffer(pipeline, :sink, %{payload: 500, metadata: Pad.ref(:input, :b)}, 8000)
refute_sink_buffer(pipeline, :sink, %{payload: 200, metadata: Pad.ref(:input, :a)}, 5000)

Testing.Pipeline.terminate(pipeline)
end
end

0 comments on commit eb8e8ef

Please sign in to comment.