Skip to content

Commit

Permalink
refactor link requests
Browse files Browse the repository at this point in the history
  • Loading branch information
mat-hek committed Oct 22, 2021
1 parent c57f3f7 commit 1521964
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
17 changes: 0 additions & 17 deletions lib/membrane/core/bin.ex
Expand Up @@ -104,23 +104,6 @@ defmodule Membrane.Core.Bin do
}
|> PadSpecHandler.init_pads()

state =
state.pads.info
|> Map.values()
|> Enum.filter(&(&1.availability == :always))
|> Enum.reduce(state, fn info, state ->
data =
Map.merge(info, %{
link_id: nil,
endpoint: nil,
linked?: false,
spec_ref: nil,
options: nil
})

put_in(state, [:pads, :data, info.name], data)
end)

with {:ok, state} <-
CallbackHandler.exec_and_handle_callback(
:handle_init,
Expand Down
100 changes: 69 additions & 31 deletions lib/membrane/core/bin/pad_controller.ex
Expand Up @@ -5,7 +5,7 @@ defmodule Membrane.Core.Bin.PadController do

use Bunch
alias Bunch.Type
alias Membrane.{Core, Pad}
alias Membrane.{Core, LinkError, Pad}
alias Membrane.Core.{CallbackHandler, Child, Message}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Bin.ActionHandler
Expand All @@ -26,27 +26,58 @@ defmodule Membrane.Core.Bin.PadController do
:ok = Child.PadController.validate_pad_being_linked!(pad_ref, direction, info, state)
pad_props = Child.PadController.parse_pad_props!(pad_props, pad_name, state)

if info.availability == :always do
if PadModel.get_data!(state, pad_ref, :link_id) == :ready do
Membrane.Logger.debug("Sending link response, #{inspect(pad_ref)}")
Message.send(state.watcher, :link_response, link_id)
state =
cond do
:ok != PadModel.assert_instance(state, pad_ref) ->
init_pad_data(pad_ref, info, state)

PadModel.get_data!(state, pad_ref, :response_received?) ->
Membrane.Logger.debug("Sending link response, #{inspect(pad_ref)}")
Message.send(state.watcher, :link_response, link_id)
state

true ->
state
end

PadModel.set_data!(state, pad_ref, :link_id, link_id)
else
data =
Map.merge(info, %{
link_id: link_id,
endpoint: nil,
linked?: false,
spec_ref: nil,
options: pad_props.options
})

state = put_in(state, [:pads, :data, pad_ref], data)
{:ok, state} = handle_pad_added(pad_ref, state)
state
end
state =
PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_props.options})

{:ok, state} = handle_pad_added(pad_ref, state)
state
end

def handle_internal_link_request(pad_ref, direction, endpoint, spec_ref, state) do
pad_name = Pad.name_by_ref(pad_ref)
info = Map.fetch!(state.pads.info, pad_name)

state =
cond do
:ok == PadModel.assert_instance(state, pad_ref) ->
state

info.availability == :always ->
init_pad_data(pad_ref, info, state)

true ->
raise LinkError, "Dynamic pads must be firstly linked externally, then internally"
end

PadModel.update_data!(state, pad_ref, &%{&1 | endpoint: endpoint, spec_ref: spec_ref})
end

defp init_pad_data(pad_ref, info, state) do
data =
Map.merge(info, %{
link_id: nil,
endpoint: nil,
linked?: false,
response_received?: false,
spec_ref: nil,
options: nil
})

put_in(state, [:pads, :data, pad_ref], data)
end

@doc """
Expand Down Expand Up @@ -104,25 +135,32 @@ defmodule Membrane.Core.Bin.PadController do
@spec handle_pad_added(Pad.ref_t(), Core.Bin.State.t()) ::
Type.stateful_try_t(Core.Bin.State.t())
defp handle_pad_added(ref, state) do
%{options: pad_opts, direction: direction} = PadModel.get_data!(state, ref)
context = &CallbackContext.PadAdded.from_state(&1, options: pad_opts, direction: direction)

CallbackHandler.exec_and_handle_callback(
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
state
)
%{options: pad_opts, direction: direction, availability: availability} =
PadModel.get_data!(state, ref)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.PadAdded.from_state(&1, options: pad_opts, direction: direction)

CallbackHandler.exec_and_handle_callback(
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
state
)
else
{:ok, state}
end
end

@spec handle_pad_removed(Pad.ref_t(), Core.Bin.State.t()) ::
Type.stateful_try_t(Core.Bin.State.t())
def handle_pad_removed(ref, state) do
%{direction: direction, availability: availability} = PadModel.get_data!(state, ref)
context = &CallbackContext.PadRemoved.from_state(&1, direction: direction)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.PadRemoved.from_state(&1, direction: direction)

CallbackHandler.exec_and_handle_callback(
:handle_pad_removed,
ActionHandler,
Expand Down
Expand Up @@ -4,6 +4,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkHandler do
use Bunch

alias Membrane.Core.{Bin, Child, Message, Parent, Telemetry}
alias Membrane.Core.Bin.PadController
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Parent.{CrashGroup, Link, LinkParser}
alias Membrane.Core.Parent.Link.Endpoint
Expand Down Expand Up @@ -41,19 +42,15 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkHandler do
end

defp do_request_link(
_direction,
direction,
%Link.Endpoint{child: {Membrane.Bin, :itself}} = this,
other,
spec_ref,
_link_id,
state
) do
state =
PadModel.update_data!(
state,
this.pad_ref,
&%{&1 | endpoint: other, spec_ref: spec_ref}
)
PadController.handle_internal_link_request(this.pad_ref, direction, other, spec_ref, state)

{0, state}
end
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/core/parent/message_dispatcher.ex
Expand Up @@ -87,7 +87,12 @@ defmodule Membrane.Core.Parent.MessageDispatcher do
Message.send(state.watcher, :link_response, link_id)
state
else
Membrane.Core.Child.PadModel.set_data!(state, endpoint.pad_ref, :link_id, :ready)
Membrane.Core.Child.PadModel.set_data!(
state,
endpoint.pad_ref,
:response_received?,
true
)
end
end)
end
Expand Down

0 comments on commit 1521964

Please sign in to comment.