Skip to content

Commit

Permalink
Merge 76ac8c4 into 3f934ba
Browse files Browse the repository at this point in the history
  • Loading branch information
andrehp committed Feb 12, 2019
2 parents 3f934ba + 76ac8c4 commit c806508
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 23 deletions.
21 changes: 8 additions & 13 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,17 @@ type (
appDieChan chan bool // app die channel
chDie chan struct{} // wait for close
chSend chan pendingMessage // push message queue
chStopHeartbeat chan struct{} // stop heartbeats
chStopWrite chan struct{} // stop writing messages
closeMutex sync.Mutex
conn net.Conn // low-level conn fd
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
lastAt int64 // last heartbeat unix time stamp
messagesBufferSize int // size of the pending messages buffer
serializer serialize.Serializer // message serializer
state int32 // current agent state
lastAt int64 // last heartbeat unix time stamp
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
metricsReporters []metrics.Reporter
serializer serialize.Serializer // message serializer
state int32 // current agent state
}

pendingMessage struct {
Expand Down Expand Up @@ -109,8 +108,6 @@ func NewAgent(
chDie: make(chan struct{}),
chSend: make(chan pendingMessage, messagesBufferSize),
messagesBufferSize: messagesBufferSize,
chStopHeartbeat: make(chan struct{}),
chStopWrite: make(chan struct{}),
conn: conn,
decoder: packetDecoder,
encoder: packetEncoder,
Expand Down Expand Up @@ -193,6 +190,8 @@ func (a *Agent) ResponseMID(ctx context.Context, mid uint, v interface{}, isErro
// Close closes the agent, cleans inner state and closes low-level connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (a *Agent) Close() error {
a.closeMutex.Lock()
defer a.closeMutex.Unlock()
if a.GetStatus() == constants.StatusClosed {
return constants.ErrCloseClosedSession
}
Expand All @@ -206,8 +205,6 @@ func (a *Agent) Close() error {
case <-a.chDie:
// expect
default:
close(a.chStopWrite)
close(a.chStopHeartbeat)
close(a.chDie)
onSessionClosed(a.Session)
}
Expand Down Expand Up @@ -307,8 +304,6 @@ func (a *Agent) heartbeat() {
}
case <-a.chDie:
return
case <-a.chStopHeartbeat:
return
}
}
}
Expand Down Expand Up @@ -401,7 +396,7 @@ func (a *Agent) write() {
if data.typ == message.Response {
metrics.ReportTimingFromCtx(data.ctx, a.metricsReporters, handlerType, m.Err)
}
case <-a.chStopWrite:
case <-a.chDie:
return
}
}
Expand Down
12 changes: 2 additions & 10 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func TestNewAgent(t *testing.T) {
assert.NotNil(t, ag)
assert.IsType(t, make(chan struct{}), ag.chDie)
assert.IsType(t, make(chan pendingMessage), ag.chSend)
assert.IsType(t, make(chan struct{}), ag.chStopHeartbeat)
assert.IsType(t, make(chan struct{}), ag.chStopWrite)
assert.Equal(t, dieChan, ag.appDieChan)
assert.Equal(t, 10, ag.messagesBufferSize)
assert.Equal(t, mockConn, ag.conn)
Expand Down Expand Up @@ -425,16 +423,10 @@ func TestAgentClose(t *testing.T) {
assert.NoError(t, err)

// validate channels are closed
stopWrite := false
stopHeartbeat := false
die := false
go func() {
for {
select {
case <-ag.chStopWrite:
stopWrite = true
case <-ag.chStopHeartbeat:
stopHeartbeat = true
case <-ag.chDie:
die = true
}
Expand All @@ -448,8 +440,8 @@ func TestAgentClose(t *testing.T) {
assert.Equal(t, ag.state, constants.StatusClosed)
assert.True(t, expected)
helpers.ShouldEventuallyReturn(
t, func() bool { return stopWrite && stopHeartbeat && die },
true, 50*time.Millisecond, 500*time.Millisecond)
t, func() bool { return die },
true, 20*time.Millisecond, 500*time.Millisecond)
}

func TestAgentRemoteAddr(t *testing.T) {
Expand Down

0 comments on commit c806508

Please sign in to comment.