-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathNackGenerator.cpp
354 lines (277 loc) · 8.22 KB
/
NackGenerator.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#define MS_CLASS "RTC::NackGenerator"
// #define MS_LOG_DEV_LEVEL 3
#include "RTC/NackGenerator.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include <iterator> // std::ostream_iterator
#include <sstream> // std::ostringstream
#include <utility> // std::make_pair()
namespace RTC
{
/* Static. */

// Sequence-number window: entries older than `seq - MaxPacketAge` are purged
// from the NACK, key frame and recovered lists.
static constexpr size_t MaxPacketAge = 10000u;
// Upper bound for (current NACK list size + newly missing packets). When it is
// exceeded the list is trimmed and, if still too large, cleared.
static constexpr size_t MaxNackPackets = 1000u;
// Initial value of the `rtt` member, which is compared against elapsed
// milliseconds since the last NACK of a given sequence number.
static constexpr uint32_t DefaultRtt = 100u;
// An entry is dropped from the NACK list once it has been requested this many
// times.
static constexpr uint8_t MaxNackRetries = 10u;
// Value handed to Timer::Start() (presumably milliseconds - confirm against
// the Timer API).
static constexpr uint64_t TimerInterval = 40u;
/* Instance methods. */
// Creates a NACK generator.
//
// - listener: stored as-is; it is later notified through
//   OnNackGeneratorNackRequired() and OnNackGeneratorKeyFrameRequired().
// - sendNackDelayMs: stored as-is; GetNackBatch() skips entries younger than
//   this many milliseconds (0 disables the delay).
//
// The RTT estimate starts at DefaultRtt.
NackGenerator::NackGenerator(Listener* listener, unsigned int sendNackDelayMs)
	: listener(listener),
	  sendNackDelayMs(sendNackDelayMs),
	  rtt(DefaultRtt)
{
	MS_TRACE();

	// Allocate the retransmission timer, passing this instance to it. It is
	// released with `delete` in the destructor.
	this->timer = new Timer(this);
}
// Destroys the generator, releasing the timer allocated in the constructor.
NackGenerator::~NackGenerator()
{
	MS_TRACE();

	// The timer is owned by this instance.
	delete this->timer;
}
// Feeds a received RTP packet into the generator.
//
// - packet: the received packet; only its sequence number, SSRC and key frame
//   flag are read.
// - isRecovered: flag supplied by the caller; judging by the RTX comments
//   below it marks packets obtained via retransmission.
//
// Returns true only when the packet was present in the NACK list and had
// already been requested at least once (its retry counter was non-zero).
// Returns false in every other case.
bool NackGenerator::ReceivePacket(RTC::RtpPacket* packet, bool isRecovered)
{
	MS_TRACE();

	const uint16_t seq     = packet->GetSequenceNumber();
	const bool isKeyFrame = packet->IsKeyFrame();

	// Very first packet: just take it as the reference sequence number.
	if (!this->started)
	{
		this->started = true;
		this->lastSeq = seq;

		if (isKeyFrame)
		{
			this->keyFrameList.insert(seq);
		}

		return false;
	}

	// Same sequence number as the newest one seen, so it cannot have been
	// NACKed.
	if (seq == this->lastSeq)
	{
		return false;
	}

	// Older than the newest sequence number seen: it is either an out of order
	// packet, a retransmission we asked for, or a retransmission that was
	// already handled.
	if (SeqManager<uint16_t>::IsSeqLowerThan(seq, this->lastSeq))
	{
		const auto nackIt = this->nackList.find(seq);

		// Not in the NACK list: out of order packet or already handled NACKed
		// packet.
		if (nackIt == this->nackList.end())
		{
			if (!isRecovered)
			{
				MS_WARN_DEV(
				  "ignoring older packet not present in the NACK list [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
				  packet->GetSsrc(),
				  packet->GetSequenceNumber());
			}

			return false;
		}

		// It was waiting in the NACK list.
		MS_DEBUG_DEV(
		  "NACKed packet received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", recovered:%s]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber(),
		  isRecovered ? "true" : "false");

		// Read the retry counter before the entry is destroyed.
		const bool wasRequested = nackIt->second.retries != 0;

		this->nackList.erase(nackIt);

		return wasRequested;
	}

	// From here on seq is newer than the newest sequence number seen, so some
	// packets in between may have been lost.

	if (isKeyFrame)
	{
		this->keyFrameList.insert(seq);
	}

	// Forget key frames that fell out of the MaxPacketAge window.
	{
		const auto firstKept = this->keyFrameList.lower_bound(seq - MaxPacketAge);

		if (firstKept != this->keyFrameList.begin())
		{
			this->keyFrameList.erase(this->keyFrameList.begin(), firstKept);
		}
	}

	if (isRecovered)
	{
		this->recoveredList.insert(seq);

		// Forget recovered packets that fell out of the MaxPacketAge window so
		// the list does not grow without bound.
		const auto firstKept = this->recoveredList.lower_bound(seq - MaxPacketAge);

		if (firstKept != this->recoveredList.begin())
		{
			this->recoveredList.erase(this->recoveredList.begin(), firstKept);
		}

		// A packet newer than the last seen seq that came via RTX must not
		// advance lastSeq nor be reported as a NACKed packet.
		return false;
	}

	// Every sequence number between the previous newest one (exclusive) and
	// this one (exclusive) is now considered missing.
	AddPacketsToNackList(this->lastSeq + 1, seq);

	this->lastSeq = seq;

	// Request right away the entries that were waiting for this seq number.
	std::vector<uint16_t> nackBatch = GetNackBatch(NackFilter::SEQ);

	if (!nackBatch.empty())
	{
		this->listener->OnNackGeneratorNackRequired(nackBatch);
	}

	// Only start the timer when it is not already running. Restarting a running
	// timer (filter:TIME) would keep interrupting it and NACKs would never be
	// sent more than once for each seq.
	if (!this->timer->IsActive())
	{
		MayRunTimer();
	}

	return false;
}
void NackGenerator::AddPacketsToNackList(uint16_t seqStart, uint16_t seqEnd)
{
MS_TRACE();
// Remove old packets.
auto it = this->nackList.lower_bound(seqEnd - MaxPacketAge);
this->nackList.erase(this->nackList.begin(), it);
// If the nack list is too large, remove packets from the nack list until
// the latest first packet of a keyframe. If the list is still too large,
// clear it and request a keyframe.
uint16_t numNewNacks = seqEnd - seqStart;
if (static_cast<uint16_t>(this->nackList.size()) + numNewNacks > MaxNackPackets)
{
// clang-format off
while (
RemoveNackItemsUntilKeyFrame() &&
static_cast<uint16_t>(this->nackList.size()) + numNewNacks > MaxNackPackets
)
// clang-format on
{
}
if (static_cast<uint16_t>(this->nackList.size()) + numNewNacks > MaxNackPackets)
{
MS_WARN_TAG(
rtx, "NACK list full, clearing it and requesting a key frame [seqEnd:%" PRIu16 "]", seqEnd);
this->nackList.clear();
this->listener->OnNackGeneratorKeyFrameRequired();
return;
}
}
for (uint16_t seq = seqStart; seq != seqEnd; ++seq)
{
MS_ASSERT(this->nackList.find(seq) == this->nackList.end(), "packet already in the NACK list");
// Do not send NACK for packets that are already recovered by RTX.
if (this->recoveredList.find(seq) != this->recoveredList.end())
continue;
this->nackList.emplace(std::make_pair(
seq,
NackInfo{
DepLibUV::GetTimeMs(),
seq,
seq,
}));
}
}
// Drops from the NACK list every entry that precedes the oldest known key
// frame that actually has entries before it.
//
// Key frames that precede the whole NACK list are useless for trimming and
// are discarded from the key frame list along the way.
//
// Returns true if at least one NACK entry was removed, false if the key frame
// list ran out without removing anything.
bool NackGenerator::RemoveNackItemsUntilKeyFrame()
{
	MS_TRACE();

	auto keyFrameIt = this->keyFrameList.begin();

	while (keyFrameIt != this->keyFrameList.end())
	{
		const auto firstKept = this->nackList.lower_bound(*keyFrameIt);

		if (firstKept == this->nackList.begin())
		{
			// This key frame is so old that no NACK entry precedes it. Discard
			// it and move on to the next (now oldest) key frame.
			keyFrameIt = this->keyFrameList.erase(keyFrameIt);

			continue;
		}

		// The key frame is newer than at least one entry in the NACK list:
		// everything before it is no longer worth requesting.
		this->nackList.erase(this->nackList.begin(), firstKept);

		return true;
	}

	return false;
}
// Collects the sequence numbers that must be NACKed now.
//
// - filter == NackFilter::SEQ: selects entries never requested before whose
//   `sendAtSeq` has been reached by `lastSeq`.
// - filter == NackFilter::TIME: selects entries never requested before, or
//   whose last request is at least `rtt` milliseconds old.
//
// Entries younger than `sendNackDelayMs` are always skipped. Every selected
// entry gets its retry counter increased and its `sentAtMs` updated; entries
// reaching MaxNackRetries are removed from the NACK list (but still included
// in the returned batch).
//
// Returns the selected sequence numbers in NACK list order.
std::vector<uint16_t> NackGenerator::GetNackBatch(NackFilter filter)
{
	MS_TRACE();

	const uint64_t nowMs = DepLibUV::GetTimeMs();
	std::vector<uint16_t> nackBatch;

	// No increment clause: the iterator advances either by ++it or by erase().
	for (auto it = this->nackList.begin(); it != this->nackList.end();)
	{
		NackInfo& nackInfo = it->second;
		const uint16_t seq = nackInfo.seq;

		// Too young to be requested yet.
		const bool stillDelayed =
		  this->sendNackDelayMs > 0 && nowMs - nackInfo.createdAtMs < this->sendNackDelayMs;

		if (stillDelayed)
		{
			++it;

			continue;
		}

		// Decide whether this entry is due according to the given filter.
		bool due = false;

		if (filter == NackFilter::SEQ)
		{
			// clang-format off
			due =
				nackInfo.sentAtMs == 0 &&
				(
					nackInfo.sendAtSeq == this->lastSeq ||
					SeqManager<uint16_t>::IsSeqHigherThan(this->lastSeq, nackInfo.sendAtSeq)
				);
			// clang-format on
		}
		else if (filter == NackFilter::TIME)
		{
			due = nackInfo.sentAtMs == 0 || nowMs - nackInfo.sentAtMs >= this->rtt;
		}

		if (!due)
		{
			++it;

			continue;
		}

		nackBatch.emplace_back(seq);
		nackInfo.retries++;
		nackInfo.sentAtMs = nowMs;

		// Still has retries left, keep it in the list.
		if (nackInfo.retries < MaxNackRetries)
		{
			++it;

			continue;
		}

		// Retries exhausted: give up on this sequence number.
		if (filter == NackFilter::SEQ)
		{
			MS_WARN_TAG(
			  rtx,
			  "sequence number removed from the NACK list due to max retries [filter:seq, seq:%" PRIu16
			  "]",
			  seq);
		}
		else
		{
			MS_WARN_TAG(
			  rtx,
			  "sequence number removed from the NACK list due to max retries [filter:time, seq:%" PRIu16
			  "]",
			  seq);
		}

		it = this->nackList.erase(it);
	}

#if MS_LOG_DEV_LEVEL == 3
	if (!nackBatch.empty())
	{
		// Comma separated list of the requested sequence numbers.
		std::ostringstream seqsStream;

		for (size_t idx = 0; idx < nackBatch.size(); ++idx)
		{
			if (idx != 0)
			{
				seqsStream << ",";
			}

			seqsStream << nackBatch[idx];
		}

		if (filter == NackFilter::SEQ)
		{
			MS_DEBUG_DEV("[filter:SEQ, asking seqs:%s]", seqsStream.str().c_str());
		}
		else
		{
			MS_DEBUG_DEV("[filter:TIME, asking seqs:%s]", seqsStream.str().c_str());
		}
	}
#endif

	return nackBatch;
}
// Returns the generator to its "no packet seen yet" state: the next packet
// given to ReceivePacket() is treated as the first one.
//
// The listener, the NACK delay, the RTT and the timer are left untouched.
void NackGenerator::Reset()
{
	MS_TRACE();

	// Sequence tracking state.
	this->started = false;
	this->lastSeq = 0u;

	// Bookkeeping lists.
	this->nackList.clear();
	this->keyFrameList.clear();
	this->recoveredList.clear();
}
// Starts the timer with TimerInterval, but only if there is something left to
// request in the NACK list.
inline void NackGenerator::MayRunTimer() const
{
	// Nothing pending, so there is no reason to wake up.
	if (this->nackList.empty())
	{
		return;
	}

	this->timer->Start(TimerInterval);
}
// Timer callback: requests again (filter:TIME) the sequence numbers that are
// due, then re-arms the timer if the NACK list is not empty.
//
// The timer argument is unused.
inline void NackGenerator::OnTimer(Timer* /*timer*/)
{
	MS_TRACE();

	std::vector<uint16_t> nackBatch = GetNackBatch(NackFilter::TIME);

	// Only bother the listener when there is something to request.
	if (!nackBatch.empty())
	{
		this->listener->OnNackGeneratorNackRequired(nackBatch);
	}

	// Evaluated after the batch is built, since GetNackBatch() may have removed
	// entries that ran out of retries.
	MayRunTimer();
}
} // namespace RTC