forked from ionorg/ion-sfu
/
sequencer.go
128 lines (118 loc) · 2.88 KB
/
sequencer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package sfu
import (
"sync"
"time"
)
// ignoreRetransmission is the window, in milliseconds, during which a repeated
// NACK for a packet that was already requested is ignored. A packet is only
// handed out for retransmission again once more than this many milliseconds
// have elapsed since its previous NACK.
const ignoreRetransmission = 100
// packetMeta is the per-packet bookkeeping kept by the sequencer for one
// down track: it links the sequence number seen on the publisher side to the
// rewritten one sent to the subscriber, plus the data needed to answer NACKs.
type packetMeta struct {
	// sourceSeqNo is the original sequence number from the stream. It is used
	// to find the original packet from the publisher.
	sourceSeqNo uint16
	// targetSeqNo is the sequence number after the offset was applied. It
	// belongs to the associated down track, is modified according to that
	// track's offsets, and must not be shared with other tracks.
	targetSeqNo uint16
	// timestamp is the modified timestamp for the associated down track.
	timestamp uint32
	// lastNack is the last time this packet was requested through a NACK.
	// Clients sometimes request the same packet more than once; keeping track
	// of the last request avoids writing the same packet multiple times.
	// The resolution is 1 ms, counted from the sequencer start time; zero
	// means the packet has not been requested yet.
	lastNack uint32
	// layer is the spatial layer of the packet.
	layer uint8
	// misc carries codec-dependent information. For VP8 it is written by
	// setVP8PayloadMeta and read by getVP8PayloadMeta.
	misc uint32
}

// setVP8PayloadMeta packs the VP8 TL0PICIDX (tlz0Idx) and picture ID (picID)
// into misc: tlz0Idx occupies bits 16-23 and picID the low 16 bits.
//
// The receiver is a pointer so the value is stored in the packetMeta the
// caller holds; with a value receiver the write would land on a copy and be
// lost.
func (p *packetMeta) setVP8PayloadMeta(tlz0Idx uint8, picID uint16) {
	p.misc = uint32(tlz0Idx)<<16 | uint32(picID)
}

// getVP8PayloadMeta unpacks the values stored by setVP8PayloadMeta and returns
// them in the same order: TL0PICIDX first, then the picture ID.
//
// It keeps a value receiver so it remains callable on non-addressable
// packetMeta values.
func (p packetMeta) getVP8PayloadMeta() (uint8, uint16) {
	return uint8(p.misc >> 16), uint16(p.misc)
}
// sequencer stores the packet sequence received by the down track.
// seq is used as a ring of max entries addressed through step; push and
// getSeqNoPairs hold the embedded mutex while they read or modify the state.
type sequencer struct {
sync.Mutex
// init reports whether headSN has been seeded by the first call to push.
init bool
// max is the number of entries in seq (set from maxTrack in newSequencer).
max int
// seq holds the metadata of the most recently sequenced packets.
seq []packetMeta
// step is the index in seq where push stores the next head packet; it wraps
// back to 0 when it reaches max.
step int
// headSN is the target sequence number of the latest head packet given to
// push (seeded from the very first packet).
headSN uint16
// startTime is the creation time in milliseconds since the Unix epoch;
// getSeqNoPairs measures NACK times relative to it.
startTime int64
}
// newSequencer returns a sequencer able to remember the metadata of the last
// maxTrack packets. Its start time, the reference point for NACK timing, is
// the current wall-clock time in milliseconds.
func newSequencer(maxTrack int) *sequencer {
	nowMs := time.Now().UnixNano() / 1e6

	s := new(sequencer)
	s.startTime = nowMs
	s.max = maxTrack
	s.seq = make([]packetMeta, maxTrack)
	return s
}
// push records the metadata of a packet forwarded to the down track and
// returns a pointer to the stored entry so the caller can attach codec
// specific data to it.
//
// sn is the sequence number on the publisher side, offSn the rewritten one
// sent to the subscriber, timeStamp the rewritten RTP timestamp and layer the
// spatial layer. head must be true when the packet extends the stream (it is
// the newest sequence number seen so far) and false for a late, out-of-order
// packet.
//
// Entries are laid out so that the packet with sequence number headSN sits
// at index step-1 and older packets at decreasing indexes (wrapping around);
// this is the layout getSeqNoPairs relies on. push returns nil, after
// logging, when an out-of-order packet is older than the ring can hold.
func (n *sequencer) push(sn, offSn uint16, timeStamp uint32, layer uint8, head bool) *packetMeta {
	n.Lock()
	defer n.Unlock()
	if !n.init {
		n.headSN = offSn
		n.init = true
	}

	var slot int
	if head {
		// Reserve one slot for every sequence number that was skipped so a
		// late packet can still be stored at its proper position.
		if gap := int(offSn - n.headSN); gap > 1 {
			n.step = (n.step + gap - 1) % n.max
		}
		slot = n.step
		n.headSN = offSn
		// Only head packets move the write cursor forward.
		n.step++
		if n.step >= n.max {
			n.step = 0
		}
	} else {
		// Distance behind the head; uint16 arithmetic copes with wrap-around.
		back := int(n.headSN - offSn)
		if back >= n.max {
			Logger.V(0).Info("Old packet received, can not be sequenced", "head", n.headSN, "received", offSn)
			return nil
		}
		// The head packet lives at step-1, so this one is back slots before it.
		slot = n.step - back - 1
		if slot < 0 {
			slot += n.max
		}
	}

	n.seq[slot] = packetMeta{
		sourceSeqNo: sn,
		targetSeqNo: offSn,
		timestamp:   timeStamp,
		layer:       layer,
	}
	// Return the entry just written, not the one the cursor moved on to.
	return &n.seq[slot]
}
// getSeqNoPairs resolves the NACKed down track sequence numbers in seqNo to
// the metadata stored for them and returns copies of the matching entries in
// request order.
//
// A sequence number is skipped when it lies outside the ring, when its slot
// holds a different packet, or when the same packet was already requested
// within the last ignoreRetransmission milliseconds. Every returned entry has
// its lastNack stamped with the current time relative to the sequencer start.
func (n *sequencer) getSeqNoPairs(seqNo []uint16) []packetMeta {
	n.Lock()
	defer n.Unlock()

	// Capacity 17 presumably matches one NACK pair (packet ID plus 16-bit
	// bitmask) - confirm against the caller.
	found := make([]packetMeta, 0, 17)
	now := uint32(time.Now().UnixNano()/1e6 - n.startTime)

	for _, wanted := range seqNo {
		// The head packet sits at step-1; older ones at lower indexes.
		idx := n.step - int(n.headSN-wanted) - 1
		if idx < 0 {
			if -idx >= n.max {
				continue
			}
			idx += n.max
		}

		entry := &n.seq[idx]
		if entry.targetSeqNo != wanted {
			continue
		}
		// Requested again too soon: do not hand it out a second time.
		if entry.lastNack != 0 && now-entry.lastNack <= ignoreRetransmission {
			continue
		}
		entry.lastNack = now
		found = append(found, *entry)
	}
	return found
}