Skip to content

Commit

Permalink
Modify 5s linking timeout constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 23, 2024
1 parent eae44ae commit c3bb2fb
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 28 deletions.
5 changes: 3 additions & 2 deletions lib/membrane/bin/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ defmodule Membrane.Bin.PadData do
link_id: private_field,
endpoint: private_field,
linked?: private_field,
response_received?: private_field
response_received?: private_field,
linking_timeout_ref: private_field
}

@enforce_keys [
Expand All @@ -43,5 +44,5 @@ defmodule Membrane.Bin.PadData do
:spec_ref
]

defstruct @enforce_keys
defstruct @enforce_keys ++ [:linking_timeout_ref]
end
4 changes: 2 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
PadController.handle_linking_timeout(pad_ref, state)
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do
PadController.handle_linking_timeout(pad_ref, timeout_ref, state)
{:noreply, state}
end

Expand Down
44 changes: 25 additions & 19 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,22 @@ defmodule Membrane.Core.Bin.PadController do
end

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
state = maybe_handle_pad_added(pad_ref, state)

unless PadModel.get_data!(state, pad_ref, :endpoint) do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
:ok
end
state =
if PadModel.get_data!(state, pad_ref, :endpoint) == nil do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet

state
linking_timeout_ref = make_ref()
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref])
Process.send_after(self(), message, 5000)

PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref)
else
state
end

maybe_handle_pad_added(pad_ref, state)
end

@spec remove_pad(Pad.ref(), State.t()) :: State.t()
Expand All @@ -102,15 +108,15 @@ defmodule Membrane.Core.Bin.PadController do
end
end

@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, state) do
case PadModel.get_data(state, pad_ref) do
{:ok, %{endpoint: nil} = pad_data} ->
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, timeout_ref, state) do
map_set_item = {pad_ref, timeout_ref}

_other ->
:ok
if MapSet.member?(state.initialized_internal_pads, map_set_item) do
Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item))
else
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}"
end
end

Expand Down Expand Up @@ -316,8 +322,8 @@ defmodule Membrane.Core.Bin.PadController do
end

@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_added(ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
defp maybe_handle_pad_added(pad_ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
Expand All @@ -326,7 +332,7 @@ defmodule Membrane.Core.Bin.PadController do
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
[pad_ref],
state
)
else
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
initialized_internal_pads: MapSet.t(Pad.ref()),
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -76,5 +77,6 @@ defmodule Membrane.Core.Bin.State do
resource_guard: nil,
subprocess_supervisor: nil,
children_log_metadata: [],
initialized_internal_pads: MapSet.new(),
pads_data: nil
end
33 changes: 28 additions & 5 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias Membrane.Pad
alias Membrane.ParentError

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -154,7 +155,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do

state =
put_in(state, [:pending_specs, spec_ref], %{
status: :initializing,
status: :created,
children_names: MapSet.new(all_children_names),
links_ids: Enum.map(links, & &1.id),
dependent_specs: dependent_specs,
Expand Down Expand Up @@ -309,14 +310,36 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
state =
with %Bin.State{} <- state do
initialized_internal_pads =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} ->
with {Membrane.Bin, :itself} <- child,
{:ok, timeout_ref} when timeout_ref != nil <-
PadModel.get_data(state, pad_ref, :linking_timeout_ref) do
[{pad_ref, timeout_ref}]
else
_other -> []
end
end)
|> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1))

%{state | initialized_internal_pads: initialized_internal_pads}
end

do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
end

defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
Membrane.Logger.debug(
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
)

%{children: children} = state

if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state)
Expand Down Expand Up @@ -351,7 +374,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
if Enum.empty?(spec_data.awaiting_responses) do
if MapSet.size(spec_data.awaiting_responses) == 0 do
state =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
Expand Down

0 comments on commit c3bb2fb

Please sign in to comment.