-
Notifications
You must be signed in to change notification settings - Fork 0
/
sink.go
121 lines (102 loc) · 2.47 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package audio
import (
"sync/atomic"
)
// Sink is an audio sink that processes audio chunks using an array of transformers.
type Sink interface {
Append(*Chunk)
OutputTo(Sink)
Drain() <-chan *Chunk
}
// Transformer is a function that modifies the chunk in place.
type Transformer func(*Chunk)
// TransformSink allows using audio transformers on passed in chunks.
type TransformSink struct {
fx []Transformer
out chan *Chunk
}
// NewTransformSink constructor.
func NewTransformSink(fx ...Transformer) *TransformSink {
return &TransformSink{
fx: fx,
out: make(chan *Chunk),
}
}
// Append to sink.
func (sink *TransformSink) Append(chunk *Chunk) {
// apply all transforms
for _, tr := range sink.fx {
tr(chunk)
}
sink.out <- chunk
}
// OutputTo forwards all output to next sink.
func (sink *TransformSink) OutputTo(nextSink Sink) {
for chunk := range sink.out {
nextSink.Append(chunk)
}
}
// Drain the sink.
func (sink *TransformSink) Drain() <-chan *Chunk {
return sink.out
}
// EmptySink outputs all chunks it receives.
type EmptySink struct {
out chan *Chunk
}
// NewEmptySink constructor.
func NewEmptySink() *EmptySink {
return &EmptySink{
out: make(chan *Chunk, 1),
}
}
// Append to sink.
func (sink *EmptySink) Append(chunk *Chunk) {
sink.out <- chunk
}
// OutputTo forwards all output to next sink.
func (sink *EmptySink) OutputTo(nextSink Sink) {
for chunk := range sink.out {
nextSink.Append(chunk)
}
}
// Drain the sink.
func (sink *EmptySink) Drain() <-chan *Chunk {
return sink.out
}
// OrderedSink allows appending audio chunks in any order
// and outputs them ordered.
type OrderedSink struct {
streamStart uint64
locker *PriorityLocker
out chan *Chunk
}
// NewOrderedSink constructor.
func NewOrderedSink(streamStart uint64) *OrderedSink {
return &OrderedSink{
streamStart: streamStart,
locker: NewPriorityLocker(streamStart),
out: make(chan *Chunk, 1),
}
}
// Append a chunk.
func (sink *OrderedSink) Append(chunk *Chunk) {
if chunk.StreamStart != atomic.LoadUint64(&sink.streamStart) {
panic("new stream must use a new sink")
}
// lock forces the order of chunks to be sorted.
mu := sink.locker.NewLock(chunk.Index)
mu.Lock()
defer mu.Unlock()
sink.out <- chunk
}
// OutputTo forwards all output to next sink.
func (sink *OrderedSink) OutputTo(nextSink Sink) {
for chunk := range sink.out {
nextSink.Append(chunk)
}
}
// Drain the audio.
func (sink *OrderedSink) Drain() <-chan *Chunk {
return sink.out
}