Skip to content

Commit

Permalink
Use Notification Queue from 67cc918 more
Browse files Browse the repository at this point in the history
Deliver Candidates and Selected CandidatePairs using the same queue.
This means that things are delivered in order and we don't have to worry
about blocking
  • Loading branch information
Sean-Der committed Mar 21, 2024
1 parent b386d44 commit d17be4d
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 85 deletions.
83 changes: 35 additions & 48 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ type Agent struct {
gatherCandidateCancel func()
gatherCandidateDone chan struct{}

chanCandidate chan Candidate
chanCandidatePair chan *CandidatePair
stateNotifier *connectionStateNotifier
connectionStateNotifier *handlerNotifier
candidateNotifier *handlerNotifier
selectedCandidatePairNotifier *handlerNotifier

loggerFactory logging.LoggerFactory
log logging.LeveledLogger
Expand Down Expand Up @@ -227,8 +227,6 @@ func (a *Agent) taskLoop() {

after()

close(a.chanCandidate)
close(a.chanCandidatePair)
close(a.taskLoopDone)
}()

Expand Down Expand Up @@ -276,32 +274,30 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
startedCtx, startedFn := context.WithCancel(context.Background())

a := &Agent{
chanTask: make(chan task),
chanCandidate: make(chan Candidate),
chanCandidatePair: make(chan *CandidatePair),
tieBreaker: globalMathRandomGenerator.Uint64(),
lite: config.Lite,
gatheringState: GatheringStateNew,
connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]Candidate),
remoteCandidates: make(map[NetworkType][]Candidate),
urls: config.Urls,
networkTypes: config.NetworkTypes,
onConnected: make(chan struct{}),
buf: packetio.NewBuffer(),
done: make(chan struct{}),
taskLoopDone: make(chan struct{}),
startedCh: startedCtx.Done(),
startedFn: startedFn,
portMin: config.PortMin,
portMax: config.PortMax,
loggerFactory: loggerFactory,
log: log,
net: config.Net,
proxyDialer: config.ProxyDialer,
tcpMux: config.TCPMux,
udpMux: config.UDPMux,
udpMuxSrflx: config.UDPMuxSrflx,
chanTask: make(chan task),
tieBreaker: globalMathRandomGenerator.Uint64(),
lite: config.Lite,
gatheringState: GatheringStateNew,
connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]Candidate),
remoteCandidates: make(map[NetworkType][]Candidate),
urls: config.Urls,
networkTypes: config.NetworkTypes,
onConnected: make(chan struct{}),
buf: packetio.NewBuffer(),
done: make(chan struct{}),
taskLoopDone: make(chan struct{}),
startedCh: startedCtx.Done(),
startedFn: startedFn,
portMin: config.PortMin,
portMax: config.PortMax,
loggerFactory: loggerFactory,
log: log,
net: config.Net,
proxyDialer: config.ProxyDialer,
tcpMux: config.TCPMux,
udpMux: config.UDPMux,
udpMuxSrflx: config.UDPMuxSrflx,

mDNSMode: mDNSMode,
mDNSName: mDNSName,
Expand All @@ -320,7 +316,9 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

disableActiveTCP: config.DisableActiveTCP,
}
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}
a.connectionStateNotifier = &handlerNotifier{connectionStateFunc: a.onConnectionStateChange}
a.candidateNotifier = &handlerNotifier{candidateFunc: a.onCandidate}
a.selectedCandidatePairNotifier = &handlerNotifier{candidatePairFunc: a.onSelectedCandidatePairChange}

if a.net == nil {
a.net, err = stdnet.NewNet()
Expand Down Expand Up @@ -364,12 +362,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit

go a.taskLoop()

// CandidatePair and ConnectionState are usually changed at once.
// Blocking one by the other one causes deadlock.
// Hence, we call handlers from independent Goroutines.
go a.candidatePairRoutine()
go a.candidateRoutine()

// Restart is also used to initialize the agent for the first time
if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
a.closeMulticastConn()
Expand Down Expand Up @@ -501,7 +493,7 @@ func (a *Agent) updateConnectionState(newState ConnectionState) {

a.log.Infof("Setting new connection state: %s", newState)
a.connectionState = newState
a.stateNotifier.Enqueue(newState)
a.connectionStateNotifier.EnqueueConnectionState(newState)
}
}

Expand All @@ -520,12 +512,7 @@ func (a *Agent) setSelectedPair(p *CandidatePair) {
a.updateConnectionState(ConnectionStateConnected)

// Notify when the selected pair changes
a.afterRun(func(ctx context.Context) {
select {
case a.chanCandidatePair <- p:
case <-ctx.Done():
}
})
a.selectedCandidatePairNotifier.EnqueueSelectedCandidatePair(p)

// Signal connected
a.onConnectedOnce.Do(func() { close(a.onConnected) })
Expand Down Expand Up @@ -761,7 +748,7 @@ func (a *Agent) addRemotePassiveTCPCandidate(remoteCandidate Candidate) {

localCandidate.start(a, conn, a.startedCh)
a.localCandidates[localCandidate.NetworkType()] = append(a.localCandidates[localCandidate.NetworkType()], localCandidate)
a.chanCandidate <- localCandidate
a.candidateNotifier.EnqueueCandidate(localCandidate)

a.addPair(localCandidate, remoteCandidate)
}
Expand Down Expand Up @@ -831,7 +818,7 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net

a.requestConnectivityCheck()

a.chanCandidate <- c
a.candidateNotifier.EnqueueCandidate(c)
})
}

Expand Down Expand Up @@ -1267,7 +1254,7 @@ func (a *Agent) setGatheringState(newState GatheringState) error {
done := make(chan struct{})
if err := a.run(a.context(), func(context.Context, *Agent) {
if a.gatheringState != newState && newState == GatheringStateComplete {
a.chanCandidate <- nil
a.candidateNotifier.EnqueueCandidate(nil)
}

a.gatheringState = newState
Expand Down
110 changes: 79 additions & 31 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,94 @@ func (a *Agent) onConnectionStateChange(s ConnectionState) {
}
}

func (a *Agent) candidatePairRoutine() {
for p := range a.chanCandidatePair {
a.onSelectedCandidatePairChange(p)
}
}

type connectionStateNotifier struct {
type handlerNotifier struct {
sync.Mutex
states []ConnectionState
running bool
NotificationFunc func(ConnectionState)
running bool

connectionStates []ConnectionState
connectionStateFunc func(ConnectionState)

candidates []Candidate
candidateFunc func(Candidate)

selectedCandidatePairs []*CandidatePair
candidatePairFunc func(*CandidatePair)
}

func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
c.Lock()
defer c.Unlock()
c.states = append(c.states, s)
if !c.running {
c.running = true
go c.notify()
func (h *handlerNotifier) EnqueueConnectionState(s ConnectionState) {
h.Lock()
defer h.Unlock()

notify := func() {
for {
h.Lock()
if len(h.connectionStates) == 0 {
h.running = false
h.Unlock()
return
}
notification := h.connectionStates[0]
h.connectionStates = h.connectionStates[1:]
h.Unlock()
h.connectionStateFunc(notification)
}
}

h.connectionStates = append(h.connectionStates, s)
if !h.running {
h.running = true
go notify()
}
}

func (c *connectionStateNotifier) notify() {
for {
c.Lock()
if len(c.states) == 0 {
c.running = false
c.Unlock()
return
func (h *handlerNotifier) EnqueueCandidate(c Candidate) {
h.Lock()
defer h.Unlock()

notify := func() {
for {
h.Lock()
if len(h.candidates) == 0 {
h.running = false
h.Unlock()
return
}
notification := h.candidates[0]
h.candidates = h.candidates[1:]
h.Unlock()
h.candidateFunc(notification)
}
s := c.states[0]
c.states = c.states[1:]
c.Unlock()
c.NotificationFunc(s)
}

h.candidates = append(h.candidates, c)
if !h.running {
h.running = true
go notify()
}
}

func (a *Agent) candidateRoutine() {
for c := range a.chanCandidate {
a.onCandidate(c)
func (h *handlerNotifier) EnqueueSelectedCandidatePair(p *CandidatePair) {
h.Lock()
defer h.Unlock()

notify := func() {
for {
h.Lock()
if len(h.selectedCandidatePairs) == 0 {
h.running = false
h.Unlock()
return
}
notification := h.selectedCandidatePairs[0]
h.selectedCandidatePairs = h.selectedCandidatePairs[1:]
h.Unlock()
h.candidatePairFunc(notification)
}
}

h.selectedCandidatePairs = append(h.selectedCandidatePairs, p)
if !h.running {
h.running = true
go notify()
}
}
12 changes: 6 additions & 6 deletions agent_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ func TestConnectionStateNotifier(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan struct{}, 1)
c := &connectionStateNotifier{
NotificationFunc: func(_ ConnectionState) {
c := &handlerNotifier{
connectionStateFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
}
// Enqueue all updates upfront to ensure that it
// doesn't block
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionStateNew)
c.EnqueueConnectionState(ConnectionStateNew)
}
done := make(chan struct{})
go func() {
Expand All @@ -43,8 +43,8 @@ func TestConnectionStateNotifier(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan ConnectionState)
c := &connectionStateNotifier{
NotificationFunc: func(cs ConnectionState) {
c := &handlerNotifier{
connectionStateFunc: func(cs ConnectionState) {
updates <- cs
},
}
Expand All @@ -64,7 +64,7 @@ func TestConnectionStateNotifier(t *testing.T) {
close(done)
}()
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionState(i))
c.EnqueueConnectionState(ConnectionState(i))
}
<-done
})
Expand Down

0 comments on commit d17be4d

Please sign in to comment.