forked from Monibuca/engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriber.go
120 lines (113 loc) · 2.77 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package engine
import (
"context"
"time"
"github.com/Monibuca/engine/v2/avformat"
"github.com/pkg/errors"
)
// SubscriberInfo 订阅者可序列化信息,用于控制台输出
type SubscriberInfo struct {
ID string
TotalDrop int //总丢帧
TotalPacket int
Type string
BufferLength int
Delay uint32
SubscribeTime time.Time
}
// Subscriber 订阅者实体定义
type Subscriber struct {
context.Context
*Stream
SubscriberInfo
MetaData func(stream *Stream) error
OnData func(*avformat.SendPacket) error
Cancel context.CancelFunc
Sign string
OffsetTime uint32
startTime uint32
avformat.SendPacket
}
// IsClosed 检查订阅者是否已经关闭
func (s *Subscriber) IsClosed() bool {
return s.Context != nil && s.Err() != nil
}
// Close 关闭订阅者
func (s *Subscriber) Close() {
if s.Cancel != nil {
s.Cancel()
}
}
//Subscribe 开始订阅
func (s *Subscriber) Subscribe(streamPath string) (err error) {
if !config.EnableWaitStream && FindStream(streamPath) == nil {
return errors.Errorf("Stream not found:%s", streamPath)
}
GetStream(streamPath).Subscribe(s)
if s.Context == nil {
return errors.Errorf("stream not exist:%s", streamPath)
}
defer s.UnSubscribe(s)
select {
//等待发布者首屏数据,如果发布者尚为发布,则会等待,否则就会往下执行
case <-s.WaitPub:
case <-s.Context.Done():
return s.Err()
}
if s.MetaData != nil {
if err = s.MetaData(s.Stream); err != nil {
return err
}
}
if s.HasVideo {
s.sendAv(s.VideoTag, 0)
packet := s.FirstScreen.Clone()
s.startTime = packet.Timestamp
s.send(packet)
packet.GoNext()
for atsent, dropping, droped := false, false, 0; s.Err() == nil; packet.GoNext() {
s.TotalPacket++
if !dropping {
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO && !atsent {
s.sendAv(s.AudioTag, 0)
atsent = true
}
s.send(packet)
if s.checkDrop(packet) {
dropping = true
droped = 0
}
} else if packet.IsKeyFrame {
//遇到关键帧则退出丢帧
dropping = false
//fmt.Println("drop package ", droped)
s.TotalDrop += droped
s.send(packet)
} else {
droped++
}
}
} else {
s.sendAv(s.AudioTag, 0)
for packet := s.AVRing; s.Err() == nil; packet.GoNext() {
s.TotalPacket++
s.send(packet)
}
}
return s.Err()
}
func (s *Subscriber) sendAv(packet *avformat.AVPacket, t uint32) {
s.AVPacket = packet
s.Timestamp = t
s.OnData(&s.SendPacket)
}
func (s *Subscriber) send(packet *Ring) {
packet.Wait()
s.sendAv(&packet.AVPacket, packet.Timestamp-s.startTime)
}
func (s *Subscriber) checkDrop(packet *Ring) bool {
pIndex := s.AVRing.Index
s.BufferLength = pIndex - packet.Index
s.Delay = s.AVRing.Timestamp - packet.Timestamp
return s.BufferLength > s.AVRing.Size/2
}