Skip to content

Commit

Permalink
Fix linking timeout mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 24, 2024
1 parent e4cfff8 commit 814f10b
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 42 deletions.
5 changes: 2 additions & 3 deletions lib/membrane/bin/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ defmodule Membrane.Bin.PadData do
link_id: private_field,
endpoint: private_field,
linked?: private_field,
response_received?: private_field,
linking_timeout_ref: private_field
response_received?: private_field
}

@enforce_keys [
Expand All @@ -44,5 +43,5 @@ defmodule Membrane.Bin.PadData do
:spec_ref
]

defstruct @enforce_keys ++ [:linking_timeout_ref]
defstruct @enforce_keys
end
4 changes: 2 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do
state = PadController.handle_linking_timeout(pad_ref, timeout_ref, state)
defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
state = PadController.handle_linking_timeout(pad_ref, state)
{:noreply, state}
end

Expand Down
34 changes: 13 additions & 21 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,7 @@ defmodule Membrane.Core.Bin.PadController do

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})

state =
if PadModel.get_data!(state, pad_ref, :endpoint) == nil do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet

linking_timeout_ref = make_ref()
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref])
Process.send_after(self(), message, 5000)

PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref)
else
state
end
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)

maybe_handle_pad_added(pad_ref, state)
end
Expand All @@ -108,15 +96,19 @@ defmodule Membrane.Core.Bin.PadController do
end
end

@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: State.t()
def handle_linking_timeout(pad_ref, timeout_ref, state) do
map_set_item = {pad_ref, timeout_ref}
@spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return()
def handle_linking_timeout(pad_ref, state) do
case Map.fetch(state.linking_timeout_counters, pad_ref) do
{:ok, 1} ->
Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref))

if MapSet.member?(state.initialized_internal_pads, map_set_item) do
Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item))
else
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}"
{:ok, counter} when counter > 1 ->
put_in(state.linking_timeout_counters[pad_ref], counter - 1)

_else ->
raise Membrane.LinkError, """
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
"""
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
initialized_internal_pads: MapSet.t(Pad.ref()),
linking_timeout_counters: %{optional(Pad.ref()) => integer()},
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -77,6 +77,6 @@ defmodule Membrane.Core.Bin.State do
resource_guard: nil,
subprocess_supervisor: nil,
children_log_metadata: [],
initialized_internal_pads: MapSet.new(),
linking_timeout_counters: %{},
pads_data: nil
end
22 changes: 8 additions & 14 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias Membrane.Pad
alias Membrane.ParentError

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -313,22 +312,17 @@ defmodule Membrane.Core.Parent.ChildLifeController do
defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
state =
with %Bin.State{} <- state do
initialized_internal_pads =
linking_timeout_counters =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} ->
with {Membrane.Bin, :itself} <- child,
{:ok, timeout_ref} when timeout_ref != nil <-
PadModel.get_data(state, pad_ref, :linking_timeout_ref) do
[{pad_ref, timeout_ref}]
else
_other -> []
end
end)
|> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1))

%{state | initialized_internal_pads: initialized_internal_pads}
|> Enum.filter(&(&1.child == {Membrane.Bin, :itself}))
|> Enum.reduce(
state.linking_timeout_counters,
&Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end)
)

%{state | linking_timeout_counters: linking_timeout_counters}
end

do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
Expand Down
1 change: 1 addition & 0 deletions test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ defmodule Membrane.Integration.LinkingTest do
Membrane.Pipeline.terminate(pipeline)
end

@tag :xd
test "Bin should crash if it doesn't link internally within timeout" do
defmodule NoInternalLinkBin do
use Membrane.Bin
Expand Down

0 comments on commit 814f10b

Please sign in to comment.