Skip to content

Commit

Permalink
Merge pull request #148 from victor-carvalho/master
Browse files Browse the repository at this point in the history
Fix agent race condition
  • Loading branch information
leohahn committed Dec 7, 2020
2 parents 1461dd7 + 01af62b commit 7cb7a91
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions agent/agent.go
Expand Up @@ -203,7 +203,11 @@ func (a *Agent) send(pendingMsg pendingMessage) (err error) {
pWrite.err = util.GetErrorFromPayload(a.serializer, m.Data)
}

a.chSend <- pWrite
// chSend is never closed so we need this to don't block if agent is already closed
select {
case a.chSend <- pWrite:
case <-a.chDie:
}
return
}

Expand Down Expand Up @@ -326,10 +330,7 @@ func (a *Agent) Handle() {

go a.write()
go a.heartbeat()
select {
case <-a.chDie: // agent closed signal
return
}
<-a.chDie // agent closed signal
}

// IPVersion returns the remote address ip version.
Expand Down Expand Up @@ -365,7 +366,15 @@ func (a *Agent) heartbeat() {
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
return
}
a.chSend <- pendingWrite{data: hbd}

// chSend is never closed so we need this to don't block if agent is already closed
select {
case a.chSend <- pendingWrite{data: hbd}:
case <-a.chDie:
return
case <-a.chStopHeartbeat:
return
}
case <-a.chDie:
return
case <-a.chStopHeartbeat:
Expand Down Expand Up @@ -399,7 +408,6 @@ func (a *Agent) SendHandshakeResponse() error {
func (a *Agent) write() {
// clean func
defer func() {
close(a.chSend)
a.Close()
}()

Expand Down

0 comments on commit 7cb7a91

Please sign in to comment.