-
Notifications
You must be signed in to change notification settings - Fork 33
/
sink.ex
92 lines (72 loc) · 2.62 KB
/
sink.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
defmodule Membrane.Testing.Sink do
  @moduledoc """
  Sink Element that notifies the pipeline about buffers and events it receives.

  By default `Sink` will demand buffers automatically, but you can override that
  behaviour by using `autodemand` option. If set to false no automatic demands
  shall be made. Demands can be then triggered by sending `{:make_demand, size}`
  message.

  This element can be used in conjunction with `Membrane.Testing.Pipeline` to
  enable asserting on buffers and events it receives.

      alias Membrane.Testing
      children = [
        ...,
        sink: %Testing.Sink{}
      ]
      {:ok, pid} = Testing.Pipeline.start_link(
        children: children,
        links: Membrane.ParentSpec.link_linear(children)
      )

  Asserting that `Membrane.Testing.Sink` element processed a buffer that matches
  a specific pattern can be achieved using
  `Membrane.Testing.Assertions.assert_sink_buffer/3`.

      assert_sink_buffer(pid, :sink, %Membrane.Buffer{payload: 255})
  """

  use Membrane.Sink

  alias Membrane.Testing.Notification

  def_input_pad :input,
    demand_unit: :buffers,
    caps: :any

  def_options autodemand: [
                type: :boolean,
                default: true,
                description: """
                If true element will automatically make demands.
                If it is set to false demand has to be triggered manually by sending `{:make_demand, size}` message.
                """
              ]

  # The options passed to the element are used directly as its state, so the
  # callbacks below can match on `%{autodemand: ...}`.
  @impl true
  def handle_init(opts) do
    {:ok, opts}
  end

  # When entering the playing state with `autodemand: true`, issue the initial
  # demand on the `:input` pad. Otherwise no demand is made here.
  @impl true
  def handle_prepared_to_playing(_context, %{autodemand: true} = state),
    do: {{:ok, demand: :input}, state}

  def handle_prepared_to_playing(_context, state), do: {:ok, state}

  # Every event received on `:input` is reported as an `{:event, event}` notification.
  @impl true
  def handle_event(:input, event, _context, state) do
    {{:ok, notify({:event, event})}, state}
  end

  # Start of stream is reported as a `{:start_of_stream, pad}` notification.
  @impl true
  def handle_start_of_stream(pad, _ctx, state),
    do: {{:ok, notify({:start_of_stream, pad})}, state}

  # End of stream is reported as an `{:end_of_stream, pad}` notification.
  @impl true
  def handle_end_of_stream(pad, _ctx, state),
    do: {{:ok, notify({:end_of_stream, pad})}, state}

  # Received caps are reported as a `{:caps, pad, caps}` notification.
  @impl true
  def handle_caps(pad, caps, _context, state),
    do: {{:ok, notify({:caps, pad, caps})}, state}

  # Manual demand: with `autodemand: false`, a `{:make_demand, size}` message
  # results in a demand of `size` on the `:input` pad.
  # NOTE(review): this is the only clause defined here, so any other message
  # (or `{:make_demand, size}` while `autodemand` is true) is not matched by it.
  # Confirm that failing in those cases is the intended behaviour.
  @impl true
  def handle_other({:make_demand, size}, _ctx, %{autodemand: false} = state) do
    {{:ok, demand: {:input, size}}, state}
  end

  # Each received buffer is reported as a `{:buffer, buf}` notification.
  # With `autodemand: true` a new demand on `:input` is placed before the
  # notification in the returned action list.
  @impl true
  def handle_write(:input, buf, _ctx, %{autodemand: false} = state) do
    {{:ok, notify({:buffer, buf})}, state}
  end

  def handle_write(:input, buf, _ctx, %{autodemand: true} = state) do
    {{:ok, [{:demand, :input} | notify({:buffer, buf})]}, state}
  end

  # Wraps `payload` in a `Notification` struct and returns it as a one-element
  # `notify` action list, ready to be returned from a callback.
  defp notify(payload) do
    [notify: %Notification{payload: payload}]
  end
end