Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of reconfig reset requests queued #329

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion association.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
ErrChunkTypeUnhandled = errors.New("unhandled chunk type")
ErrHandshakeInitAck = errors.New("handshake failed (INIT ACK)")
ErrHandshakeCookieEcho = errors.New("handshake failed (COOKIE ECHO)")
ErrTooManyReconfigRequests = errors.New("too many outstanding reconfig requests")
)

const (
Expand Down Expand Up @@ -110,6 +111,8 @@
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
maxTSNOffset = 40000
// maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding
maxReconfigRequests = 1000
)

func getAssociationStateString(a uint32) string {
Expand Down Expand Up @@ -2140,15 +2143,39 @@
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
if a.peerLastTSN < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
// We have too many reconfig requests outstanding. Drop the request and let
// the peer retransmit. A well behaved peer should only have 1 outstanding
// reconfig request.
//
// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.1.1
// At any given time, there MUST NOT be more than one request in flight.
// So, if the Re-configuration Timer is running and the RE-CONFIG chunk
// contains at least one request parameter, the chunk MUST be buffered.
// chrome: https://chromium.googlesource.com/external/webrtc/+/refs/heads/main/net/dcsctp/socket/stream_reset_handler.cc#271
return nil, fmt.Errorf("%w: %d", ErrTooManyReconfigRequests, len(a.reconfigRequests))
}
a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
resp := a.resetStreamsIfAny(p)
if resp != nil {
return resp, nil
}
return nil, nil //nolint:nilnil

case *paramReconfigResponse:
a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name)
if p.result == reconfigResultInProgress {
// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.7
//
// If the Result field indicates "In progress", the timer for the
// Re-configuration Request Sequence Number is started again. If
// the timer runs out, the RE-CONFIG chunk MUST be retransmitted
// but the corresponding error counters MUST NOT be incremented.
if _, ok := a.reconfigs[p.reconfigResponseSequenceNumber]; ok {
a.tReconfig.stop()
a.tReconfig.start(a.rtoMgr.getRTO())
}

Check warning on line 2176 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L2174-L2176

Added lines #L2174 - L2176 were not covered by tests
return nil, nil //nolint:nilnil
}
delete(a.reconfigs, p.reconfigResponseSequenceNumber)
if len(a.reconfigs) == 0 {
a.tReconfig.stop()
Expand Down
76 changes: 76 additions & 0 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3393,3 +3393,79 @@ func TestDataChunkBundlingIntoPacket(t *testing.T) {
}
}
}

func TestAssociation_ReconfigRequestsLimited(t *testing.T) {
checkGoroutineLeaks(t)

lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

a1chan, a2chan := make(chan *Association), make(chan *Association)

udp1, udp2 := createUDPConnPair()

go func() {
a1, err := Client(Config{
NetConn: udp1,
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
assert.NoError(t, err)
a1chan <- a1
}()

go func() {
a2, err := Server(Config{
NetConn: udp2,
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
assert.NoError(t, err)
a2chan <- a2
}()

a1, a2 := <-a1chan, <-a2chan

writeStream, err := a1.OpenStream(1, PayloadTypeWebRTCString)
require.NoError(t, err)

readStream, err := a2.OpenStream(1, PayloadTypeWebRTCString)
require.NoError(t, err)

// exchange some data
testData := []byte("test")
_, err = writeStream.Write(testData)
require.NoError(t, err)

buf := make([]byte, len(testData))
_, err = readStream.Read(buf)
assert.NoError(t, err)
assert.Equal(t, testData, buf)

a1.lock.RLock()
tsn := a1.myNextTSN
a1.lock.RUnlock()
for i := 0; i < maxReconfigRequests+100; i++ {
c := &chunkReconfig{
paramA: &paramOutgoingResetRequest{
reconfigRequestSequenceNumber: 10 + uint32(i),
senderLastTSN: tsn + 10, // has to be enqueued
streamIdentifiers: []uint16{uint16(i)},
},
}
p := a1.createPacket([]chunk{c})
buf, err := p.marshal(true)
require.NoError(t, err)
_, err = a1.netConn.Write(buf)
require.NoError(t, err)
if i%100 == 0 {
time.Sleep(100 * time.Millisecond)
}
}
// Let a2 process the requests
time.Sleep(2 * time.Second)
a2.lock.RLock()
require.LessOrEqual(t, len(a2.reconfigRequests), maxReconfigRequests)
a2.lock.RUnlock()

require.NoError(t, a1.Close())
require.NoError(t, a2.Close())
}
Loading