Skip to content

Commit

Permalink
Implement Debug.Filter and Debug.Sink (#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 15, 2023
1 parent e8278aa commit fade9a0
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 2 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Introduce support for crash groups in Bins. [#521](https://github.com/membraneframework/membrane_core/pull/521)
* Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528)
* Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct.
* Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand All @@ -38,8 +39,7 @@
* Remove all deprecated stuff [#399](https://github.com/membraneframework/membrane_core/pull/399)
* Make `Membrane.Pipeline.{prepare, play, stop}` deprecated and add `:playback` action instead
* Make `Membrane.Pipeline.stop_and_terminate` deprecated and add `Membrane.Pipeline.terminate/2` instead
* Add `Membrane.RemoteControlled.Pipeline` - a basic implementation of a `Membrane.Pipeline` that </br>
can be spawned and controlled by an external process [#366](https://github.com/membraneframework/membrane_core/pull/366)
* Add `Membrane.RemoteControlled.Pipeline` - a basic implementation of a `Membrane.Pipeline` that can be spawned and controlled by an external process [#366](https://github.com/membraneframework/membrane_core/pull/366)
* Disallow sending buffers without sending caps first [#341](https://github.com/membraneframework/membrane_core/issues/341)
* Refine the `Membrane.Testing.Pipeline` API - deprecate the `Membrane.Testing.Pipeline.Options` usage, use keyword list as options in `Membrane.Testing.Pipeline.start/1` and `Membrane.Testing.Pipeline.start_link/1`

Expand Down
78 changes: 78 additions & 0 deletions lib/membrane/debug/filter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule Membrane.Debug.Filter do
@moduledoc """
Membrane Filter, that can be used to create a child that will be used to debug data flowing thouth pipeline.
Any buffers, stream formats and events arriving to #{__MODULE__} will be forwarded by it to the opposite
side than the one from which they came.
Usage example:
```elixir
child(:source, CustomSource)
|> child(:filter, %Membrane.Debug.Filter{
handle_buffer: &IO.inspect(&1, label: "buffer"),
handle_stream_format: &IO.inspect(&1, label: "stream format")
})
|> child(:sink, CustomSink)
```
"""

use Membrane.Filter

alias Membrane.Buffer
alias Membrane.Event
alias Membrane.StreamFormat

def_input_pad :input, accepted_format: _any, flow_control: :auto
def_output_pad :output, accepted_format: _any, flow_control: :auto

@spec noop(any()) :: :ok
def noop(_arg), do: :ok

def_options handle_buffer: [
spec: (Buffer.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all buffers handled by this sink.
Result of this function is ignored.
"""
],
handle_event: [
spec: (Event.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all events handled by this sink.
Result of this function is ignored.
"""
],
handle_stream_format: [
spec: (StreamFormat.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all stream formats handled by this sink.
Result of this function is ignored.
"""
]

@impl true
def handle_init(_ctx, opts) do
{[], Map.from_struct(opts)}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
_ingored = state.handle_buffer.(buffer)
{[buffer: {:output, buffer}], state}
end

@impl true
def handle_event(_pad, event, _ctx, state) do
_ingored = state.handle_event.(event)
{[forward: event], state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
_ingored = state.handle_stream_format.(stream_format)
{[stream_format: {:output, stream_format}], state}
end
end
73 changes: 73 additions & 0 deletions lib/membrane/debug/sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule Membrane.Debug.Sink do
@moduledoc """
Membrane Sink, that can be used to create a child that will be used to debug data flowing thouth pipeline.
Usage example:
```elixir
child(:source, CustomSource)
|> child(:sink, %Membrane.Debug.Sink{
handle_buffer: &IO.inspect(&1, label: "buffer"),
handle_event: &IO.inspect(&1, label: "event")
})
```
"""

use Membrane.Sink

alias Membrane.Buffer
alias Membrane.Event
alias Membrane.StreamFormat

def_input_pad :input, accepted_format: _any, flow_control: :auto

@spec noop(any()) :: :ok
def noop(_arg), do: :ok

def_options handle_buffer: [
spec: (Buffer.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all buffers handled by this sink.
Result of this function is ignored.
"""
],
handle_event: [
spec: (Event.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all events handled by this sink.
Result of this function is ignored.
"""
],
handle_stream_format: [
spec: (StreamFormat.t() -> any()),
default: &__MODULE__.noop/1,
description: """
Function with arity 1, that will be called with all stream formats handled by this sink.
Result of this function is ignored.
"""
]

@impl true
def handle_init(_ctx, opts) do
{[], Map.from_struct(opts)}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
_ignored = state.handle_buffer.(buffer)
{[], state}
end

@impl true
def handle_event(:input, event, _ctx, state) do
_ignored = state.handle_event.(event)
{[], state}
end

@impl true
def handle_stream_format(:input, stream_format, _ctx, state) do
_ignored = state.handle_stream_format.(stream_format)
{[], state}
end
end
53 changes: 53 additions & 0 deletions test/membrane/integration/debug_elements_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule Membrane.Integration.DebugElementsTest do
use ExUnit.Case

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Debug
alias Membrane.Testing

test "Membrane.Debug.Filter calls function passed in :handle_buffer and forwards buffers on :output pad" do
payloads = Enum.map(1..100, &inspect/1)
test_pid = self()

spec =
child(:source, %Testing.Source{output: payloads})
|> child(%Debug.Filter{handle_buffer: &send(test_pid, {:buffer, &1})})
|> child(:sink, Testing.Sink)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_stream_format(pipeline, :sink, _any)

for expected_payload <- payloads do
assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^expected_payload})
assert_receive {:buffer, %Buffer{payload: ^expected_payload}}
end

Testing.Pipeline.terminate(pipeline)
end

test "Membrane.Debug.Sink calls function passed in :handle_buffer" do
payloads = Enum.map(1..100, &inspect/1)
test_pid = self()

spec =
child(:source, %Testing.Source{output: payloads})
|> child(:sink, %Debug.Sink{
handle_buffer: &send(test_pid, {:buffer, &1}),
handle_stream_format: &send(test_pid, {:stream_format, &1})
})

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_receive {:stream_format, _any}

for expected_payload <- payloads do
assert_receive {:buffer, %Buffer{payload: ^expected_payload}}
end

Testing.Pipeline.terminate(pipeline)
end
end

0 comments on commit fade9a0

Please sign in to comment.