Skip to content

Commit

Permalink
InputQueue -> ManualFlowController.InputQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 27, 2024
1 parent be0abad commit 6c585e4
Show file tree
Hide file tree
Showing 14 changed files with 39 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/membrane/children_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do
Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive
as many buffers, as demanded, all excess buffers will be queued internally.
Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0`
for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future).
- `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode.
See `t:Membrane.Pad.flow_control/0` for more info.
- `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1,
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do

alias Membrane.Core.Child
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.{Pad, UnknownPadError}

@type bin_pad_data :: %Membrane.Bin.PadData{
Expand Down Expand Up @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do
pid: pid,
other_ref: Pad.ref(),
sticky_messages: [Membrane.Event.t()],
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
input_queue: InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
sticky_events: [Membrane.Event.t()],
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ defmodule Membrane.Core.Element do

defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
state = BufferController.handle_buffer(pad_ref, buffers, state)
state = BufferController.handle_ingoing_buffers(pad_ref, buffers, state)
{:noreply, state}
end

Expand Down
22 changes: 12 additions & 10 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ defmodule Membrane.Core.Element.BufferController do
DemandController,
AutoFlowController,
EventController,
InputQueue,
ManualFlowController,
PlaybackQueue,
State
}

alias Membrane.Core.Element.ManualFlowController.InputQueue

alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
Expand All @@ -32,8 +33,8 @@ defmodule Membrane.Core.Element.BufferController do
callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_buffer(pad_ref, buffers, state) do
@spec handle_ingoing_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_ingoing_buffers(pad_ref, buffers, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{
Expand All @@ -51,20 +52,21 @@ defmodule Membrane.Core.Element.BufferController do
EventController.handle_start_of_stream(pad_ref, state)
end

do_handle_buffer(pad_ref, data, buffers, state)
do_handle_ingoing_buffers(pad_ref, data, buffers, state)
else
pad: {:error, :unknown_pad} ->
# We've got a buffer from already unlinked pad
state

playback: _playback ->
PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state)
PlaybackQueue.store(&handle_ingoing_buffers(pad_ref, buffers, &1), state)
end
end

@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
# todo: move it to the flow controllers?
@spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

Expand All @@ -79,20 +81,20 @@ defmodule Membrane.Core.Element.BufferController do
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
%{input_queue: old_input_queue} = data

input_queue = InputQueue.store(old_input_queue, buffers)
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

if old_input_queue |> InputQueue.empty?() do
if InputQueue.empty?(old_input_queue) do
ManualFlowController.supply_demand(pad_ref, state)
else
state
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
6 changes: 4 additions & 2 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Membrane.Core.Element.DemandController do
use Bunch

alias Membrane.Buffer
alias Membrane.Element.PadData

alias Membrane.Core.CallbackHandler
alias Membrane.Core.Element.CallbackContext
Expand All @@ -28,8 +29,9 @@ defmodule Membrane.Core.Element.DemandController do
def snapshot_atomic_demand(pad_ref, state) do
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
%State{playback: :playing} <- state do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")
if pad_data.direction == :input do
raise("cannot snapshot atomic counter in input pad")
end

do_snapshot_atomic_demand(pad_data, state)
else
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ defmodule Membrane.Core.Element.EventController do
CallbackContext,
DemandController,
AutoFlowController,
InputQueue,
ManualFlowController,
AutoFlowController,
PlaybackQueue,
State
}

alias Membrane.Core.Element.ManualFlowController.InputQueue

require Membrane.Core.Child.PadModel
require Membrane.Core.Message
require Membrane.Core.Telemetry
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element/manual_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ defmodule Membrane.Core.Element.ManualFlowController do
DemandController,
AutoFlowController,
EventController,
InputQueue,
State,
StreamFormatController
}

alias __MODULE__.InputQueue

alias Membrane.Element.PadData
alias Membrane.Pad

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.InputQueue do
defmodule Membrane.Core.Element.ManualFlowController.InputQueue do
@moduledoc false
# Queue that is attached to the `:input` pad when working in a `:manual` flow control mode.

Expand Down Expand Up @@ -105,7 +105,8 @@ defmodule Membrane.Core.Element.InputQueue do
|> maybe_increase_atomic_demand()
end

@spec store(t(), atom(), queue_item() | [queue_item()]) :: t()
@spec store(t(), :buffer | :buffers | :event | :stream_format, queue_item() | [queue_item()]) ::
t()
def store(input_queue, type \\ :buffers, v)

def store(input_queue, :buffers, v) when is_list(v) do
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ defmodule Membrane.Core.Element.PadController do
AutoFlowController,
EffectiveFlowController,
EventController,
InputQueue,
State,
StreamFormatController
}

alias Membrane.Core.Element.ManualFlowController.InputQueue

alias Membrane.Core.Parent.Link.Endpoint
alias Membrane.LinkError

Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ defmodule Membrane.Core.Element.StreamFormatController do
CallbackContext,
DemandController,
AutoFlowController,
InputQueue,
PlaybackQueue,
State
}

alias Membrane.Core.Element.ManualFlowController.InputQueue

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry

Expand Down
3 changes: 2 additions & 1 deletion test/membrane/core/element/event_controller_test.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
defmodule Membrane.Core.Element.EventControllerTest do
use ExUnit.Case, async: true

alias Membrane.Core.Element.{AtomicDemand, EventController, InputQueue, State}
alias Membrane.Core.Element.{AtomicDemand, EventController, State}
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.Core.Events
alias Membrane.Core.SubprocessSupervisor
alias Membrane.Event
Expand Down
3 changes: 2 additions & 1 deletion test/membrane/core/element/input_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ defmodule Membrane.Core.Element.InputQueueTest do
use ExUnit.Case, async: true

alias Membrane.Buffer
alias Membrane.Core.Element.{AtomicDemand, InputQueue}
alias Membrane.Core.Element.AtomicDemand
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.Core.Message
alias Membrane.Core.SubprocessSupervisor
alias Membrane.Testing.Event
Expand Down
3 changes: 2 additions & 1 deletion test/membrane/core/element/lifecycle_controller_test.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
defmodule Membrane.Core.Element.LifecycleControllerTest do
use ExUnit.Case, async: true

alias Membrane.Core.Element.{AtomicDemand, InputQueue, LifecycleController, State}
alias Membrane.Core.Element.{AtomicDemand, LifecycleController, State}
alias Membrane.Core.Element.ManualFlowController.InputQueue

alias Membrane.Core.{
Message,
Expand Down
3 changes: 2 additions & 1 deletion test/membrane/core/element/stream_format_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do

alias Membrane.Buffer
alias Membrane.Core.Message
alias Membrane.Core.Element.{AtomicDemand, InputQueue, State}
alias Membrane.Core.Element.{AtomicDemand, State}
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.Core.SubprocessSupervisor
alias Membrane.StreamFormat.Mock, as: MockStreamFormat
alias Membrane.Support.DemandsTest.Filter
Expand Down

0 comments on commit 6c585e4

Please sign in to comment.