-
Notifications
You must be signed in to change notification settings - Fork 32
/
dynamic_source.ex
146 lines (122 loc) · 4.96 KB
/
dynamic_source.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
defmodule Membrane.Testing.DynamicSource do
@moduledoc """
Testing Element for supplying data based on generator function or payloads passed
through options. It is very similar to `Membrane.Testing.Source` but is has dynamic
pad instead of static.
## Example usage
As mentioned earlier you can use this element in one of two ways, providing
either a generator function or an `Enumerable.t`.
If you provide an `Enumerable.t` with payloads, then each of those payloads will
be wrapped in a `Membrane.Buffer` and sent through `:output` pad.
```
%Source{output: [0xA1, 0xB2, 0xC3, 0xD4]}
```
In order to specify `Membrane.Testing.Source` with generator function you need
to provide initial state and function that matches `t:generator/0` type. This
function should take state and demand size as its arguments and return
a tuple consisting of actions that element will return during the
`c:Membrane.Element.WithOutputPads.handle_demand/5`
callback and new state.
```
generator_function = fn state, pad, size ->
#generate some buffers
{actions, state + 1}
end
%Source{output: {1, generator_function}}
```
"""
use Membrane.Source
alias Membrane.Buffer
alias Membrane.Element.Action
@type generator ::
(state :: any(), pad :: Pad.ref_t(), buffers_cnt :: pos_integer ->
{[Action.t()], state :: any()})
def_output_pad :output, accepted_format: _any, availability: :on_request
def_options output: [
spec: {initial_state :: any(), generator()} | Enum.t(),
default: {0, &__MODULE__.default_buf_gen/3},
description: """
If `output` is an enumerable with `t:Membrane.Payload.t/0` then
buffer containing those payloads will be sent through the
`:output` pad and followed by `t:Membrane.Element.Action.end_of_stream_t/0`.
If `output` is a `{initial_state, function}` tuple then the
the function will be invoked each time `handle_demand` is called.
It is an action generator that takes two arguments.
The first argument is the state that is initially set to
`initial_state`. The second one defines the pad on which the demand
has been requested. The third one defines the size of the demand.
Such function should return `{actions, next_state}` where
`actions` is a list of actions that will be returned from
`handle_demand/4` and `next_state` is the value that will be
used for the next call.
"""
],
stream_format: [
spec: struct(),
default: %Membrane.RemoteStream{},
description: """
Stream format to be sent before the `output`.
"""
]
@spec default_buf_gen(integer(), Pad.ref_t(), integer()) :: {[Action.t()], integer()}
def default_buf_gen(generator_state, pad, size) do
buffers =
generator_state..(size + generator_state - 1)
|> Enum.map(fn generator_state ->
%Buffer{payload: <<generator_state::16>>}
end)
action = [buffer: {pad, buffers}]
{action, generator_state + size}
end
@impl true
def handle_init(_ctx, opts) do
opts = Map.from_struct(opts)
case opts.output do
{initial_state, generator} when is_function(generator) ->
{[],
Map.merge(opts, %{
type: :generator,
generator: generator,
generator_state: initial_state,
state_for_pad: %{}
})}
_enumerable_output ->
{[], Map.merge(opts, %{type: :enum, output: opts.output, output_for_pad: %{}})}
end
end
@impl true
def handle_playing(ctx, state) do
actions = Map.keys(ctx.pads) |> Enum.map(&{:stream_format, {&1, state.stream_format}})
{actions, state}
end
@impl true
def handle_pad_added(pad, _ctx, %{type: :enum} = state) do
{[], Map.update!(state, :output_for_pad, &Map.put(&1, pad, state.output))}
end
@impl true
def handle_pad_added(pad, _ctx, %{type: :generator} = state) do
{[], Map.update!(state, :state_for_pad, &Map.put(&1, pad, state.generator_state))}
end
@impl true
def handle_demand(pad, _size, :buffers, _ctx, %{type: :enum} = state) do
output_for_pad = state.output_for_pad[pad]
if length(output_for_pad) > 0 do
[payload | rest] = output_for_pad
{
[
{:buffer, {pad, %Buffer{payload: payload}}},
{:redemand, pad}
],
Map.update!(state, :output_for_pad, &Map.put(&1, pad, rest))
}
else
{[end_of_stream: pad], state}
end
end
@impl true
def handle_demand(pad, _size, :buffers, _ctx, %{type: :generator} = state) do
state_for_pad = state.state_for_pad[pad]
{actions, new_state} = state.generator.(state_for_pad, pad, 1)
{actions ++ [redemand: pad], Map.update!(state, :state_for_pad, &Map.put(&1, pad, new_state))}
end
end