Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement soft corcking #586

Merged
merged 13 commits into from Aug 25, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,7 @@
* Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct.
* Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552)
* Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
29 changes: 7 additions & 22 deletions lib/membrane/core/element/action_handler.ex
Expand Up @@ -137,43 +137,28 @@ defmodule Membrane.Core.Element.ActionHandler do
end

@impl CallbackHandler
def handle_action({:redemand, out_refs}, cb, params, state)
when is_list(out_refs) do
Enum.reduce(out_refs, state, fn out_ref, state ->
handle_action({:redemand, out_ref}, cb, params, state)
def handle_action({action, pads_refs}, cb, params, state)
when action in [:redemand, :pause_auto_demand, :resume_auto_demand] and is_list(pads_refs) do
Enum.reduce(pads_refs, state, fn pad_ref, state ->
handle_action({action, pad_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:redemand, out_ref}, cb, _params, %State{type: type} = state)
when type in [:source, :filter, :endpoint] and is_pad_ref(out_ref) and
{type, cb} != {:filter, :handle_demand} do
when type in [:source, :filter, :endpoint] and {type, cb} != {:filter, :handle_demand} do
handle_redemand(out_ref, state)
end

@impl CallbackHandler
def handle_action({:pause_auto_demand, in_refs}, cb, params, state) when is_list(in_refs) do
Enum.reduce(in_refs, state, fn in_ref, state ->
handle_action({:pause_auto_demand, in_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] and is_pad_ref(in_ref) do
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_refs}, cb, params, state) when is_list(in_refs) do
Enum.reduce(in_refs, state, fn in_ref, state ->
handle_action({:resume_auto_demand, in_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] and is_pad_ref(in_ref) do
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
end

Expand Down
18 changes: 9 additions & 9 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Expand Up @@ -18,13 +18,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
@spec pause_demands(Pad.ref(), State.t()) :: State.t()
def pause_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :pause_auto_demand, state)
set_auto_demand_stopped_flag(pad_ref, true, state)
set_auto_demand_paused_flag(pad_ref, true, state)
end

@spec resume_demands(Pad.ref(), State.t()) :: State.t()
def resume_demands(pad_ref, state) do
:ok = ensure_auto_input_pad!(pad_ref, :resume_auto_demand, state)
state = set_auto_demand_stopped_flag(pad_ref, false, state)
state = set_auto_demand_paused_flag(pad_ref, false, state)
auto_adjust_atomic_demand(pad_ref, state)
end

Expand All @@ -43,16 +43,16 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
end
end

@spec set_auto_demand_stopped_flag(Pad.ref(), boolean(), State.t()) :: State.t()
defp set_auto_demand_stopped_flag(pad_ref, new_value, state) do
@spec set_auto_demand_paused_flag(Pad.ref(), boolean(), State.t()) :: State.t()
defp set_auto_demand_paused_flag(pad_ref, paused?, state) do
{old_value, state} =
PadModel.get_and_update_data!(state, pad_ref, :auto_demand_stopped?, &{&1, new_value})
PadModel.get_and_update_data!(state, pad_ref, :auto_demand_paused?, &{&1, paused?})

if old_value == new_value do
operation = if new_value, do: "pause", else: "resume"
if old_value == paused? do
operation = if paused?, do: "pause", else: "resume"

Membrane.Logger.debug(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Membrane.Logger.debug(
Membrane.Logger.debug_verbose(

"Trying to #{operation} auto demand on pad #{inspect(pad_ref)}, while it has been already #{operation}d"
"Tried to #{operation} auto demand on pad #{inspect(pad_ref)}, while it has been already #{operation}d"
)
end

Expand Down Expand Up @@ -95,7 +95,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp increase_atomic_demand?(pad_data, state) do
state.effective_flow_control == :pull and
not pad_data.auto_demand_stopped? and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
end
Expand Down
6 changes: 3 additions & 3 deletions lib/membrane/element/action.ex
varsill marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -99,9 +99,9 @@ defmodule Membrane.Element.Action do
This action does not guarantee that no more buffers will arrive on the specific pad,
but ensures, that demand on this pad will not increase until returning
`#{inspect(__MODULE__)}.resume_auto_demand()` action. Number of buffers, that will
arrive on the pad, depends on the behaviour of the elements ealier in the pipeline.
arrive on the pad, depends on the behaviour of the elements earlier in the pipeline.

When auto-demanding is already paused, this action will have no effect.
When auto-demanding is already paused, this action has no effect.
"""
@type pause_auto_demand :: {:pause_auto_demand, Pad.ref() | [Pad.ref()]}

Expand All @@ -112,7 +112,7 @@ defmodule Membrane.Element.Action do

This action reverts the effects of `#{inspect(__MODULE__)}.pause_auto_demand()` action.

When auto demanding is not paused, this action will have no effect.
When auto demanding is not paused, this action has no effect.
"""
@type resume_auto_demand :: {:resume_auto_demand, Pad.ref() | [Pad.ref()]}

Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/element/pad_data.ex
Expand Up @@ -13,6 +13,7 @@ defmodule Membrane.Element.PadData do
- `:options` - options passed in `Membrane.ParentSpec` when linking pad
- `:ref` - see `t:Membrane.Pad.ref/0`
- `:start_of_stream?` - flag determining whether the stream processing via the pad has been started
- `auto_demand_paused?` - flag determining if auto-demanding on the pad is paused or no

Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand All @@ -34,7 +35,7 @@ defmodule Membrane.Element.PadData do
name: Pad.name(),
ref: Pad.ref(),
options: %{optional(atom) => any},
auto_demand_stopped?: boolean(),
auto_demand_paused?: boolean(),
stream_format_validation_params: private_field,
pid: private_field,
other_ref: private_field,
Expand Down Expand Up @@ -94,6 +95,6 @@ defmodule Membrane.Element.PadData do
other_demand_unit: nil,
other_effective_flow_control: :push,
stalker_metrics: %{},
auto_demand_stopped?: false
auto_demand_paused?: false
]
end
85 changes: 81 additions & 4 deletions test/membrane/integration/demands_test.exs
varsill marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -11,6 +11,8 @@ defmodule Membrane.Integration.DemandsTest do
alias Membrane.Testing
alias Membrane.Testing.{Pipeline, Sink, Source}

require Membrane.Pad, as: Pad

defp assert_buffers_received(range, pid) do
Enum.each(range, fn i ->
assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>})
Expand Down Expand Up @@ -149,14 +151,16 @@ defmodule Membrane.Integration.DemandsTest do
def sleep_time(), do: @sleep_time

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %StreamFormat{}}], state}
def handle_playing(_ctx, _default_state) do
{[stream_format: {:output, %StreamFormat{}}], %{counter: 0}}
end

@impl true
def handle_demand(:output, _size, _unit, _ctx, state) do
Process.sleep(@sleep_time)
{[buffer: {:output, %Membrane.Buffer{payload: ""}}, redemand: :output], state}

actions = [buffer: {:output, %Membrane.Buffer{payload: state.counter}}, redemand: :output]
{actions, Map.update!(state, :counter, &(&1 + 1))}
end
end

Expand Down Expand Up @@ -198,12 +202,14 @@ defmodule Membrane.Integration.DemandsTest do
Process.sleep(500)

for _i <- 1..10 do
# during sleep below source should send about 100 buffers
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# sink should receive aropund 100 buffers, but the boundary is set to 70, in case of eg.
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg.

# slowdown of the source when running all tests in the project asynchronously
assert buff_no > 70
varsill marked this conversation as resolved.
Show resolved Hide resolved

# during sleep below source should send up to about auto_demand_size = 10 buffers
Expand All @@ -212,9 +218,80 @@ defmodule Membrane.Integration.DemandsTest do
Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end

defmodule Funnel do
use Membrane.Filter

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto

def_options pads_upperbounds: [spec: map()]

@impl true
def handle_init(_ctx, %{pads_upperbounds: pads_upperbounds}) do
{[], %{pads_upperbounds: pads_upperbounds, pads_counters: %{}}}
end

@impl true
def handle_pad_added(pad, _ctx, state) do
{[], put_in(state, [:pads_counters, pad], 0)}
end

@impl true
def handle_buffer(pad, buffer, ctx, state) do
buffer = %Membrane.Buffer{buffer | metadata: pad}

{pad_counter, state} = get_and_update_in(state, [:pads_counters, pad], &{&1, &1 + 1})

pad_upperbound = Map.get(state.pads_upperbounds, pad, :infinity)

actions =
if pad_counter > pad_upperbound and not ctx.pads[pad].auto_demand_paused? do
Copy link
Contributor

@varsill varsill Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that pad_counter > pad_upperbound comparison is quite tricky (since we are using :infinity atom) :D

[pause_auto_demand: pad, buffer: {:output, buffer}]
else
[buffer: {:output, buffer}]
end

{actions, state}
end

@impl true
def handle_end_of_stream(_pad, ctx, state) do
actions =
Map.values(ctx.pads)
|> Enum.filter(&(&1.direction == :output and not &1.end_of_stream?))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, there is only one pad with direction:output, something might be wrong with that action generation.

|> Enum.map(&{:end_of_stream, &1.ref})

{actions, state}
end
end

test "funnel pausing auto demands on one of its pads" do
spec = [
child({:source, :a}, RedemandingSource)
|> via_in(Pad.ref(:input, :a), auto_demand_size: 10)
|> child(:funnel, %Funnel{pads_upperbounds: %{Pad.ref(:input, :a) => 100}})
|> child(:sink, %Testing.Sink{autodemand: true}),
child({:source, :b}, RedemandingSource)
|> via_in(Pad.ref(:input, :b), auto_demand_size: 10)
|> get_child(:funnel)
]

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :a)})
assert_sink_buffer(pipeline, :sink, %{payload: 100, metadata: Pad.ref(:input, :b)})
assert_sink_buffer(pipeline, :sink, %{payload: 500, metadata: Pad.ref(:input, :b)}, 8000)
refute_sink_buffer(pipeline, :sink, %{payload: 200, metadata: Pad.ref(:input, :a)}, 5000)

Testing.Pipeline.terminate(pipeline)
end
end