forked from CyCoreSystems/ari
-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue.go
146 lines (118 loc) · 3.23 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package audio
import (
"strings"
"sync"
"golang.org/x/net/context"
"github.com/CyCoreSystems/ari"
)
// Options describes the optional settings which
// are available to playback operations.
type Options struct {
// ID is an optional ID to use for the playback's ID. If one
// is not supplied, an ID will be randomly generated internally.
// NOTE that this ID will only be used for the FIRST playback
// in a queue. All subsequent playback IDs will be randomly generated.
// NOTE(review): Queue.Play in this file never reads this field; confirm
// where (or whether) it is actually honored.
ID string
// ExitOnDTMF defines a list of DTMF digits on receipt of which will
// terminate the playback of the queue. You may set this to AllDTMF
// in order to match any DTMF digit.
// Queue.Play tests each received digit against this string with
// strings.Contains.
ExitOnDTMF string
// Done is an optional channel for receiving notification when the playback
// is complete. This is useful if the playback is to be executed asynchronously.
// This channel will be closed by the playback when the playback is complete
// (Queue.Play closes it via defer when it returns).
Done chan<- struct{}
}
// Queue represents a sequence of audio playbacks
// which are to be played on the associated Player.
type Queue struct {
queue []string // List of mediaURIs to be played, in playback order
mu sync.Mutex // Held by Add and Flush while they modify queue
receivedDTMF string // Storage for received DTMF, if we are listening for them; Play appends each digit it receives
}
// NewQueue allocates a new, empty playback queue and returns a pointer
// to it. The queue is only constructed here; nothing is played until
// Play is called.
func NewQueue() *Queue {
	return new(Queue)
}
// Add appends one or more mediaURIs to the playback queue.
//
// Empty strings are skipped: Play treats an empty string as its
// end-of-queue marker, so one must never be stored as a real entry.
//
// The whole batch is appended under a single acquisition of the queue's
// mutex, so a concurrent Flush cannot interleave with a partially-added
// batch. No explicit initialization of the slice is needed because append
// works on a nil slice.
func (pq *Queue) Add(mediaURIs ...string) {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	for _, u := range mediaURIs {
		if u == "" {
			continue
		}
		pq.queue = append(pq.queue, u)
	}
}
// Flush discards every mediaURI currently stored in the playback queue,
// leaving an empty (non-nil) list behind.
// NOTE that it only resets the stored list; it does NOT stop the
// current playback.
func (pq *Queue) Flush() {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	pq.queue = make([]string, 0)
}
// ReceivedDTMF returns any DTMF which has been received
// by the Queue.
//
// The value is read while holding the queue's mutex because Play records
// incoming digits from a separate goroutine.
func (pq *Queue) ReceivedDTMF() string {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	return pq.receivedDTMF
}

// Play starts the playback of the queue to the Player and blocks until
// the queue has been played, playback is interrupted, or ctx is done.
//
// Parameters:
//   - ctx:  cancels the playback; it is also passed to each PlayAsync call.
//   - p:    the Player to play to; it is also subscribed to for DTMF events.
//   - opts: optional settings; nil is treated as an empty Options.
//
// Returns:
//   - Finished, nil when every item was played, or when a digit listed in
//     opts.ExitOnDTMF was received.
//   - the playback's own status and error when an individual playback
//     stops with a status greater than Finished.
//   - Canceled, ctx.Err() when ctx is done while a playback is in progress.
//
// The list of items is snapshotted under the queue's mutex when Play
// starts, so Add and Flush calls made while Play is running affect only
// subsequent Play calls and cannot disturb the playback in progress.
// If opts.Done is set, it is closed when Play returns.
func (pq *Queue) Play(ctx context.Context, p Player, opts *Options) (Status, error) {
	if opts == nil {
		opts = &Options{}
	}
	if opts.Done != nil {
		defer close(opts.Done)
	}

	// Work on a private copy of the queue. The trailing empty string is an
	// end marker: the feeder goroutine stays blocked trying to send it while
	// the last real item is playing, which keeps it listening for DTMF until
	// that playback ends. The marker lives only in the local copy so that it
	// never pollutes the shared queue.
	pq.mu.Lock()
	items := make([]string, 0, len(pq.queue)+1)
	items = append(items, pq.queue...)
	pq.mu.Unlock()
	items = append(items, "")

	dtmfSub := p.Subscribe(ari.Events.ChannelDtmfReceived)
	defer dtmfSub.Cancel()

	queue := make(chan string)
	dtmfExit := make(chan struct{})
	stop := make(chan struct{})       // closed to tell the feeder to exit
	feederDone := make(chan struct{}) // closed by the feeder when it has exited

	// Registered after dtmfSub.Cancel so that it runs before it: the feeder
	// is stopped and waited for before the subscription is cancelled, so the
	// goroutine never outlives Play.
	defer func() {
		close(stop)
		<-feederDone
	}()

	go func() {
		defer close(feederDone)
		defer close(queue)

		events := dtmfSub.Events()
		for i := 0; i < len(items); {
			select {
			case e, ok := <-events: // read dtmf input
				if !ok {
					// The event channel was closed; a nil channel is never
					// ready, which disables this case for later iterations.
					events = nil
					continue
				}
				d, isDTMF := e.(*ari.ChannelDtmfReceived)
				if !isDTMF || d == nil {
					continue
				}
				pq.mu.Lock()
				pq.receivedDTMF += d.Digit
				pq.mu.Unlock()
				// strings.Contains reports true for an empty substring, so
				// an empty digit must not count as a match.
				if d.Digit != "" && strings.Contains(opts.ExitOnDTMF, d.Digit) {
					close(dtmfExit)
					return
				}
			case queue <- items[i]: // send next item
				i++
			case <-stop: // Play is returning
				return
			case <-ctx.Done(): // wait for cancellation
				return
			}
		}
	}()

	// Start the playback
	for q := range queue {
		if q == "" {
			// End marker: every real item has been played.
			break
		}
		pb := PlayAsync(ctx, p, q)
		select {
		case <-dtmfExit:
			pb.Cancel()
			return Finished, nil
		case <-pb.Stopped():
			if pb.Status() > Finished {
				return pb.Status(), pb.Err()
			}
		case <-ctx.Done():
			// should be caught in PlayAsync but just in case..
			return Canceled, ctx.Err()
		}
	}
	return Finished, nil
}