Skip to content

Commit

Permalink
Add test for pasue_auto_demand and resume_auto_demand
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jul 24, 2023
1 parent 0e43d19 commit d763bb5
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 14 deletions.
16 changes: 8 additions & 8 deletions lib/membrane/core/element/action_handler.ex
Expand Up @@ -70,8 +70,8 @@ defmodule Membrane.Core.Element.ActionHandler do
:stream_format,
:demand,
:redemand,
:pause_auto_demands,
:resume_auto_demands,
:pause_auto_demand,
:resume_auto_demand,
:forward,
:end_of_stream
] do
Expand Down Expand Up @@ -152,27 +152,27 @@ defmodule Membrane.Core.Element.ActionHandler do
end

@impl CallbackHandler
def handle_action({:pause_auto_demands, in_refs}, cb, params, state) when is_list(in_refs) do
def handle_action({:pause_auto_demand, in_refs}, cb, params, state) when is_list(in_refs) do
Enum.reduce(in_refs, state, fn in_ref, state ->
handle_action({:pause_auto_demands, in_ref}, cb, params, state)
handle_action({:pause_auto_demand, in_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:pause_auto_demands, in_ref}, _cb, _params, %State{type: type} = state)
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] and is_pad_ref(in_ref) do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demands, in_refs}, cb, params, state) when is_list(in_refs) do
def handle_action({:resume_auto_demand, in_refs}, cb, params, state) when is_list(in_refs) do
Enum.reduce(in_refs, state, fn in_ref, state ->
handle_action({:resume_auto_demands, in_ref}, cb, params, state)
handle_action({:resume_auto_demand, in_ref}, cb, params, state)
end)
end

@impl CallbackHandler
def handle_action({:resume_auto_demands, in_ref}, _cb, _params, %State{type: type} = state)
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] and is_pad_ref(in_ref) do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
end
Expand Down
Expand Up @@ -22,7 +22,8 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

@spec resume_demands(Pad.ref(), State.t()) :: State.t()
def resume_demands(pad_ref, state) do
set_auto_demand_stopped_flag(pad_ref, false, state)
state = set_auto_demand_stopped_flag(pad_ref, false, state)
auto_adjust_atomic_demand(pad_ref, state)
end

@spec set_auto_demand_stopped_flag(Pad.ref(), boolean(), State.t()) :: State.t()
Expand Down
8 changes: 4 additions & 4 deletions lib/membrane/element/action.ex
Expand Up @@ -91,8 +91,8 @@ defmodule Membrane.Element.Action do
@type demand :: {:demand, {Pad.ref(), demand_size}}
@type demand_size :: pos_integer | (pos_integer() -> non_neg_integer())

@type pause_auto_demands :: {:pause_auto_demands, Pad.ref() | [Pad.ref()]}
@type resume_auto_demands :: {:resume_auto_demands, Pad.ref() | [Pad.ref()]}
@type pause_auto_demand :: {:pause_auto_demand, Pad.ref() | [Pad.ref()]}
@type resume_auto_demand :: {:resume_auto_demand, Pad.ref() | [Pad.ref()]}

@typedoc """
Executes `c:Membrane.Element.WithOutputPads.handle_demand/5` callback
Expand Down Expand Up @@ -240,8 +240,8 @@ defmodule Membrane.Element.Action do
| buffer
| demand
| redemand
| pause_auto_demands
| resume_auto_demands
| pause_auto_demand
| resume_auto_demand
| forward
| start_timer
| timer_interval
Expand Down
89 changes: 89 additions & 0 deletions test/membrane/integration/demands_test.exs
Expand Up @@ -133,4 +133,93 @@ defmodule Membrane.Integration.DemandsTest do

Testing.Pipeline.terminate(pipeline)
end

test "actions :pause_auto_demand and :resume_auto_demand" do
defmodule RedemandingSource do
use Membrane.Source

def_output_pad :output, accepted_format: _any, flow_control: :manual, availability: :always

defmodule StreamFormat do
defstruct []
end

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %StreamFormat{}}], state}
end

@impl true
def handle_demand(:output, _size, _unit, _ctx, state) do
Process.sleep(10)
{[buffer: {:output, %Membrane.Buffer{payload: ""}}, redemand: :output], state}
end
end

defmodule PausingSink do
use Membrane.Sink

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :always

@impl true
def handle_init(_ctx, _opts), do: {[], %{counter: 0}}

@impl true
def handle_buffer(:input, _buffer, _ctx, state) do
{[notify_parent: {:buff_no, state.counter}], Map.update!(state, :counter, &(&1 + 1))}
end

@impl true
def handle_parent_notification(action, _ctx, state)
when action in [:pause_auto_demand, :resume_auto_demand] do
{[{action, :input}], state}
end
end

pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

Process.sleep(500)
buff_no_before_loop = max_buff_no()

Enum.reduce(1..10, buff_no_before_loop, fn _i, max_buff_no_in_prev_step ->
# during sleep below source should send about 100 buffers
Process.sleep(100 * 10)

buff_no = max_buff_no()
assert buff_no > max_buff_no_in_prev_step + 70

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * 10)

new_buff_no = max_buff_no()
assert new_buff_no <= buff_no + 25

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

new_buff_no
end)

Testing.Pipeline.terminate(pipeline)
end

defp max_buff_no() do
Process.info(self(), :messages)
|> elem(1)
|> Enum.reduce(-1, fn
{Testing.Pipeline, _pid, {:handle_child_notification, {{:buff_no, buff_no}, :sink}}},
max_buff_no ->
max(buff_no, max_buff_no)

_msg, max_buff_no ->
max_buff_no
end)
end
end
1 change: 0 additions & 1 deletion test/membrane/integration/stream_format_test.exs
Expand Up @@ -45,7 +45,6 @@ defmodule Membrane.StreamFormatTest do
assert_down(Sink, source_module: Source)
end

@tag :dupa
test "input pad :accepted_format in outer bin" do
start_test_pipeline(Source, OuterSinkBin, AcceptedByInnerBins)
assert_down(Sink, source_module: Source)
Expand Down

0 comments on commit d763bb5

Please sign in to comment.