-
Notifications
You must be signed in to change notification settings - Fork 33
/
filter_aggregator.ex
317 lines (252 loc) · 10.5 KB
/
filter_aggregator.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
defmodule Membrane.FilterAggregator do
@moduledoc """
An element that aggregates many filters within a single Elixir process.

Warning: This element is still in an experimental phase.

This element supports only filters with one input and one output,
with the following restrictions:
* not using timers
* not relying on received messages
* not expecting any events coming from downstream elements
* their pads have to be named `:input` and `:output`
* their pads cannot use manual demands
* the first filter must make demands in buffers
"""
use Membrane.Filter
alias Membrane.Core.FilterAggregator.Context
alias Membrane.Element.CallbackContext
require Membrane.Core.FilterAggregator.InternalAction, as: InternalAction
require Membrane.Logger
# Element options. `filters` holds the aggregated filters as `{name, module_or_options_struct}`
# pairs (see `spec` below); `handle_init/2` turns every pair into a filter state entry.
def_options filters: [
spec: [{Membrane.Child.name_t(), module() | struct()}],
description: "A list of filters applied to incoming stream"
]
# The aggregator exposes a single `:input` pad, declared with `demand_mode: :auto` and the
# `_any` stream format pattern.
def_input_pad :input,
accepted_format: _any,
demand_mode: :auto
# The single `:output` pad mirrors the input pad declaration.
def_output_pad :output,
demand_mode: :auto,
accepted_format: _any
@impl true
def handle_init(agg_ctx, %__MODULE__{filters: filter_specs}) do
  if match?([], filter_specs) do
    Membrane.Logger.warn("No filters provided, #{inspect(__MODULE__)} will be a no-op")
  end

  # All specs are normalized (and validated) first, so a malformed entry raises
  # before `handle_init/2` of any aggregated filter has been invoked.
  resolved_specs = Enum.map(filter_specs, &resolve_filter_spec/1)

  filter_states =
    for {name, module, options} <- resolved_specs do
      filter_ctx = Context.build_context!(name, module, agg_ctx)
      # The match requires the filter to return an empty action list from its init callback.
      {[], filter_state} = module.handle_init(filter_ctx, options)
      {name, module, filter_ctx, filter_state}
    end

  {[], %{states: filter_states}}
end

# Normalizes a single entry of the `filters` option into `{name, module, options}`.
# An options struct already carries its module; a bare module gets default options.
# Raises `ArgumentError` for anything that is neither.
defp resolve_filter_spec({name, %module{} = options}), do: {name, module, options}

defp resolve_filter_spec({name, module}) do
  if is_atom(module) and Membrane.Element.element?(module) do
    # Use the module's default options struct when it defines one, an empty map otherwise.
    options =
      if function_exported?(Code.ensure_loaded!(module), :__struct__, 1),
        do: struct!(module, []),
        else: %{}

    {name, module, options}
  else
    raise ArgumentError, """
    The module "#{inspect(module)}" is not a Membrane Element.
    Make sure that given module is the right one, implements proper behaviour
    and all needed dependencies are properly specified in the Mixfile.
    """
  end
end

defp resolve_filter_spec(invalid_spec) do
  raise ArgumentError, "Invalid filter spec: `#{inspect(invalid_spec)}`"
end
@impl true
def handle_setup(_ctx, %{states: filter_states}) do
  # The internal setup action travels through every filter, triggering each `handle_setup/2`;
  # internal actions are dropped from the result before it is returned.
  {out_actions, new_states} = pipe_downstream([InternalAction.setup()], filter_states)
  {reject_internal_actions(out_actions), %{states: new_states}}
end
@impl true
def handle_playing(agg_ctx, %{states: states}) do
  contexts = Enum.map(states, fn {_name, _module, context, _state} -> context end)

  # Neighbour lists aligned with `states`:
  #   * the upstream neighbour of the first filter is the aggregator itself, for every other
  #     filter it is the context of the filter placed before it,
  #   * the downstream neighbour is the context of the filter placed after it, and the
  #     aggregator itself for the last filter.
  # Previously the lists were shifted, so each filter was linked with its own context as the
  # previous one and with its upstream neighbour as the next one.
  # NOTE(review): assumes `Context.link_contexts/3` takes the upstream context as the second
  # argument and the downstream context as the third one - confirm against its definition.
  prev_contexts = [agg_ctx | contexts]
  next_contexts = Enum.drop(contexts, 1) ++ [agg_ctx]

  # `Enum.zip_with/2` stops at the shortest list, so the surplus element of `prev_contexts`
  # is ignored and an empty filter list yields an empty result.
  states =
    [prev_contexts, states, next_contexts]
    |> Enum.zip_with(fn [prev_context, {name, module, context, state}, next_context] ->
      context = Context.link_contexts(context, prev_context, next_context)
      {name, module, context, state}
    end)

  # Trigger `handle_playing/2` of every filter, then drop the internal actions.
  {actions, states} = pipe_downstream([InternalAction.playing()], states)
  actions = reject_internal_actions(actions)
  {actions, %{states: states}}
end
@impl true
def handle_start_of_stream(:input, _ctx, %{states: filter_states}) do
  # Start of stream is propagated with an internal action, removed again before returning.
  {out_actions, new_states} =
    pipe_downstream([InternalAction.start_of_stream(:output)], filter_states)

  {reject_internal_actions(out_actions), %{states: new_states}}
end
@impl true
def handle_end_of_stream(:input, _ctx, %{states: filter_states}) do
  # End of stream enters the chain as the `:end_of_stream` action of a virtual upstream element.
  incoming = [{:end_of_stream, :output}]
  {out_actions, new_states} = pipe_downstream(incoming, filter_states)
  {reject_internal_actions(out_actions), %{states: new_states}}
end
@impl true
def handle_event(:input, event, _ctx, %{states: filter_states}) do
  # The event received on `:input` is handed to the first filter as an `:event` action.
  incoming = [{:event, {:output, event}}]
  {out_actions, new_states} = pipe_downstream(incoming, filter_states)
  {reject_internal_actions(out_actions), %{states: new_states}}
end
@impl true
def handle_stream_format(:input, stream_format, _ctx, %{states: filter_states}) do
  # The received stream format is handed to the first filter as a `:stream_format` action.
  incoming = [{:stream_format, {:output, stream_format}}]
  {out_actions, new_states} = pipe_downstream(incoming, filter_states)
  {reject_internal_actions(out_actions), %{states: new_states}}
end
@impl true
def handle_process_list(:input, buffers, _ctx, %{states: filter_states}) do
  # Incoming buffers are handed to the first filter as a `:buffer` action.
  incoming = [{:buffer, {:output, buffers}}]
  {out_actions, new_states} = pipe_downstream(incoming, filter_states)
  {reject_internal_actions(out_actions), %{states: new_states}}
end
# Feeds `initial_actions` to the first filter, then passes the actions produced by each filter
# on to the following one, i.e. walks the filters in the direction of data flow.
# Returns `{actions_of_the_last_filter, updated_states}` with the states in their original order.
defp pipe_downstream(initial_actions, states) do
  {reversed_states, out_actions} =
    Enum.reduce(states, {[], initial_actions}, fn {name, module, context, state},
                                                   {processed, incoming} ->
      {outgoing, new_context, new_state} = perform_actions(incoming, module, context, state, [])
      {[{name, module, new_context, new_state} | processed], outgoing}
    end)

  # Entries were prepended while folding, so restore the pipeline order.
  {out_actions, Enum.reverse(reversed_states)}
end
# Runs a queue of incoming actions against a single filter.
#
# Arguments: the incoming actions, the filter module, its context and state, and the
# accumulator of action lists produced so far (most recent first).
# Returns `{produced_actions, context, state}`; the accumulated lists are put back in
# emission order and flattened once the queue is exhausted.
# Raises `RuntimeError` when a callback result is not an `{actions_list, state}` tuple.
defp perform_actions([], _module, context, state, next_actions_acc) do
  {next_actions_acc |> Enum.reverse() |> List.flatten(), context, state}
end

defp perform_actions([action | actions], module, context, state, next_actions_acc) do
  context = Context.before_incoming_action(context, action)
  result = perform_action(action, module, context, state)
  context = Context.after_incoming_action(context, action)

  case result do
    # A result starting with a split action is performed within the same element:
    # the returned actions are put at the front of the queue of this filter.
    {[{:split, _action} | _tail] = next_actions, next_state} ->
      perform_actions(
        next_actions ++ actions,
        module,
        context,
        next_state,
        next_actions_acc
      )

    # Any other action list, the empty one included, is transformed and collected for the
    # next filter. A separate `{[], state}` clause used to follow this one, but it could
    # never match because `is_list([])` holds, so it has been removed.
    {next_actions, next_state} when is_list(next_actions) ->
      next_actions = transform_out_actions(next_actions)
      next_context = Context.after_out_actions(context, next_actions)

      perform_actions(actions, module, next_context, next_state, [
        next_actions | next_actions_acc
      ])

    term ->
      raise "Invalid return from callback: #{inspect(term)}"
  end
end
# An empty buffer list produces nothing; the filter callback is not invoked.
defp perform_action({:buffer, {:output, []}}, _module, _context, state), do: {[], state}

# Buffers sent through the upstream `:output` pad are processed on the filter's `:input` pad.
# A single buffer is wrapped into a list first.
defp perform_action({:buffer, {:output, buffers}}, module, context, state) do
  module.handle_process_list(
    :input,
    List.wrap(buffers),
    struct!(CallbackContext.Process, context),
    state
  )
end
# Delivers a new stream format to the filter. The callback context additionally carries the
# format currently stored for the `:input` pad as `:old_stream_format`.
defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do
  previous_format = context.pads.input.stream_format
  callback_fields = Map.put(context, :old_stream_format, previous_format)
  cb_context = struct!(CallbackContext.StreamFormat, callback_fields)
  module.handle_stream_format(:input, stream_format, cb_context, state)
end
# Delivers an event sent through the upstream `:output` pad to the filter's `:input` pad.
defp perform_action({:event, {:output, event}}, module, context, state) do
  module.handle_event(:input, event, struct!(CallbackContext.Event, context), state)
end
# Internal FilterAggregator action triggering `handle_start_of_stream/3` of the filter.
# The same internal action is put in front of the filter's own actions, so that the
# following filter is notified about the start of stream as well.
defp perform_action(InternalAction.start_of_stream(:output), module, context, state) do
  {filter_actions, new_state} =
    module.handle_start_of_stream(
      :input,
      struct!(CallbackContext.StreamManagement, context),
      state
    )

  {[InternalAction.start_of_stream(:output) | filter_actions], new_state}
end
# End of stream sent through the upstream `:output` pad ends the stream on the filter's
# `:input` pad; the callback result is returned unchanged.
defp perform_action({:end_of_stream, :output}, module, context, state) do
  module.handle_end_of_stream(
    :input,
    struct!(CallbackContext.StreamManagement, context),
    state
  )
end
# Demand-related actions cannot be performed inside the aggregator; both raise `RuntimeError`.
defp perform_action({:demand, {:input, _size}}, _module, _context, _state),
  do: raise("Manual demands are not supported by #{inspect(__MODULE__)}")

defp perform_action({:redemand, :output}, _module, _context, _state),
  do: raise("Demands are not supported by #{inspect(__MODULE__)}")
# Notifications addressed to the parent are not consumed by any filter: the action is passed
# downstream unchanged, so it ends up among the actions returned by the aggregator.
defp perform_action({:notify, message}, _module, _context, state) do
  {[notify: message], state}
end

# Same pass-through for the `:notify_parent` spelling of the action.
# NOTE(review): the callback API used in this module (`handle_setup`, `handle_playing`,
# stream formats) suggests a Membrane release where `:notify` is named `:notify_parent` -
# confirm. Without this clause a filter emitting it would crash the next filter's dispatch
# with a `FunctionClauseError`.
defp perform_action({:notify_parent, message}, _module, _context, state) do
  {[notify_parent: message], state}
end
# Internal action used to manipulate the context after performing an action.
# It carries context data only, so no filter callback runs and no actions are produced.
defp perform_action(InternalAction.merge_context(_ctx_data), _module, _context, state),
  do: {[], state}
# A split without any argument lists has nothing to do.
defp perform_action({:split, {:handle_process, []}}, _module, _context, state), do: {[], state}

# Performs a `:split` action by calling `handle_process/4` once per `[:input, buffer]`
# argument list, threading the context and the state through the consecutive calls.
# The actions of all calls are concatenated in call order and followed by an internal
# `merge_context` action carrying the context obtained after the last call.
defp perform_action({:split, {:handle_process, args_lists}}, module, context, state) do
  {collected_actions, {final_context, final_state}} =
    Enum.flat_map_reduce(args_lists, {context, state}, fn [:input, buffer], {ctx, st} ->
      # Each buffer is accounted for in the context as if it arrived in its own buffer action.
      incoming = {:buffer, {:output, buffer}}
      ctx = Context.before_incoming_action(ctx, incoming)

      {buffer_actions, st} =
        module.handle_process(:input, buffer, struct!(CallbackContext.Process, ctx), st)

      {buffer_actions, {Context.after_incoming_action(ctx, incoming), st}}
    end)

  {collected_actions ++ [InternalAction.merge_context(final_context)], final_state}
end
# Internal action triggering `handle_setup/2` of the filter. The internal action is appended
# to the filter's own actions so that the following filter is set up as well.
defp perform_action(InternalAction.setup(), module, context, state) do
  {filter_actions, new_state} =
    module.handle_setup(struct!(CallbackContext.Setup, context), state)

  {filter_actions ++ [InternalAction.setup()], new_state}
end
# Internal action triggering `handle_playing/2` of the filter. The internal action is appended
# to the filter's own actions so that the following filter is notified as well.
defp perform_action(InternalAction.playing(), module, context, state) do
  {filter_actions, new_state} =
    module.handle_playing(struct!(CallbackContext.Playing, context), state)

  {filter_actions ++ [InternalAction.playing()], new_state}
end
# The `:latency` action cannot be performed inside the aggregator; raises `RuntimeError`.
defp perform_action({:latency, _latency}, _module, _context, _state),
  do: raise("latency action not supported in #{inspect(__MODULE__)}")
# Drops the aggregator's internal actions from `actions`, keeping the remaining ones in order.
defp reject_internal_actions(actions) do
  Enum.reject(actions, fn action -> InternalAction.is_internal_action(action) end)
end
# Rewrites every `:forward` action into the explicit action it stands for;
# all other actions are kept as they are.
defp transform_out_actions(actions) do
  for action <- actions do
    case action do
      {:forward, payload} -> resolve_forward_action(payload)
      other -> other
    end
  end
end
# Translates the payload of a `:forward` action into the explicit action on the `:output` pad:
#   * a buffer or a list of buffers -> `:buffer` action,
#   * `:end_of_stream`              -> `:end_of_stream` action,
#   * any other struct              -> `:event` action when `Membrane.Event.event?/1` holds,
#                                      `:stream_format` action otherwise.
# Payloads of any other shape match no clause and raise `FunctionClauseError`.
defp resolve_forward_action(%Membrane.Buffer{} = buffer) do
  {:buffer, {:output, buffer}}
end

# Forwarding an empty list used to match no clause and crash. It is now translated into an
# empty `:buffer` action, which the `{:buffer, {:output, []}}` clause of `perform_action/4`
# treats as a no-op for the following filter.
defp resolve_forward_action([]) do
  {:buffer, {:output, []}}
end

defp resolve_forward_action([%Membrane.Buffer{} | _tail] = buffers) do
  {:buffer, {:output, buffers}}
end

defp resolve_forward_action(:end_of_stream) do
  {:end_of_stream, :output}
end

defp resolve_forward_action(%_struct{} = data) do
  if Membrane.Event.event?(data),
    do: {:event, {:output, data}},
    else: {:stream_format, {:output, data}}
end
end