From 814f10b9806983a6d539a4362f986649e31135f1 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 24 May 2024 11:42:14 +0200 Subject: [PATCH] Fix linking timeout mechanism --- lib/membrane/bin/pad_data.ex | 5 ++- lib/membrane/core/bin.ex | 4 +-- lib/membrane/core/bin/pad_controller.ex | 34 +++++++------------ lib/membrane/core/bin/state.ex | 4 +-- .../core/parent/child_life_controller.ex | 22 +++++------- test/membrane/integration/linking_test.exs | 1 + 6 files changed, 28 insertions(+), 42 deletions(-) diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 8fce6efdf..4db74e343 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,8 +27,7 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field, - linking_timeout_ref: private_field + response_received?: private_field } @enforce_keys [ @@ -44,5 +43,5 @@ defmodule Membrane.Bin.PadData do :spec_ref ] - defstruct @enforce_keys ++ [:linking_timeout_ref] + defstruct @enforce_keys end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index e16acb297..2d324c99a 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do - state = PadController.handle_linking_timeout(pad_ref, timeout_ref, state) + defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do + state = PadController.handle_linking_timeout(pad_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 8152f7a3f..453e92463 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -71,19 +71,7 @@ defmodule Membrane.Core.Bin.PadController do state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) - state = - if PadModel.get_data!(state, pad_ref, :endpoint) == nil do - # If there's no endpoint associated to the pad, no internal link to the pad - # has been requested in the bin yet - - linking_timeout_ref = make_ref() - message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref]) - Process.send_after(self(), message, 5000) - - PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref) - else - state - end + _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) maybe_handle_pad_added(pad_ref, state) end @@ -108,15 +96,19 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: State.t() - def handle_linking_timeout(pad_ref, timeout_ref, state) do - map_set_item = {pad_ref, timeout_ref} + @spec handle_linking_timeout(Pad.ref(), State.t()) :: State.t() | no_return() + def handle_linking_timeout(pad_ref, state) do + case Map.fetch(state.linking_timeout_counters, pad_ref) do + {:ok, 1} -> + Map.update!(state, :linking_timeout_counters, &Map.delete(&1, pad_ref)) - if MapSet.member?(state.initialized_internal_pads, map_set_item) do - Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item)) - else - raise Membrane.LinkError, - "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}" + {:ok, counter} when counter > 1 -> + put_in(state.linking_timeout_counters[pad_ref], counter - 1) + + _else -> + raise Membrane.LinkError, """ + Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)} + """ end end diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 662538149..68421d016 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - initialized_internal_pads: MapSet.t(Pad.ref()), + linking_timeout_counters: %{optional(Pad.ref()) => integer()}, stalker: Membrane.Core.Stalker.t() } @@ -77,6 +77,6 @@ defmodule Membrane.Core.Bin.State do resource_guard: nil, subprocess_supervisor: nil, children_log_metadata: [], - initialized_internal_pads: MapSet.new(), + linking_timeout_counters: %{}, pads_data: nil end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 04dc9d173..e972b44b5 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -17,7 +17,6 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError - require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -313,22 +312,17 @@ defmodule Membrane.Core.Parent.ChildLifeController do defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do state = with %Bin.State{} <- state do - initialized_internal_pads = + linking_timeout_counters = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1)) |> Enum.flat_map(&[&1.from, &1.to]) - |> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} -> - with {Membrane.Bin, :itself} <- child, - {:ok, timeout_ref} when timeout_ref != nil <- - PadModel.get_data(state, pad_ref, :linking_timeout_ref) do - [{pad_ref, timeout_ref}] - else - _other -> [] - end - end) - |> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1)) - - %{state | initialized_internal_pads: initialized_internal_pads} + |> Enum.filter(&(&1.child == {Membrane.Bin, :itself})) + |> Enum.reduce( + state.linking_timeout_counters, + &Map.update(&2, &1.pad_ref, 1, fn i -> i + 1 end) + ) + + %{state | linking_timeout_counters: linking_timeout_counters} end do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index a2a9e5081..ec732a40a 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -353,6 +353,7 @@ defmodule Membrane.Integration.LinkingTest do Membrane.Pipeline.terminate(pipeline) end + @tag :xd test "Bin should crash if it doesn't link internally within timeout" do defmodule NoInternalLinkBin do use Membrane.Bin