forked from ionorg/ion-sfu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffer.go
446 lines (380 loc) · 10.6 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
package buffer
import (
"encoding/binary"
"io"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/pion/ion-log"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
)
const (
// maxSN is the size of the 16-bit RTP sequence number space. calc adds it to
// Buffer.cycles every time the highest sequence number wraps around.
maxSN = 1 << 16
// defaultBufferTime is the buffer time, in milliseconds, that Bind substitutes
// when Options.BufferTime is zero or negative.
defaultBufferTime = 1000
// reportDelta is the minimum gap between two RTCP reports emitted from calc.
// It is compared against differences of UnixNano arrival times, so it is in
// nanoseconds (1e9 ns = 1 s).
reportDelta = 1e9
)
// pendingPackets is one raw packet that Write received before the buffer was
// bound. Bind replays these entries through calc and then drops them.
type pendingPackets struct {
// arrivalTime is the time Write received the packet, in nanoseconds since the Unix epoch.
arrivalTime int64
// packet is a private copy of the raw packet bytes handed to Write.
packet []byte
}
// Buffer receives the raw RTP packets of one media stream (identified by
// mediaSSRC), stores them in a Bucket, keeps reception statistics and builds
// RTCP feedback (receiver reports, REMB, NACK/PLI, transport-wide CC samples).
type Buffer struct {
// Embedded mutex. Bind, Write, Close, GetPacket, GetStats and
// SetSenderReportData hold it while they touch the fields below.
sync.Mutex
// bucket is the packet store. It is created in Bind and is nil before that.
bucket *Bucket
// codecType is derived in Bind from the MIME type prefix of the first codec.
codecType webrtc.RTPCodecType
// videoPool and audioPool supply the []byte that backs bucket; Close returns it.
videoPool *sync.Pool
audioPool *sync.Pool
// packetChan carries every successfully parsed packet from calc to the consumer.
packetChan chan rtp.Packet
// pPackets queues packets that arrived before Bind.
pPackets []pendingPackets
// closeOnce makes the body of Close run at most once.
closeOnce sync.Once
// mediaSSRC is the SSRC this buffer was created for.
mediaSSRC uint32
// clockRate is the RTP clock rate of the bound codec, in Hz.
clockRate uint32
// maxBitrate is the upper bound applied to the REMB estimate.
maxBitrate uint64
// lastReport is the arrival time (ns since epoch) at which calc last emitted RTCP.
lastReport int64
// twccExt is the header extension ID negotiated for transport-wide CC, 0 if none was found.
twccExt uint8
// bound is set once Bind has completed; closed is set by Close.
bound bool
closed bool
// supported feedbacks
remb bool
nack bool
tcc bool
// lastSRNTPTime and lastSRRTPTime are the NTP and RTP timestamps stored by SetSenderReportData.
lastSRNTPTime uint64
lastSRRTPTime uint32
lastSRRecv int64 // Represents wall clock of the most recent sender report arrival
// baseSN is the sequence number of the first packet processed by calc.
baseSN uint16
// cycles accumulates maxSN for every sequence number wrap; it forms the high
// bits of the extended highest sequence number.
cycles uint32
lastRtcpPacketTime int64 // Time the last RTCP packet was received.
lastRtcpSrTime int64 // Time the last RTCP SR was received. Required for DLSR computation.
// lastTransit is the previous packet's transit value (arrival minus RTP
// timestamp, in clock units), used for the jitter estimate.
lastTransit uint32
maxSeqNo uint16 // The highest sequence number received in an RTP data packet
// stats holds the counters exposed through GetStats.
stats Stats
latestTimestamp uint32 // latest received RTP timestamp on packet
latestTimestampTime int64 // Time of the latest timestamp (in nanos since unix epoch)
// callbacks
// onClose is invoked once from Close.
onClose func()
// feedbackCB receives the RTCP packets produced by the buffer.
feedbackCB func([]rtcp.Packet)
// feedbackTWCC receives one sample per packet that carries the transport-wide CC extension.
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
}
// Stats is the set of reception statistics a Buffer keeps for its stream.
// GetStats returns a copy of it.
type Stats struct {
// LastExpected is the expected packet count at the time of the previous reception report.
LastExpected uint32
// LastReceived is the value of PacketCount at the time of the previous reception report.
LastReceived uint32
// LostRate is the fraction of packets lost over the last report interval.
LostRate float32
PacketCount uint32 // Number of packets received from this source.
Jitter float64 // An estimate of the statistical variance of the RTP data packet inter-arrival time.
// TotalByte is the number of bytes received since the last REMB packet was
// built; buildREMBPacket resets it to zero.
TotalByte uint64
}
// Options provides configuration options for the buffer. It is passed to Bind.
type Options struct {
// BufferTime is the buffering time; Bind replaces a non-positive value with
// defaultBufferTime. Presumably milliseconds, like defaultBufferTime — TODO confirm.
BufferTime int
// MaxBitRate caps the bitrate advertised in REMB feedback.
MaxBitRate uint64
}
// NewBuffer returns an unbound Buffer for the stream identified by ssrc.
//
// vp and ap are the pools from which Bind later takes the backing byte slice
// for video and audio streams respectively. The returned buffer queues
// incoming packets until Bind is called; its packet channel has room for 100
// parsed packets.
func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
	return &Buffer{
		packetChan: make(chan rtp.Packet, 100),
		audioPool:  ap,
		videoPool:  vp,
		mediaSSRC:  ssrc,
	}
}
// PacketChan returns the channel on which the buffer publishes every packet it
// has parsed successfully. The channel is closed by Close.
func (b *Buffer) PacketChan() chan rtp.Packet {
return b.packetChan
}
// Bind configures the buffer from the negotiated RTP parameters and moves it
// from the pending state to the bound state.
//
// It takes the clock rate, MIME type and RTCP feedback list from the first
// codec in params, looks up the transport-wide CC header extension ID, installs
// the bucket's loss callback (which emits NACK, plus PLI when a keyframe is
// requested), replays every packet that Write queued before the bind, and
// finally marks the buffer as bound.
//
// NOTE(review): params.Codecs must not be empty, and a MIME type that is
// neither "audio/" nor "video/" leaves b.bucket nil, so the onLost assignment
// below would panic — confirm callers never pass such parameters.
func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) {
	b.Lock()
	defer b.Unlock()

	primary := params.Codecs[0]
	b.clockRate = primary.ClockRate
	b.maxBitrate = o.MaxBitRate

	// The MIME prefix decides the codec type and which pool backs the bucket.
	if strings.HasPrefix(primary.MimeType, "audio/") {
		b.codecType = webrtc.RTPCodecTypeAudio
		b.bucket = NewBucket(b.audioPool.Get().([]byte), false)
	} else if strings.HasPrefix(primary.MimeType, "video/") {
		b.codecType = webrtc.RTPCodecTypeVideo
		b.bucket = NewBucket(b.videoPool.Get().([]byte), true)
	} else {
		b.codecType = webrtc.RTPCodecType(0)
	}

	// Remember the ID of the first transport-wide CC header extension, if any.
	for i := range params.HeaderExtensions {
		if params.HeaderExtensions[i].URI != sdp.TransportCCURI {
			continue
		}
		b.twccExt = uint8(params.HeaderExtensions[i].ID)
		break
	}

	// Only the local copy of the options is adjusted; it is used for logging below.
	if o.BufferTime <= 0 {
		o.BufferTime = defaultBufferTime
	}

	// Record which RTCP feedback mechanisms the codec negotiated.
	for _, feedback := range primary.RTCPFeedback {
		switch feedback.Type {
		case webrtc.TypeRTCPFBGoogREMB:
			log.Debugf("Setting feedback %s", webrtc.TypeRTCPFBGoogREMB)
			b.remb = true
		case webrtc.TypeRTCPFBTransportCC:
			log.Debugf("Setting feedback %s", webrtc.TypeRTCPFBTransportCC)
			b.tcc = true
		case webrtc.TypeRTCPFBNACK:
			log.Debugf("Setting feedback %s", webrtc.TypeRTCPFBNACK)
			b.nack = true
		}
	}

	// Loss reports from the bucket become a NACK, followed by a PLI when the
	// bucket asks for a keyframe.
	b.bucket.onLost = func(pairs []rtcp.NackPair, wantKeyframe bool) {
		nack := &rtcp.TransportLayerNack{
			MediaSSRC: b.mediaSSRC,
			Nacks:     pairs,
		}
		out := []rtcp.Packet{nack}
		if wantKeyframe {
			out = append(out, &rtcp.PictureLossIndication{
				MediaSSRC: b.mediaSSRC,
			})
		}
		b.feedbackCB(out)
	}

	// Replay everything that arrived before the bind, with original arrival times.
	for i := range b.pPackets {
		b.calc(b.pPackets[i].packet, b.pPackets[i].arrivalTime)
	}
	b.pPackets = nil
	b.bound = true

	log.Debugf("NewBuffer BufferOptions=%v", o)
}
// Write hands one raw RTP packet to the buffer. Packets may arrive out of
// order; a newer packet can be followed by an older one.
//
// While the buffer is not yet bound, the packet is copied and queued together
// with its arrival time so that Bind can replay it. Once bound, the packet is
// processed immediately by calc.
//
// It returns len(pkt) and a nil error when the packet was accepted, as the
// io.Writer contract requires (a short count must be accompanied by an error).
// After Close it returns 0 and io.EOF.
func (b *Buffer) Write(pkt []byte) (n int, err error) {
	b.Lock()
	defer b.Unlock()

	if b.closed {
		return 0, io.EOF
	}

	arrival := time.Now().UnixNano()
	if !b.bound {
		// The caller may reuse pkt after Write returns, so keep a private copy.
		stored := make([]byte, len(pkt))
		copy(stored, pkt)
		b.pPackets = append(b.pPackets, pendingPackets{
			packet:      stored,
			arrivalTime: arrival,
		})
		return len(pkt), nil
	}

	b.calc(pkt, arrival)
	return len(pkt), nil
}
// Read copies the first pending (pre-Bind) packet into buff and returns its
// length. The packet is not removed from the pending queue, so repeated calls
// return the same packet until Bind clears the queue.
//
// It returns errBufferTooSmall when buff cannot hold the packet and io.EOF
// once the buffer has been closed. While no pending packet is available it
// polls in a loop, releasing the lock between attempts.
//
// The closed flag is checked while holding the lock, because Close sets it
// under the same lock; reading it unlocked would be a data race.
//
// NOTE(review): the polling loop neither sleeps nor yields, so it spins for as
// long as the queue stays empty — confirm that callers only use Read before
// Bind.
func (b *Buffer) Read(buff []byte) (n int, err error) {
	for {
		b.Lock()
		if b.closed {
			b.Unlock()
			return 0, io.EOF
		}
		// len of a nil slice is 0, so no separate nil check is needed.
		if len(b.pPackets) > 0 {
			head := b.pPackets[0].packet
			if len(buff) < len(head) {
				b.Unlock()
				return 0, errBufferTooSmall
			}
			n = copy(buff, head)
			b.Unlock()
			return n, nil
		}
		b.Unlock()
	}
}
// Close shuts the buffer down. The first call marks the buffer closed, returns
// the bucket's backing slice to the pool it came from, invokes the OnClose
// callback if one was registered, and closes the packet channel. Later calls
// do nothing. It always returns nil.
//
// The callback runs while the buffer lock is held, so it must not call back
// into methods of this buffer that take the lock.
func (b *Buffer) Close() error {
	b.Lock()
	defer b.Unlock()

	b.closeOnce.Do(func() {
		b.closed = true

		// bucket is nil when the buffer was never bound; nothing to return then.
		if b.bucket != nil {
			switch b.codecType {
			case webrtc.RTPCodecTypeVideo:
				b.videoPool.Put(b.bucket.buf)
			case webrtc.RTPCodecTypeAudio:
				b.audioPool.Put(b.bucket.buf)
			}
		}

		// Registering a close callback is optional; calling a nil func would panic.
		if b.onClose != nil {
			b.onClose()
		}
		close(b.packetChan)
	})
	return nil
}
// OnClose registers fn as the callback that Close invokes the first time the
// buffer is closed. A later call replaces the previous callback.
// The assignment is not protected by the buffer lock.
func (b *Buffer) OnClose(fn func()) {
b.onClose = fn
}
// calc processes one raw RTP packet that arrived at arrivalTime (nanoseconds
// since the Unix epoch): it updates the sequence number tracking and the
// statistics, stores the packet in the bucket, publishes the parsed packet on
// packetChan, feeds the transport-wide CC callback and, at most once per
// reportDelta, emits RTCP through feedbackCB.
//
// It does no locking itself; Write and Bind call it with the buffer lock held.
//
// NOTE(review): b.bucket, b.feedbackCB and (when tcc is enabled) b.feedbackTWCC
// are used without nil checks — confirm they are always set before packets flow.
func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
// Bytes 2-3 are read as the big-endian sequence number.
// NOTE(review): a packet shorter than 4 bytes would panic on this slice expression.
sn := binary.BigEndian.Uint16(pkt[2:4])
// First packet: initialise the sequence tracking and the report timer.
// Otherwise, if sn is at or ahead of maxSeqNo (bit 15 of the 16-bit difference
// is clear), advance maxSeqNo; a numerically smaller sn in that case means the
// sequence number wrapped, so one more cycle is counted.
if b.stats.PacketCount == 0 {
b.baseSN = sn
b.maxSeqNo = sn
b.bucket.headSN = sn - 1
b.lastReport = arrivalTime
} else if (sn-b.maxSeqNo)&0x8000 == 0 {
if sn < b.maxSeqNo {
b.cycles += maxSN
}
b.maxSeqNo = sn
}
// The counters are updated before parsing, so they also include packets that fail to unmarshal.
b.stats.TotalByte += uint64(len(pkt))
b.stats.PacketCount++
var p rtp.Packet
// The bytes returned by the bucket are parsed; the last argument tells the
// bucket whether this packet is now the newest one. Parse failures are dropped silently.
if err := p.Unmarshal(b.bucket.addPacket(pkt, sn, sn == b.maxSeqNo)); err != nil {
return
}
// This send blocks when the channel is full, and it happens while the caller
// holds the buffer lock.
b.packetChan <- p
// if first time update or the timestamp is later (factoring timestamp wrap around)
if (b.latestTimestampTime == 0) || IsLaterTimestamp(p.Timestamp, b.latestTimestamp) {
b.latestTimestamp = p.Timestamp
b.latestTimestampTime = arrivalTime
}
// Convert the arrival time to RTP clock units: nanoseconds -> milliseconds,
// multiplied by clock ticks per millisecond (integer division of clockRate).
arrival := uint32(arrivalTime / 1e6 * int64(b.clockRate/1e3))
transit := arrival - p.Timestamp
// Jitter estimate: move the running value 1/16 of the way towards the
// absolute change in transit time. Skipped while lastTransit is still zero.
if b.lastTransit != 0 {
d := int32(transit - b.lastTransit)
if d < 0 {
d = -d
}
b.stats.Jitter += (float64(d) - b.stats.Jitter) / 16
}
b.lastTransit = transit
// With transport-wide CC enabled, report the 16-bit value from the extension
// payload, the arrival time and the top bit of the second header byte (passed
// as the marker argument).
if b.tcc {
if ext := p.GetExtension(b.twccExt); ext != nil && len(ext) > 1 {
b.feedbackTWCC(binary.BigEndian.Uint16(ext[0:2]), arrivalTime, (pkt[1]>>7&0x1) > 0)
}
}
// Emit RTCP once at least reportDelta nanoseconds have passed since the last report.
if arrivalTime-b.lastReport >= reportDelta {
b.feedbackCB(b.getRTCP())
b.lastReport = arrivalTime
}
}
// buildREMBPacket builds a REMB packet for this stream from the bytes counted
// since the previous call, and resets that byte counter.
//
// The estimate starts at TotalByte*8. With a loss rate below 2% it is raised
// by 9% plus 2000; with a loss rate above 10% it is scaled by
// (1 - 0.5*lossRate). The result is then capped at maxBitrate and finally
// raised to at least 100000.
//
// The caller must hold the buffer lock.
func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate {
	estimate := b.stats.TotalByte * 8
	// Start a fresh measurement window for the next report.
	b.stats.TotalByte = 0

	switch lossRate := b.stats.LostRate; {
	case lossRate < 0.02:
		estimate = uint64(float64(estimate)*1.09) + 2000
	case lossRate > .1:
		estimate = uint64(float64(estimate) * float64(1-0.5*lossRate))
	}

	// The upper bound is applied first, so the lower bound wins if they conflict.
	if estimate > b.maxBitrate {
		estimate = b.maxBitrate
	}
	if estimate < 100000 {
		estimate = 100000
	}

	return &rtcp.ReceiverEstimatedMaximumBitrate{
		Bitrate: estimate,
		SSRCs:   []uint32{b.mediaSSRC},
	}
}
// buildReceptionReport builds the RTCP reception report block for this stream
// and updates the interval bookkeeping in b.stats (LastExpected, LastReceived
// and LostRate).
//
// Loss figures are computed with signed arithmetic and clamped. PacketCount
// also counts duplicates and retransmissions, so the number of received
// packets can exceed the number expected; with unsigned subtraction that would
// wrap to a huge loss value. An interval in which no new packets were expected
// yields a loss rate of 0 instead of the NaN produced by 0/0.
//
// The caller must hold the buffer lock.
func (b *Buffer) buildReceptionReport() rtcp.ReceptionReport {
	// cycles is a multiple of 1<<16, so OR-ing in the 16-bit sequence number
	// yields the extended highest sequence number.
	extMaxSeq := b.cycles | uint32(b.maxSeqNo)
	expected := extMaxSeq - uint32(b.baseSN) + 1

	// Cumulative loss, never negative.
	var lost uint32
	if b.stats.PacketCount != 0 && expected > b.stats.PacketCount {
		lost = expected - b.stats.PacketCount
	}

	expectedInterval := expected - b.stats.LastExpected
	b.stats.LastExpected = expected
	receivedInterval := b.stats.PacketCount - b.stats.LastReceived
	b.stats.LastReceived = b.stats.PacketCount

	// Signed difference: negative when more packets arrived than were expected.
	lostInterval := int64(expectedInterval) - int64(receivedInterval)

	var fracLost uint8
	b.stats.LostRate = 0
	if expectedInterval != 0 && lostInterval > 0 {
		b.stats.LostRate = float32(lostInterval) / float32(expectedInterval)
		// Fraction lost is an 8-bit fixed-point value; total loss would give
		// 256, which does not fit, so saturate at 255.
		frac := (lostInterval << 8) / int64(expectedInterval)
		if frac > 255 {
			frac = 255
		}
		fracLost = uint8(frac)
	}

	// Delay since the last sender report, in units of 1/65536 second:
	// whole seconds in the upper 16 bits, the fraction in the lower 16 bits.
	var dlsr uint32
	if b.lastSRRecv != 0 {
		delayMS := uint32((time.Now().UnixNano() - b.lastSRRecv) / 1e6)
		dlsr = (delayMS / 1e3) << 16
		dlsr |= (delayMS % 1e3) * 65536 / 1000
	}

	return rtcp.ReceptionReport{
		SSRC:               b.mediaSSRC,
		FractionLost:       fracLost,
		TotalLost:          lost,
		LastSequenceNumber: extMaxSeq,
		Jitter:             uint32(b.stats.Jitter),
		// Bits 16..47 of the NTP timestamp from the last sender report.
		LastSenderReport: uint32(b.lastSRNTPTime >> 16),
		Delay:            dlsr,
	}
}
// SetSenderReportData stores the RTP and NTP timestamps carried by the most
// recent sender report and records the current wall-clock time (nanoseconds
// since the Unix epoch) as its arrival time. All three values are written
// while holding the buffer lock.
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
	b.Lock()
	defer b.Unlock()

	b.lastSRRTPTime, b.lastSRNTPTime = rtpTime, ntpTime
	b.lastSRRecv = time.Now().UnixNano()
}
// getRTCP assembles the periodic RTCP feedback for this stream: a receiver
// report carrying one reception report block and, when REMB was negotiated
// and transport-wide CC was not, a REMB packet.
//
// The reception report is built first because it refreshes the loss rate that
// the REMB estimate reads. The caller must hold the buffer lock.
func (b *Buffer) getRTCP() []rtcp.Packet {
	report := &rtcp.ReceiverReport{
		Reports: []rtcp.ReceptionReport{b.buildReceptionReport()},
	}
	out := []rtcp.Packet{report}

	if !b.tcc && b.remb {
		out = append(out, b.buildREMBPacket())
	}
	return out
}
// GetPacket looks up the stored packet with sequence number sn in the bucket
// and copies it into buff, returning whatever the bucket lookup returns.
// After Close it returns 0 and io.EOF.
//
// NOTE(review): the bucket is only created by Bind — confirm callers never
// request packets from an unbound buffer.
func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) {
	b.Lock()
	defer b.Unlock()

	if !b.closed {
		return b.bucket.getPacket(buff, sn)
	}
	return 0, io.EOF
}
// OnTransportWideCC registers fn as the transport-wide CC callback. When
// transport-cc feedback is enabled, calc calls it for each packet that carries
// the extension, passing the 16-bit number read from the extension, the arrival
// time in nanoseconds and the marker flag.
// The assignment is not protected by the buffer lock.
func (b *Buffer) OnTransportWideCC(fn func(sn uint16, timeNS int64, marker bool)) {
b.feedbackTWCC = fn
}
// OnFeedback registers fn as the sink for RTCP packets produced by the buffer:
// the periodic reports emitted from calc and the NACK/PLI packets generated by
// the bucket's loss callback.
// The assignment is not protected by the buffer lock.
func (b *Buffer) OnFeedback(fn func(fb []rtcp.Packet)) {
b.feedbackCB = fn
}
// GetMediaSSRC returns the SSRC of the RTP stream this buffer was created for.
// The value is set in NewBuffer and not modified by any code in this file.
func (b *Buffer) GetMediaSSRC() uint32 {
return b.mediaSSRC
}
// GetClockRate returns the RTP clock rate of the bound codec. It is zero until
// Bind has stored the codec's clock rate.
// The field is read without taking the buffer lock.
func (b *Buffer) GetClockRate() uint32 {
return b.clockRate
}
// GetSenderReportData returns the RTP timestamp, the NTP timestamp and the
// arrival time (nanoseconds since the Unix epoch) of the last sender report
// recorded by SetSenderReportData. All three are zero if none was recorded.
//
// The values are read under the buffer lock, the same lock the setter holds
// while writing them. This keeps the three values consistent with each other
// and avoids mixing atomic loads with plain stores. It also avoids 64-bit
// atomic access to struct fields whose alignment is not guaranteed on 32-bit
// platforms.
func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime uint64, lastReceivedTimeInNanosSinceEpoch int64) {
	b.Lock()
	defer b.Unlock()

	return b.lastSRRTPTime, b.lastSRNTPTime, b.lastSRRecv
}
// GetStats returns a snapshot of the buffer's reception statistics. The copy
// is taken while holding the buffer lock.
func (b *Buffer) GetStats() (stats Stats) {
	b.Lock()
	defer b.Unlock()

	return b.stats
}
// GetLatestTimestamp returns the most recent RTP timestamp seen by the buffer
// (as decided by IsLaterTimestamp, which accounts for wrap-around) together
// with the arrival time of that packet in nanoseconds since the Unix epoch.
//
// NOTE(review): the two fields are loaded atomically and independently here,
// while calc writes them as plain stores under the buffer lock — confirm this
// mix of synchronisation is intended.
func (b *Buffer) GetLatestTimestamp() (latestTimestamp uint32, latestTimestampTimeInNanosSinceEpoch int64) {
	return atomic.LoadUint32(&b.latestTimestamp), atomic.LoadInt64(&b.latestTimestampTime)
}
// IsTimestampWrapAround returns true if wrap around happens from timestamp1 to timestamp2,
// that is, when timestamp1 lies in the lowest quarter of the 32-bit range (its
// two most significant bits are clear) and timestamp2 lies in the highest
// quarter (its two most significant bits are set).
func IsTimestampWrapAround(timestamp1 uint32, timestamp2 uint32) bool {
	// The two most significant bits of a uint32. The mask needs eight hex
	// digits; 0xC000000 would select bits 26-27 instead.
	const topTwoBits = 0xC0000000
	return timestamp1&topTwoBits == 0 && timestamp2&topTwoBits == topTwoBits
}
// IsLaterTimestamp returns true if timestamp1 is later in time than timestamp2 factoring in timestamp wrap-around.
//
// A numerically larger timestamp1 is later unless the pair straddles a wrap
// (timestamp2 has already wrapped past zero while timestamp1 has not). A
// numerically smaller or equal timestamp1 is later only when it is the one
// that has wrapped. Wrap detection is delegated to IsTimestampWrapAround.
func IsLaterTimestamp(timestamp1 uint32, timestamp2 uint32) bool {
	if timestamp1 > timestamp2 {
		return !IsTimestampWrapAround(timestamp2, timestamp1)
	}
	return IsTimestampWrapAround(timestamp1, timestamp2)
}