Skip to content

Commit

Permalink
WIP auto demand
Browse files Browse the repository at this point in the history
  • Loading branch information
mat-hek committed Jun 2, 2021
1 parent 153e86b commit 1e69ec6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 20 deletions.
13 changes: 12 additions & 1 deletion lib/membrane/core/child/pads_specs.ex
Expand Up @@ -164,9 +164,20 @@ defmodule Membrane.Core.Child.PadsSpecs do
availability: [in: [:always, :on_request], default: :always],
caps: [validate: &Caps.Matcher.validate_specs/1],
mode: [in: [:pull, :push], default: :pull],
demand: [
in: [:auto, :manual],
require_if: &(&1.mode == :pull and direction == :input),
default: :auto
],
demand_inputs: [
require_if: &(&1.mode == :pull and direction == :output),
default: []
],
demand_unit: [
in: [:buffers, :bytes],
require_if: &(&1.mode == :pull and (component == :bin or direction == :input))
require_if:
&(&1.mode == :pull and &1[:demand] != :auto and
(component == :bin or direction == :input))
],
options: [default: nil]
) do
Expand Down
28 changes: 15 additions & 13 deletions lib/membrane/core/element/buffer_controller.ex
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.{Buffer, Pad}
alias Membrane.Core.{CallbackHandler, InputBuffer}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.{ActionHandler, DemandHandler, State}
alias Membrane.Core.Element.{ActionHandler, DemandController, DemandHandler, State}
alias Membrane.Element.CallbackContext

require Membrane.Core.Child.PadModel
Expand All @@ -22,9 +22,10 @@ defmodule Membrane.Core.Element.BufferController do
def handle_buffer(pad_ref, buffers, state) do
PadModel.assert_data!(state, pad_ref, %{direction: :input})

case PadModel.get_data!(state, pad_ref, :mode) do
:pull -> handle_buffer_pull(pad_ref, buffers, state)
:push -> exec_buffer_handler(pad_ref, buffers, state)
case PadModel.get_data!(state, pad_ref) do
%{mode: :pull, demand_pads: []} -> handle_buffer_pull(pad_ref, buffers, state)
%{mode: :pull} -> handle_buffer_auto_pull(pad_ref, buffers, state)
%{mode: :push} -> exec_buffer_handler(pad_ref, buffers, state)
end
end

Expand All @@ -34,30 +35,27 @@ defmodule Membrane.Core.Element.BufferController do
@spec exec_buffer_handler(
Pad.ref_t(),
[Buffer.t()] | Buffer.t(),
params :: map,
State.t()
) :: State.stateful_try_t()
def exec_buffer_handler(pad_ref, buffers, params \\ %{}, state)

def exec_buffer_handler(pad_ref, buffers, params, %State{type: :filter} = state) do
def exec_buffer_handler(pad_ref, buffers, %State{type: :filter} = state) do
require CallbackContext.Process

CallbackHandler.exec_and_handle_callback(
:handle_process_list,
ActionHandler,
%{context: &CallbackContext.Process.from_state/1} |> Map.merge(params),
%{context: &CallbackContext.Process.from_state/1},
[pad_ref, buffers],
state
)
end

def exec_buffer_handler(pad_ref, buffers, params, %State{type: :sink} = state) do
def exec_buffer_handler(pad_ref, buffers, %State{type: :sink} = state) do
require CallbackContext.Write

CallbackHandler.exec_and_handle_callback(
:handle_write_list,
ActionHandler,
%{context: &CallbackContext.Write.from_state/1} |> Map.merge(params),
%{context: &CallbackContext.Write.from_state/1},
[pad_ref, buffers],
state
)
Expand All @@ -66,8 +64,6 @@ defmodule Membrane.Core.Element.BufferController do
@spec handle_buffer_pull(Pad.ref_t(), [Buffer.t()] | Buffer.t(), State.t()) ::
State.stateful_try_t()
defp handle_buffer_pull(pad_ref, buffers, state) do
PadModel.assert_data!(state, pad_ref, %{direction: :input})

with {:ok, old_input_buf} <- PadModel.get_data(state, pad_ref, :input_buf) do
input_buf = InputBuffer.store(old_input_buf, buffers)
state = PadModel.set_data!(state, pad_ref, :input_buf, input_buf)
Expand All @@ -81,4 +77,10 @@ defmodule Membrane.Core.Element.BufferController do
{:error, reason} -> {{:error, reason}, state}
end
end

defp handle_buffer_auto_pull(pad_ref, buffers, state) do
state = PadModel.update_data!(state, pad_ref, :demand, &(&1 - length(buffers)))
state = DemandController.check_auto_demand(pad_ref, state)
exec_buffer_handler(pad_ref, buffers, state)
end
end
36 changes: 31 additions & 5 deletions lib/membrane/core/element/demand_controller.ex
Expand Up @@ -5,7 +5,7 @@ defmodule Membrane.Core.Element.DemandController do

use Bunch

alias Membrane.Core.CallbackHandler
alias Membrane.Core.{CallbackHandler, Message}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.{ActionHandler, State}
alias Membrane.Element.CallbackContext
Expand All @@ -20,10 +20,38 @@ defmodule Membrane.Core.Element.DemandController do
@spec handle_demand(Pad.ref_t(), non_neg_integer, State.t()) ::
State.stateful_try_t()
def handle_demand(pad_ref, size, state) do
if ignore?(pad_ref, state) do
%{direction: :output, demand_pads: demand_pads} = PadModel.get_data!(state, pad_ref)

cond do
ignore?(pad_ref, state) -> {:ok, state}
demand_pads == [] -> do_handle_demand(pad_ref, size, state)
true -> handle_auto_demand(pad_ref, size, state)
end
end

defp handle_auto_demand(pad_ref, size, state) do
%{demand: old_demand, demand_pads: demand_pads} = PadModel.get_data!(state, pad_ref)
state = PadModel.set_data!(state, pad_ref, :demand, old_demand + size)

if old_demand <= 0 do
{:ok, Enum.reduce(demand_pads, state, &check_auto_demand/2)}
else
{:ok, state}
end
end

def check_auto_demand(pad_ref, state) do
demand = PadModel.get_data!(state, pad_ref, :demand)
demand_size = 40

if demand <= demand_size / 2 and
PadModel.get_data!(state, pad_ref, :demand_pads)
|> Enum.all?(&(PadModel.get_data!(state, &1, :demand) > 0)) do
%{pid: pid, other_ref: other_ref} = PadModel.get_data!(state, pad_ref)
Message.send(pid, :demand, demand_size, for_pad: other_ref)
PadModel.set_data!(state, pad_ref, :demand, demand + demand_size)
else
do_handle_demand(pad_ref, size, state)
state
end
end

Expand All @@ -33,8 +61,6 @@ defmodule Membrane.Core.Element.DemandController do
@spec do_handle_demand(Pad.ref_t(), non_neg_integer, State.t()) ::
State.stateful_try_t()
defp do_handle_demand(pad_ref, size, state) do
PadModel.assert_data(state, pad_ref, %{direction: :output})

{total_size, state} =
state
|> PadModel.get_and_update_data!(pad_ref, :demand, fn demand ->
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/pad_data.ex
Expand Up @@ -62,5 +62,6 @@ defmodule Membrane.Pad.Data do
sticky_messages: nil,
input_buf: nil,
demand: nil,
options: %{}
options: %{},
demand_pads: []
end

0 comments on commit 1e69ec6

Please sign in to comment.