Skip to content

Commit

Permalink
Fix ConnectionState being reported out of order
Browse files Browse the repository at this point in the history
Before we launched a goroutine to announce every ConnectionState change
to users. These could then be sent to the user out of order.

This commit adds a connectionStateNotifier. The connectionStateNotifier
delivers them sequentially to the user.

Resolves #624
  • Loading branch information
sukunrt authored and Sean-Der committed Mar 20, 2024
1 parent 77cc354 commit 67cc918
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 13 deletions.
13 changes: 3 additions & 10 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Agent struct {

chanCandidate chan Candidate
chanCandidatePair chan *CandidatePair
chanState chan ConnectionState
stateNotifier *connectionStateNotifier

loggerFactory logging.LoggerFactory
log logging.LeveledLogger
Expand Down Expand Up @@ -227,7 +227,6 @@ func (a *Agent) taskLoop() {

after()

close(a.chanState)
close(a.chanCandidate)
close(a.chanCandidatePair)
close(a.taskLoopDone)
Expand Down Expand Up @@ -278,7 +277,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

a := &Agent{
chanTask: make(chan task),
chanState: make(chan ConnectionState),
chanCandidate: make(chan Candidate),
chanCandidatePair: make(chan *CandidatePair),
tieBreaker: globalMathRandomGenerator.Uint64(),
Expand Down Expand Up @@ -322,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

disableActiveTCP: config.DisableActiveTCP,
}
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}

if a.net == nil {
a.net, err = stdnet.NewNet()
Expand Down Expand Up @@ -369,7 +368,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
// Blocking one by the other one causes deadlock.
// Hence, we call handlers from independent Goroutines.
go a.candidatePairRoutine()
go a.connectionStateRoutine()
go a.candidateRoutine()

// Restart is also used to initialize the agent for the first time
Expand Down Expand Up @@ -503,12 +501,7 @@ func (a *Agent) updateConnectionState(newState ConnectionState) {

a.log.Infof("Setting new connection state: %s", newState)
a.connectionState = newState

// Call handler after finishing current task since we may be holding the agent lock
// and the handler may also require it
a.afterRun(func(_ context.Context) {
a.chanState <- newState
})
a.stateNotifier.Enqueue(newState)
}
}

Expand Down
34 changes: 31 additions & 3 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package ice

import "sync"

// OnConnectionStateChange sets a handler that is fired when the connection state changes
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
a.onConnectionStateChangeHdlr.Store(f)
Expand Down Expand Up @@ -47,9 +49,35 @@ func (a *Agent) candidatePairRoutine() {
}
}

func (a *Agent) connectionStateRoutine() {
for s := range a.chanState {
go a.onConnectionStateChange(s)
type connectionStateNotifier struct {
sync.Mutex
states []ConnectionState
running bool
NotificationFunc func(ConnectionState)
}

func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
c.Lock()
defer c.Unlock()
c.states = append(c.states, s)
if !c.running {
c.running = true
go c.notify()
}
}

func (c *connectionStateNotifier) notify() {
for {
c.Lock()
if len(c.states) == 0 {
c.running = false
c.Unlock()
return
}
s := c.states[0]
c.states = c.states[1:]
c.Unlock()
c.NotificationFunc(s)
}
}

Expand Down
71 changes: 71 additions & 0 deletions agent_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package ice

import (
"testing"
"time"

"github.com/pion/transport/v3/test"
)

func TestConnectionStateNotifier(t *testing.T) {
t.Run("TestManyUpdates", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan struct{}, 1)
c := &connectionStateNotifier{
NotificationFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
}
// Enqueue all updates upfront to ensure that it
// doesn't block
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionStateNew)
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
<-updates
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
<-done
})
t.Run("TestUpdateOrdering", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan ConnectionState)
c := &connectionStateNotifier{
NotificationFunc: func(cs ConnectionState) {
updates <- cs
},
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
x := <-updates
if x != ConnectionState(i) {
t.Errorf("expected %d got %d", x, i)
}
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionState(i))
}
<-done
})
}

0 comments on commit 67cc918

Please sign in to comment.