Skip to content

Commit

Permalink
Implement comments from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Apr 2, 2024
1 parent c14ca13 commit e684bc1
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 21 deletions.
6 changes: 3 additions & 3 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Membrane.Core.CallbackHandler do
@callback transform_actions(actions :: list, callback :: atom, handler_params, state) ::
{actions :: list, state}

@callback handle_end_of_actions(callback :: atom, state) :: state
@callback handle_end_of_actions(state) :: state

defmacro __using__(_args) do
quote location: :keep do
Expand All @@ -43,7 +43,7 @@ defmodule Membrane.Core.CallbackHandler do
end

@impl unquote(__MODULE__)
def handle_end_of_actions(_callback, state) do
def handle_end_of_actions(state) do
state
end

Expand Down Expand Up @@ -202,6 +202,6 @@ defmodule Membrane.Core.CallbackHandler do
end
end)

handler_module.handle_end_of_actions(callback, state)
handler_module.handle_end_of_actions(state)
end
end
2 changes: 1 addition & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ defmodule Membrane.Core.Element do

defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
state = BufferController.handle_ingoing_buffers(pad_ref, buffers, state)
state = BufferController.handle_incoming_buffers(pad_ref, buffers, state)
{:noreply, state}
end

Expand Down
8 changes: 1 addition & 7 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,8 @@ defmodule Membrane.Core.Element.ActionHandler do

defguardp is_demand_size(size) when is_integer(size) or is_function(size)

# Match in the function below is caused by a fact, that handle_spec_started is the only callback, that
# might be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.

@impl CallbackHandler
def handle_end_of_actions(:handle_spec_started, state), do: state

def handle_end_of_actions(_callback, state) do
def handle_end_of_actions(state) do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
Expand Down
16 changes: 8 additions & 8 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ defmodule Membrane.Core.Element.BufferController do
callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_ingoing_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_ingoing_buffers(pad_ref, buffers, state) do
@spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_incoming_buffers(pad_ref, buffers, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{
Expand All @@ -50,25 +50,25 @@ defmodule Membrane.Core.Element.BufferController do
EventController.handle_start_of_stream(pad_ref, state)
end

do_handle_ingoing_buffers(pad_ref, data, buffers, state)
do_handle_incoming_buffers(pad_ref, data, buffers, state)
else
pad: {:error, :unknown_pad} ->
# We've got a buffer from already unlinked pad
state

playback: _playback ->
PlaybackQueue.store(&handle_ingoing_buffers(pad_ref, buffers, &1), state)
PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state)
end
end

@spec do_handle_ingoing_buffers(
@spec do_handle_incoming_buffers(
Pad.ref(),
PadModel.pad_data(),
[Buffer.t()] | Buffer.t(),
State.t()
) ::
State.t()
defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

Expand All @@ -83,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do
end
end

defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
%{input_queue: old_input_queue} = data

input_queue = InputQueue.store(old_input_queue, buffers)
Expand All @@ -96,7 +96,7 @@ defmodule Membrane.Core.Element.BufferController do
end
end

defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Membrane.Core.Element.DemandController do
with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref),
%State{playback: :playing} <- state do
if pad_data.direction == :input do
raise("cannot snapshot atomic counter in input pad")
raise("cannot snapshot atomic counter in input pad #{inspect(pad_ref)}")
end

do_snapshot_atomic_demand(pad_data, state)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
end

@impl CallbackHandler
def handle_end_of_actions(_callback, state) do
def handle_end_of_actions(state) do
with %{awaiting_setup_completition?: true} <- state do
%{state | awaiting_setup_completition?: false}
|> Membrane.Core.LifecycleController.complete_setup()
Expand Down

0 comments on commit e684bc1

Please sign in to comment.