/
playlistmanager.go
368 lines (322 loc) · 10 KB
/
playlistmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
package core
import (
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-tools/drivers"
ffmpeg "github.com/livepeer/lpms/ffmpeg"
"github.com/livepeer/m3u8"
)
const LIVE_LIST_LENGTH uint = 6
const (
jsonPlaylistRotationInterval = 60 * 60 * 1000 // 1 hour (in ms)
jsonPlaylistMaxRetries = 30
JsonPlaylistInitialTimeout = 5 * time.Second
JsonPlaylistMaxTimeout = 120 * time.Second
)
var JsonPlaylistQuitTimeout = 60 * time.Second
// PlaylistManager manages playlists and data for one video stream, backed by one object storage.
type PlaylistManager interface {
ManifestID() ManifestID
// Implicitly creates master and media playlists
// Inserts in media playlist given a link to a segment
InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string, duration float64) error
InsertHLSSegmentJSON(profile *ffmpeg.VideoProfile, seqNo uint64, uri string, duration float64)
GetHLSMasterPlaylist() *m3u8.MasterPlaylist
GetHLSMediaPlaylist(rendition string) *m3u8.MediaPlaylist
GetOSSession() drivers.OSSession
GetRecordOSSession() drivers.OSSession
FlushRecord()
Cleanup()
}
type BasicPlaylistManager struct {
storageSession drivers.OSSession
recordSession drivers.OSSession
manifestID ManifestID
// Live playlist used for broadcasting
masterPList *m3u8.MasterPlaylist
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListWriteQueue *drivers.OverwriteQueue
jsonListSync *sync.Mutex
}
type jsonSeg struct {
SeqNo uint64 `json:"seq_no,omitempty"`
URI string `json:"uri,omitempty"`
DurationMs uint64 `json:"duration_ms,omitempty"`
discontinuity bool
}
func (js *jsonSeg) GetDiscontinuity() bool {
return js.discontinuity
}
type JsonPlaylist struct {
name string
DurationMs uint64 `json:"duration_ms,omitempty"` // total duration of the saved sagments
Tracks []JsonMediaTrack `json:"tracks,omitempty"`
Segments map[string][]jsonSeg `json:"segments,omitempty"`
}
type JsonMediaTrack struct {
Name string `json:"name,omitempty"`
Bandwidth uint32 `json:"bandwidth,omitempty"`
Resolution string `json:"resolution,omitempty"`
}
func NewJSONPlaylist() *JsonPlaylist {
return &JsonPlaylist{
name: fmt.Sprintf("playlist_%d.json", time.Now().UnixNano()),
Segments: make(map[string][]jsonSeg),
}
}
// AddMaster adds data about tracks
func (jpl *JsonPlaylist) AddMaster(ajpl *JsonPlaylist) {
for _, track := range ajpl.Tracks {
if !jpl.hasTrack(track.Name) {
jpl.Tracks = append(jpl.Tracks, track)
}
}
}
// AddSegmentsToMPL adds segments to the MediaPlaylist
func (jpl *JsonPlaylist) AddSegmentsToMPL(manifestIDs []string, trackName string, mpl *m3u8.MediaPlaylist, extURL string) {
for _, seg := range jpl.Segments[trackName] {
// make relative URL from absolute one
uri := seg.URI
mindex, manifestIDlen := indexOf(uri, manifestIDs)
if mindex != -1 {
// If extURL was specified we will put absolute URL to the segment into manifest
// extURL points to the root of object store, so we should take part of the 'uri'
// which contains manifestID.
// If extURL is not specified then we're serving relative URL to the segment,
// and address at which manifest is served already contains manfiestID
// (broadcaster.com/recordings/manifestID/index.m3u8), so we're taking
// part of the 'uri' after the manifestID
if extURL != "" {
uri = common.JoinURL(extURL, uri[mindex:])
} else {
uri = uri[mindex+manifestIDlen+1:]
}
}
mseg := &m3u8.MediaSegment{
URI: uri,
Duration: float64(seg.DurationMs) / 1000.0,
Discontinuity: seg.discontinuity,
}
mpl.InsertSegment(seg.SeqNo, mseg)
}
}
func (jpl *JsonPlaylist) hasTrack(trackName string) bool {
for _, track := range jpl.Tracks {
if track.Name == trackName {
return true
}
}
return false
}
// AddDiscontinuedTrack appends all segments for specified rendition, mark first one as discontinued
func (jpl *JsonPlaylist) AddDiscontinuedTrack(ajpl *JsonPlaylist, trackName string) {
curSegs := jpl.Segments[trackName]
var lastSeq uint64
var disc bool
if len(curSegs) > 0 {
lastSeq = curSegs[len(curSegs)-1].SeqNo
lastSeq++
disc = true
}
for i, seg := range ajpl.Segments[trackName] {
if i == 0 {
seg.discontinuity = disc
}
seg.SeqNo += lastSeq
curSegs = append(curSegs, seg)
}
jpl.Segments[trackName] = curSegs
}
// AddTrack adds segments data for specified rendition
func (jpl *JsonPlaylist) AddTrack(ajpl *JsonPlaylist, trackName string) {
curSegs := jpl.Segments[trackName]
var lastSeq uint64
if len(curSegs) > 0 {
lastSeq = curSegs[len(curSegs)-1].SeqNo
}
for _, seg := range ajpl.Segments[trackName] {
needSort := false
if seg.SeqNo > lastSeq {
curSegs = append(curSegs, seg)
} else {
i := sort.Search(len(curSegs), func(i int) bool {
return curSegs[i].SeqNo >= seg.SeqNo
})
if i < len(curSegs) && curSegs[i].SeqNo == seg.SeqNo {
// x is present at data[i]
} else {
// x is not present in data,
// but i is the index where it would be inserted.
if i < len(curSegs) {
needSort = true
}
curSegs = append(curSegs, seg)
}
}
if needSort {
sort.Slice(curSegs, func(i, j int) bool {
return curSegs[i].SeqNo < curSegs[j].SeqNo
})
}
lastSeq = curSegs[len(curSegs)-1].SeqNo
}
jpl.Segments[trackName] = curSegs
}
func (jpl *JsonPlaylist) InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
duration float64) {
durationMs := uint64(duration * 1000)
if profile.Name == "source" {
jpl.DurationMs += durationMs
}
if _, has := jpl.Segments[profile.Name]; !has {
vParams := ffmpeg.VideoProfileToVariantParams(*profile)
jpl.Tracks = append(jpl.Tracks, JsonMediaTrack{
Name: profile.Name,
Bandwidth: vParams.Bandwidth,
Resolution: vParams.Resolution,
})
}
jpl.Segments[profile.Name] = append(jpl.Segments[profile.Name], jsonSeg{
URI: uri,
DurationMs: durationMs,
SeqNo: seqNo,
})
}
// NewBasicPlaylistManager create new BasicPlaylistManager struct
func NewBasicPlaylistManager(manifestID ManifestID,
storageSession, recordSession drivers.OSSession) *BasicPlaylistManager {
bplm := &BasicPlaylistManager{
storageSession: storageSession,
recordSession: recordSession,
manifestID: manifestID,
masterPList: m3u8.NewMasterPlaylist(),
mediaLists: make(map[string]*m3u8.MediaPlaylist),
mapSync: &sync.RWMutex{},
}
if recordSession != nil {
bplm.jsonList = NewJSONPlaylist()
bplm.jsonListSync = &sync.Mutex{}
bplm.makeNewOverwriteQueue()
}
return bplm
}
func (mgr *BasicPlaylistManager) makeNewOverwriteQueue() {
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name,
fmt.Sprintf("json playlist for manifestId=%s", mgr.manifestID),
jsonPlaylistMaxRetries, JsonPlaylistInitialTimeout, JsonPlaylistMaxTimeout)
}
func (mgr *BasicPlaylistManager) ManifestID() ManifestID {
return mgr.manifestID
}
func (mgr *BasicPlaylistManager) Cleanup() {
if mgr.storageSession != nil {
mgr.storageSession.EndSession()
}
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
}
func (mgr *BasicPlaylistManager) GetOSSession() drivers.OSSession {
return mgr.storageSession
}
func (mgr *BasicPlaylistManager) GetRecordOSSession() drivers.OSSession {
return mgr.recordSession
}
func (mgr *BasicPlaylistManager) FlushRecord() {
if mgr.recordSession != nil {
mgr.jsonListSync.Lock()
defer mgr.jsonListSync.Unlock()
b, err := json.Marshal(mgr.jsonList)
if err != nil {
glog.Error("Error encoding playlist: ", err)
return
}
go mgr.jsonListWriteQueue.Save(b)
if mgr.jsonList.DurationMs > jsonPlaylistRotationInterval {
mgr.jsonList = NewJSONPlaylist()
mgr.makeNewOverwriteQueue()
}
}
}
func (mgr *BasicPlaylistManager) getPL(rendition string) *m3u8.MediaPlaylist {
mgr.mapSync.RLock()
mpl := mgr.mediaLists[rendition]
mgr.mapSync.RUnlock()
return mpl
}
func (mgr *BasicPlaylistManager) getOrCreatePL(profile *ffmpeg.VideoProfile) (*m3u8.MediaPlaylist, error) {
mgr.mapSync.Lock()
defer mgr.mapSync.Unlock()
if pl, ok := mgr.mediaLists[profile.Name]; ok {
return pl, nil
}
mpl, err := m3u8.NewMediaPlaylist(LIVE_LIST_LENGTH, LIVE_LIST_LENGTH)
if err != nil {
glog.Error(err)
return nil, err
}
mgr.mediaLists[profile.Name] = mpl
vParams := ffmpeg.VideoProfileToVariantParams(*profile)
url := fmt.Sprintf("%v/%v.m3u8", mgr.manifestID, profile.Name)
mgr.masterPList.Append(url, mpl, vParams)
return mpl, nil
}
func (mgr *BasicPlaylistManager) InsertHLSSegmentJSON(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
duration float64) {
if mgr.jsonList != nil {
mgr.jsonListSync.Lock()
mgr.jsonList.InsertHLSSegment(profile, seqNo, uri, duration)
mgr.jsonListSync.Unlock()
}
}
func (mgr *BasicPlaylistManager) InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
duration float64) error {
mpl, err := mgr.getOrCreatePL(profile)
if err != nil {
return err
}
mseg := newMediaSegment(uri, duration)
if mpl.Count() >= mpl.WinSize() {
mpl.Remove()
}
if mpl.Count() == 0 {
mpl.SeqNo = mseg.SeqId
}
return mpl.InsertSegment(seqNo, mseg)
}
// GetHLSMasterPlaylist ..
func (mgr *BasicPlaylistManager) GetHLSMasterPlaylist() *m3u8.MasterPlaylist {
return mgr.masterPList
}
// GetHLSMediaPlaylist ...
func (mgr *BasicPlaylistManager) GetHLSMediaPlaylist(rendition string) *m3u8.MediaPlaylist {
return mgr.getPL(rendition)
}
func newMediaSegment(uri string, duration float64) *m3u8.MediaSegment {
return &m3u8.MediaSegment{
URI: uri,
Duration: duration,
}
}
// indexOf finds index of one of substrings
// returns index and length of substring
func indexOf(str string, substrs []string) (int, int) {
for _, sub := range substrs {
mindex := strings.Index(str, sub)
if mindex != -1 {
return mindex, len(sub)
}
}
return -1, 0
}