/
stream.go
84 lines (67 loc) · 1.72 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package stream
import (
"bytes"
"context"
"errors"
"time"
"github.com/livepeer/m3u8"
)
// Sentinel errors exported by the stream package. Callers should compare
// against these values rather than matching on the message text.
var (
	// ErrBufferFull is the sentinel for a stream buffer that is full.
	ErrBufferFull = errors.New("Stream Buffer Full")

	// ErrBufferEmpty is the sentinel for a stream buffer that holds no items.
	ErrBufferEmpty = errors.New("Stream Buffer Empty")

	// ErrBufferItemType is the sentinel for a buffer item whose type is not
	// recognized.
	ErrBufferItemType = errors.New("Buffer Item Type Not Recognized")

	// ErrDroppedRTMPStream is the sentinel for an RTMP stream that stopped
	// without an EOF.
	ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF")

	// ErrHttpReqFailed is the sentinel for a failed HTTP request.
	ErrHttpReqFailed = errors.New("Http Request Failed")
)
// VideoFormat is a numeric identifier for a video format. Values are built
// with MakeVideoFormatType.
type VideoFormat uint32

const (
	// avFormatTypeMagic is the offset added to each format's ordinal before it
	// is turned into a VideoFormat.
	avFormatTypeMagic = 577777

	// videoFormatOtherBits is the number of bits a base value is shifted left
	// by in MakeVideoFormatType.
	videoFormatOtherBits = 1
)

// Video formats defined by this package.
var (
	HLS  = MakeVideoFormatType(avFormatTypeMagic + 1)
	RTMP = MakeVideoFormatType(avFormatTypeMagic + 2)
	DASH = MakeVideoFormatType(avFormatTypeMagic + 3)
)

// MakeVideoFormatType converts base into a VideoFormat by shifting it left by
// videoFormatOtherBits. The shift is performed on a 32-bit value, so high bits
// of base that are shifted out are lost.
func MakeVideoFormatType(base uint32) (c VideoFormat) {
	return VideoFormat(base) << videoFormatOtherBits
}
// RTMPEOF is an empty marker type. NOTE(review): presumably a value of this
// type is used to signal the end of an RTMP stream — confirm against callers.
type RTMPEOF struct{}
// streamBuffer wraps a Queue and exposes the push, poll, pop and len helpers
// over it.
type streamBuffer struct {
// q is the underlying queue that holds the buffered items.
q *Queue
}
// newStreamBuffer returns a streamBuffer backed by a freshly created Queue.
func newStreamBuffer() *streamBuffer {
	// Size argument handed to NewQueue.
	// NOTE(review): whether this is a hard capacity or only an initial
	// allocation hint depends on Queue — confirm.
	const queueSize = 1000

	return &streamBuffer{q: NewQueue(queueSize)}
}
// push adds in to the underlying queue.
//
// It returns the error reported by the queue's Put call instead of discarding
// it, so callers learn when the item was not accepted.
// NOTE(review): this relies on Queue.Put returning an error, as the
// go-datastructures queue it resembles does — confirm against Queue.
func (b *streamBuffer) push(in interface{}) error {
	return b.q.Put(in)
}
// poll asks the underlying queue for one item via Poll, passing ctx and wait
// through unchanged.
//
// It returns the first item handed back by the queue. Any error from the queue
// is returned as-is with a nil item. If the queue reports success but hands
// back no items, ErrBufferEmpty is returned rather than panicking on an
// out-of-range index.
func (b *streamBuffer) poll(ctx context.Context, wait time.Duration) (interface{}, error) {
	results, err := b.q.Poll(ctx, 1, wait)
	if err != nil {
		return nil, err
	}
	// Guard the index below: a nil error does not by itself prove the slice
	// is non-empty.
	if len(results) == 0 {
		return nil, ErrBufferEmpty
	}
	return results[0], nil
}
// pop asks the underlying queue for one item via Get.
//
// It returns the first item handed back by the queue. Any error from the queue
// is returned as-is with a nil item. If the queue reports success but hands
// back no items, ErrBufferEmpty is returned rather than panicking on an
// out-of-range index.
func (b *streamBuffer) pop() (interface{}, error) {
	results, err := b.q.Get(1)
	if err != nil {
		return nil, err
	}
	// Guard the index below: a nil error does not by itself prove the slice
	// is non-empty.
	if len(results) == 0 {
		return nil, ErrBufferEmpty
	}
	return results[0], nil
}
// len returns the value reported by the underlying queue's Len method.
func (b *streamBuffer) len() int64 {
return b.q.Len()
}
// HLSSegment is this package's own description of a single HLS segment. It is
// defined here because the m3u8 package's definition could not be used as-is.
type HLSSegment struct {
// SeqNo is presumably the segment's sequence number — confirm against callers.
SeqNo uint64
// Name is the segment's name.
Name string
// Data holds the segment's raw bytes.
Data []byte
// Duration is the segment's duration. NOTE(review): unit is not shown in this
// file (likely seconds, as in HLS playlists) — confirm.
Duration float64
// IsZeroFrame is a flag whose meaning is not established in this file —
// TODO confirm semantics with callers.
IsZeroFrame bool
}
// samePlaylist reports whether p1 and p2 are equal by comparing the bytes
// produced by each playlist's Encode method. Two playlists are considered the
// same only if their encoded output is byte-for-byte identical.
func samePlaylist(p1, p2 m3u8.MediaPlaylist) bool {
	return bytes.Equal(p1.Encode().Bytes(), p2.Encode().Bytes())
}