diff --git a/lib/membrane/bin/pad_data.ex b/lib/membrane/bin/pad_data.ex index 4db74e343..8fce6efdf 100644 --- a/lib/membrane/bin/pad_data.ex +++ b/lib/membrane/bin/pad_data.ex @@ -27,7 +27,8 @@ defmodule Membrane.Bin.PadData do link_id: private_field, endpoint: private_field, linked?: private_field, - response_received?: private_field + response_received?: private_field, + linking_timeout_ref: private_field } @enforce_keys [ @@ -43,5 +44,5 @@ defmodule Membrane.Bin.PadData do :spec_ref ] - defstruct @enforce_keys + defstruct @enforce_keys ++ [:linking_timeout_ref] end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 37b09b22a..42e3c2e63 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do {:noreply, state} end - defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do - PadController.handle_linking_timeout(pad_ref, state) + defp do_handle_info(Message.new(:linking_timeout, [pad_ref, timeout_ref]), state) do + PadController.handle_linking_timeout(pad_ref, timeout_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index af6c448d7..dc1010d94 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -70,16 +70,22 @@ defmodule Membrane.Core.Bin.PadController do end state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options}) - state = maybe_handle_pad_added(pad_ref, state) - unless PadModel.get_data!(state, pad_ref, :endpoint) do - # If there's no endpoint associated to the pad, no internal link to the pad - # has been requested in the bin yet - _ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000) - :ok - end + state = + if PadModel.get_data!(state, pad_ref, :endpoint) == nil do + # If there's no endpoint associated to the pad, no internal link to the pad + # has been requested in the bin yet - state + linking_timeout_ref = make_ref() + message = Message.new(:linking_timeout, [pad_ref, linking_timeout_ref]) + Process.send_after(self(), message, 5000) + + PadModel.set_data!(state, pad_ref, :linking_timeout_ref, linking_timeout_ref) + else + state + end + + maybe_handle_pad_added(pad_ref, state) end @spec remove_pad(Pad.ref(), State.t()) :: State.t() @@ -102,15 +108,15 @@ defmodule Membrane.Core.Bin.PadController do end end - @spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return() - def handle_linking_timeout(pad_ref, state) do - case PadModel.get_data(state, pad_ref) do - {:ok, %{endpoint: nil} = pad_data} -> - raise Membrane.LinkError, - "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}" + @spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return() + def handle_linking_timeout(pad_ref, timeout_ref, state) do + map_set_item = {pad_ref, timeout_ref} - _other -> - :ok + if MapSet.member?(state.initialized_internal_pads, map_set_item) do + Map.update!(state, :initialized_internal_pads, &MapSet.delete(&1, map_set_item)) + else + raise Membrane.LinkError, + "Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}" end end @@ -316,8 +322,8 @@ defmodule Membrane.Core.Bin.PadController do end @spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t() - defp maybe_handle_pad_added(ref, state) do - %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) + defp maybe_handle_pad_added(pad_ref, state) do + %{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref) if Pad.availability_mode(availability) == :dynamic do context = &CallbackContext.from_state(&1, pad_options: pad_opts) @@ -326,7 +332,7 @@ defmodule Membrane.Core.Bin.PadController do :handle_pad_added, ActionHandler, %{context: context}, - [ref], + [pad_ref], state ) else diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index e9605a1cf..662538149 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,6 +45,7 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), + initialized_internal_pads: MapSet.t(Pad.ref()), stalker: Membrane.Core.Stalker.t() } @@ -76,5 +77,6 @@ defmodule Membrane.Core.Bin.State do resource_guard: nil, subprocess_supervisor: nil, children_log_metadata: [], + initialized_internal_pads: MapSet.new(), pads_data: nil end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 794c5ea64..04dc9d173 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -17,6 +17,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do alias Membrane.Pad alias Membrane.ParentError + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Component require Membrane.Core.Message, as: Message require Membrane.Logger @@ -154,7 +155,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do state = put_in(state, [:pending_specs, spec_ref], %{ - status: :initializing, + status: :created, children_names: MapSet.new(all_children_names), links_ids: Enum.map(links, & &1.id), dependent_specs: dependent_specs, @@ -309,14 +310,36 @@ defmodule Membrane.Core.Parent.ChildLifeController do end end + defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do + state = + with %Bin.State{} <- state do + initialized_internal_pads = + spec_data.links_ids + |> Enum.map(&Map.fetch!(state.links, &1)) + |> Enum.flat_map(&[&1.from, &1.to]) + |> Enum.flat_map(fn %{child: child, pad_ref: pad_ref} -> + with {Membrane.Bin, :itself} <- child, + {:ok, timeout_ref} when timeout_ref != nil <- + PadModel.get_data(state, pad_ref, :linking_timeout_ref) do + [{pad_ref, timeout_ref}] + else + _other -> [] + end + end) + |> Enum.reduce(state.initialized_internal_pads, &MapSet.put(&2, &1)) + + %{state | initialized_internal_pads: initialized_internal_pads} + end + + do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state) + end + defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do Membrane.Logger.debug( "Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}" ) - %{children: children} = state - - if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and + if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and Enum.empty?(spec_data.dependent_specs) do Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized") do_proceed_spec_startup(spec_ref, %{spec_data | status: :initialized}, state) @@ -351,7 +374,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do end defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do - if Enum.empty?(spec_data.awaiting_responses) do + if MapSet.size(spec_data.awaiting_responses) == 0 do state = spec_data.links_ids |> Enum.map(&Map.fetch!(state.links, &1))