Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement soft corcking #586

Merged
merged 13 commits into from Aug 25, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,7 @@
* Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528)
* Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct.
* Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552)
* Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)

## 0.11.0
Expand Down
25 changes: 19 additions & 6 deletions lib/membrane/core/element/action_handler.ex
Expand Up @@ -70,6 +70,8 @@ defmodule Membrane.Core.Element.ActionHandler do
:stream_format,
:demand,
:redemand,
:pause_auto_demand,
:resume_auto_demand,
:forward,
:end_of_stream
] do
Expand Down Expand Up @@ -135,20 +137,31 @@ defmodule Membrane.Core.Element.ActionHandler do
end

@impl CallbackHandler
def handle_action({:redemand, out_refs}, cb, params, state)
when is_list(out_refs) do
Enum.reduce(out_refs, state, fn out_ref, state ->
handle_action({:redemand, out_ref}, cb, params, state)
def handle_action({action, pads_refs}, cb, params, state)
when action in [:redemand, :pause_auto_demand, :resume_auto_demand] and is_list(pads_refs) do
Enum.reduce(pads_refs, state, fn pad_ref, state ->
handle_action({action, pad_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:redemand, out_ref}, cb, _params, %State{type: type} = state)
when type in [:source, :filter, :endpoint] and is_pad_ref(out_ref) and
{type, cb} != {:filter, :handle_demand} do
when type in [:source, :filter, :endpoint] and {type, cb} != {:filter, :handle_demand} do
handle_redemand(out_ref, state)
end

@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:forward, data}, cb, params, %State{type: :filter} = state)
when cb in [
Expand Down
46 changes: 46 additions & 0 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Expand Up @@ -7,13 +7,58 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
}

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Logger
require Membrane.Pad, as: Pad

defguardp is_input_auto_pad_data(pad_data)
when is_map(pad_data) and is_map_key(pad_data, :flow_control) and
pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and
pad_data.direction == :input

@spec pause_demands(Pad.ref(), State.t()) :: State.t()
def pause_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :pause_auto_demand, state)
set_auto_demand_paused_flag(pad_ref, true, state)
end

@spec resume_demands(Pad.ref(), State.t()) :: State.t()
def resume_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :resume_auto_demand, state)
state = set_auto_demand_paused_flag(pad_ref, false, state)
auto_adjust_atomic_demand(pad_ref, state)
end

defp ensure_auto_input_pad!(pad_ref, action, state) do
case PadModel.get_data!(state, pad_ref) do
%{direction: :input, flow_control: :auto} ->
:ok

%{direction: :output} ->
raise Membrane.ElementError,
"Action #{inspect(action)} can only be returned for input pads, but #{inspect(pad_ref)} is an output pad"

%{flow_control: flow_control} ->
raise Membrane.ElementError,
"Action #{inspect(action)} can only be returned for pads with :auto flow control, but #{inspect(pad_ref)} pad flow control is #{inspect(flow_control)}"
end
end

@spec set_auto_demand_paused_flag(Pad.ref(), boolean(), State.t()) :: State.t()
defp set_auto_demand_paused_flag(pad_ref, paused?, state) do
{old_value, state} =
PadModel.get_and_update_data!(state, pad_ref, :auto_demand_paused?, &{&1, paused?})

if old_value == paused? do
operation = if paused?, do: "pause", else: "resume"

Membrane.Logger.debug(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Membrane.Logger.debug(
Membrane.Logger.debug_verbose(

"Tried to #{operation} auto demand on pad #{inspect(pad_ref)}, while it has been already #{operation}d"
)
end

state
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2)
Expand Down Expand Up @@ -50,6 +95,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp increase_atomic_demand?(pad_data, state) do
state.effective_flow_control == :pull and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
end
Expand Down
27 changes: 27 additions & 0 deletions lib/membrane/element/action.ex
varsill marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -91,6 +91,31 @@ defmodule Membrane.Element.Action do
@type demand :: {:demand, {Pad.ref(), demand_size}}
@type demand_size :: pos_integer | (pos_integer() -> non_neg_integer())

@typedoc """
Pauses auto-demanding on the specific pad.

The pad must have input direction and work in `:auto` flow control mode.

This action does not guarantee that no more buffers will arrive on the specific pad,
but ensures, that demand on this pad will not increase until returning
`#{inspect(__MODULE__)}.resume_auto_demand()` action. Number of buffers, that will
arrive on the pad, depends on the behaviour of the elements earlier in the pipeline.

When auto-demanding is already paused, this action has no effect.
"""
@type pause_auto_demand :: {:pause_auto_demand, Pad.ref() | [Pad.ref()]}

@typedoc """
Resumes auto-demanding on the specific pad.

The pad must have input direction and work in `:auto` flow control mode.

This action reverts the effects of `#{inspect(__MODULE__)}.pause_auto_demand()` action.

When auto demanding is not paused, this action has no effect.
"""
@type resume_auto_demand :: {:resume_auto_demand, Pad.ref() | [Pad.ref()]}

@typedoc """
Executes `c:Membrane.Element.WithOutputPads.handle_demand/5` callback
for the given pad (or pads), that have demand greater than 0.
Expand Down Expand Up @@ -237,6 +262,8 @@ defmodule Membrane.Element.Action do
| buffer
| demand
| redemand
| pause_auto_demand
| resume_auto_demand
| forward
| start_timer
| timer_interval
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/element/pad_data.ex
Expand Up @@ -13,6 +13,7 @@ defmodule Membrane.Element.PadData do
- `:options` - options passed in `Membrane.ParentSpec` when linking pad
- `:ref` - see `t:Membrane.Pad.ref/0`
- `:start_of_stream?` - flag determining whether the stream processing via the pad has been started
- `auto_demand_paused?` - flag determining if auto-demanding on the pad is paused or no

Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand All @@ -34,6 +35,7 @@ defmodule Membrane.Element.PadData do
name: Pad.name(),
ref: Pad.ref(),
options: %{optional(atom) => any},
auto_demand_paused?: boolean(),
stream_format_validation_params: private_field,
pid: private_field,
other_ref: private_field,
Expand Down Expand Up @@ -92,6 +94,7 @@ defmodule Membrane.Element.PadData do
stream_format_validation_params: [],
other_demand_unit: nil,
other_effective_flow_control: :push,
stalker_metrics: %{}
stalker_metrics: %{},
auto_demand_paused?: false
]
end
161 changes: 161 additions & 0 deletions test/membrane/integration/demands_test.exs
varsill marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -11,6 +11,8 @@ defmodule Membrane.Integration.DemandsTest do
alias Membrane.Testing
alias Membrane.Testing.{Pipeline, Sink, Source}

require Membrane.Pad, as: Pad

defp assert_buffers_received(range, pid) do
Enum.each(range, fn i ->
assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>})
Expand Down Expand Up @@ -133,4 +135,163 @@ defmodule Membrane.Integration.DemandsTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule RedemandingSource do
use Membrane.Source

@sleep_time 10

def_output_pad :output, accepted_format: _any, flow_control: :manual

defmodule StreamFormat do
defstruct []
end

@spec sleep_time() :: pos_integer()
def sleep_time(), do: @sleep_time

@impl true
def handle_playing(_ctx, _default_state) do
{[stream_format: {:output, %StreamFormat{}}], %{counter: 0}}
end

@impl true
def handle_demand(:output, _size, _unit, _ctx, state) do
Process.sleep(@sleep_time)

actions = [buffer: {:output, %Membrane.Buffer{payload: state.counter}}, redemand: :output]
{actions, Map.update!(state, :counter, &(&1 + 1))}
end
end

defmodule PausingSink do
use Membrane.Sink

def_input_pad :input, accepted_format: _any, flow_control: :auto

@impl true
def handle_init(_ctx, _opts), do: {[], %{counter: 0}}

@impl true
def handle_buffer(:input, _buffer, _ctx, state) do
{[], Map.update!(state, :counter, &(&1 + 1))}
end

@impl true
def handle_parent_notification(action, _ctx, state)
when action in [:pause_auto_demand, :resume_auto_demand] do
actions = [
{action, :input},
notify_parent: {:buff_no, state.counter}
]

{actions, %{state | counter: 0}}
end
end

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

# time for pipeline to start playing
Process.sleep(500)

for _i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg.
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg.

# slowdown of the source when running all tests in the project asynchronously
assert buff_no > 70
varsill marked this conversation as resolved.
Show resolved Hide resolved

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end

defmodule Funnel do
use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options pads_upperbounds: [spec: map()]

@impl true
def handle_init(_ctx, %{pads_upperbounds: pads_upperbounds}) do
{[], %{pads_upperbounds: pads_upperbounds, pads_counters: %{}}}
end

@impl true
def handle_pad_added(pad, _ctx, state) do
{[], put_in(state, [:pads_counters, pad], 0)}
end

@impl true
def handle_buffer(pad, buffer, ctx, state) do
buffer = %Membrane.Buffer{buffer | metadata: pad}

{pad_counter, state} = get_and_update_in(state, [:pads_counters, pad], &{&1, &1 + 1})

pad_upperbound = Map.get(state.pads_upperbounds, pad, :infinity)

actions =
if pad_counter > pad_upperbound and not ctx.pads[pad].auto_demand_paused? do
Copy link
Contributor

@varsill varsill Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that pad_counter > pad_upperbound comparison is quite tricky (since we are using :infinity atom) :D

[pause_auto_demand: pad, buffer: {:output, buffer}]
else
[buffer: {:output, buffer}]
end

{actions, state}
end

@impl true
def handle_end_of_stream(_pad, ctx, state) do
actions =
Map.values(ctx.pads)
|> Enum.filter(&(&1.direction == :output and not &1.end_of_stream?))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, there is only one pad with direction:output, something might be wrong with that action generation.

|> Enum.map(&{:end_of_stream, &1.ref})

{actions, state}
end
end

test "funnel pausing auto demands on one of its pads" do
spec = [
child({:source, :a}, RedemandingSource)
|> via_in(Pad.ref(:input, :a), auto_demand_size: 10)
|> child(:funnel, %Funnel{pads_upperbounds: %{Pad.ref(:input, :a) => 100}})
|> child(:sink, %Testing.Sink{autodemand: true}),
child({:source, :b}, RedemandingSource)
|> via_in(Pad.ref(:input, :b), auto_demand_size: 10)
|> get_child(:funnel)
]

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :a)})
assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :b)})
assert_sink_buffer(pipeline, :sink, %{payload: 500, metadata: Pad.ref(:input, :b)}, 8000)
refute_sink_buffer(pipeline, :sink, %{payload: 200, metadata: Pad.ref(:input, :a)}, 5000)

Testing.Pipeline.terminate(pipeline)
end
end