Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 25, 2024
1 parent 187b205 commit 58e7f3c
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 30 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ import Config
if config_env() == :test do
config :junit_formatter, include_filename?: true
end

config :membrane_core, :logger, verbose: true
8 changes: 4 additions & 4 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Membrane.Core.CallbackHandler do
@callback transform_actions(actions :: list, callback :: atom, handler_params, state) ::
{actions :: list, state}

@callback handle_end_of_actions(state) :: state
@callback handle_end_of_actions(callback :: atom, state) :: state

defmacro __using__(_args) do
quote location: :keep do
Expand All @@ -44,7 +44,7 @@ defmodule Membrane.Core.CallbackHandler do
end

@impl unquote(__MODULE__)
def handle_end_of_actions(state) do
def handle_end_of_actions(_callback, state) do
state
end

Expand Down Expand Up @@ -133,7 +133,7 @@ defmodule Membrane.Core.CallbackHandler do
%{context: context_fun},
%{module: module, internal_state: internal_state} = state
) do
args = args ++ [context_fun.(state), internal_state]
args = args ++ [context_fun.(state) |> Map.put(:s, state), internal_state]

callback_result =
try do
Expand Down Expand Up @@ -223,6 +223,6 @@ defmodule Membrane.Core.CallbackHandler do
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
handler_module.handle_end_of_actions(callback, state)
end
end
85 changes: 63 additions & 22 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,63 @@ defmodule Membrane.Core.Element.ActionHandler do
defguardp is_demand_size(size) when is_integer(size) or is_function(size)

@impl CallbackHandler
def handle_end_of_actions(state) do
def handle_end_of_actions(callback, state) do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
# state =
# if Enum.random([1, 2]) == 1 do
# DemandController.AutoFlowUtils.pop_auto_flow_queues_while_needed(state)
# else
# state
# end

# state =
# with %{supplying_demand?: false} <- state do
# DemandHandler.handle_delayed_demands(state)
# end

# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.
# state =
# if callback != :handle_spec_started do
# Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
# |> Map.put(:pads_to_snapshot, MapSet.new())
# else
# state
# end

# snapshot(callback, state)
# |> hdd()
# |> then(& snapshot(callback, &1))

if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
end

defp maybe_handle_delayed_demands(state) do
defp hdd(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
# with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
defp snapshot(callback, state) do
if callback != :handle_spec_started do
state.pads_to_snapshot
|> Enum.shuffle()
|> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
# end
else
state
end
end

@impl CallbackHandler
Expand Down Expand Up @@ -333,6 +358,12 @@ defmodule Membrane.Core.Element.ActionHandler do

pad_data = PadModel.get_data!(state, pad_ref)

old_pad_data = pad_data

# if state.name == :filter do
# IO.inspect({pad_data.name, pad_data.demand}, label: "PRZED")
# end

with %{
direction: :output,
end_of_stream?: false,
Expand All @@ -342,12 +373,22 @@ defmodule Membrane.Core.Element.ActionHandler do
stalker_metrics: stalker_metrics
}
when stream_format != nil <- pad_data do
# todo: move this function to one of the controllers, to avoid redundant PadModet.get_data in the function below
state = DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state)
:atomics.add(stalker_metrics.total_buffers, 1, length(buffers))
Message.send(pid, :buffer, buffers, for_pad: other_ref)

PadModel.set_data!(state, pad_ref, :start_of_stream?, true)
|> Map.update!(:pads_to_snapshot, &MapSet.put(&1, pad_ref))
state =
PadModel.set_data!(state, pad_ref, :start_of_stream?, true)
|> Map.update!(:pads_to_snapshot, &MapSet.put(&1, pad_ref))

# if state.name == :filter do
# pad_data = PadModel.get_data!(state, pad_ref)
# IO.inspect({pad_data.name, old_pad_data.demand, pad_data.demand})
# # IO.inspect(pad_ref)
# end

state
else
%{direction: :input} ->
raise PadDirectionError, action: :buffer, direction: :input, pad: pad_ref
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
atomic_value = AtomicDemand.get(pad_data.atomic_demand)
state = PadModel.set_data!(state, pad_data.ref, :demand, atomic_value)

if atomic_value > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
end

@impl CallbackHandler
def handle_end_of_actions(state) do
def handle_end_of_actions(_callback, state) do
with %{awaiting_setup_completition?: true} <- state do
%{state | awaiting_setup_completition?: false}
|> Membrane.Core.LifecycleController.complete_setup()
Expand Down
2 changes: 0 additions & 2 deletions test/membrane/core/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ defmodule Membrane.Core.PipelineTest do
[],
state
)
|> ActionHandler.handle_end_of_actions()
end
end

Expand All @@ -93,7 +92,6 @@ defmodule Membrane.Core.PipelineTest do
[],
state
)
|> ActionHandler.handle_end_of_actions()
end
end
end
Expand Down
128 changes: 128 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,132 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do

Testing.Pipeline.terminate(pipeline)
end

# todo: dokoncz ten test tak, zeby na handle_buffer z dowolnego input pada robil forward na oba outputy, pozagluj proporcjami zeby sprobowac zaglocic auto output

defmodule VariousFlowFilter do
alias Membrane.Core.Element.AtomicDemand
use Membrane.Filter

def_input_pad :manual_input,
accepted_format: _any,
flow_control: :manual,
demand_unit: :buffers

def_input_pad :auto_input, accepted_format: _any, flow_control: :auto

def_output_pad :manual_output, accepted_format: _any, flow_control: :manual
def_output_pad :auto_output, accepted_format: _any, flow_control: :auto

defmodule StreamFormat do
defstruct []
end

@impl true
def handle_playing(_ctx, _state) do
actions =
[:manual_output, :auto_output]
|> Enum.map(&{:stream_format, {&1, %StreamFormat{}}})

{actions, %{eos?: false, i: 0}}
end

@impl true
def handle_demand(:manual_output, size, :buffers, _ctx, state) do
{[demand: {:manual_input, size}], state}
end

@impl true
def handle_buffer(_pad, buffer, ctx, state) do
Process.sleep(1)

state = %{state | i: state.i + 1}

buffer =
if rem(state.i, 10_000) == 0 do
payload =
for pad_ref <- Map.keys(ctx.pads) do
{
pad_ref,
AtomicDemand.get(ctx.pads[pad_ref].atomic_demand)
}
end ++
[
state: ctx.s
]

%{buffer | payload: payload}
else
buffer
end

actions =
for pad <- [:auto_output, :manual_output] do
{:buffer, {pad, buffer}}
end

{actions, state}
end

@impl true
def handle_end_of_stream(_pad, _ctx, state) do
{[], state}
end
end

@tag :dupa
test "manual pad doesn't starve auto pad" do
buffers_per_source = 100_000

input_demand_size = 100

manual_source_buffers =
Stream.repeatedly(fn -> %Buffer{metadata: :manual, payload: <<>>} end)
|> Stream.take(buffers_per_source)

auto_source_buffers =
Stream.repeatedly(fn -> %Buffer{metadata: :auto, payload: <<>>} end)
|> Stream.take(buffers_per_source)

pipeline =
Testing.Pipeline.start_link_supervised!(
spec: [
child(:manual_source, %Testing.Source{output: manual_source_buffers})
|> via_in(:manual_input, target_queue_size: input_demand_size)
|> child(:filter, VariousFlowFilter)
|> via_out(:manual_output)
|> child(:manual_sink, Testing.Sink),
child(:auto_source, %Testing.Source{output: auto_source_buffers})
|> via_in(:auto_input, auto_demand_size: input_demand_size)
|> get_child(:filter)
|> via_out(:auto_output)
|> child(:auto_sink, Testing.Sink)
]
)

stats = %{manual: 0, auto: 0}

Enum.reduce(1..100_000, stats, fn i, stats ->
assert_sink_buffer(pipeline, :auto_sink, buffer)
stats = Map.update!(stats, buffer.metadata, &(&1 + 1))

label = " STATS"
IO.inspect(stats, label: label)

# if div(i, 10_000) * 10_000 == i do
# Testing.Pipeline.get_child_pid!(pipeline, :filter)
# |> :sys.get_state()
# |> IO.inspect(limit: :infinity, label: "FILTER_STATE")
# end

if buffer.payload != <<>> do
buffer.payload
|> IO.inspect(limit: :infinity, label: "FILTER_STATE")
end

stats
end)

Testing.Pipeline.terminate(pipeline)
end
end

0 comments on commit 58e7f3c

Please sign in to comment.