Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Fix inputhost connection close
Browse files Browse the repository at this point in the history
Fix connection close to make sure we always wait for some time to give
a chance to drain.

We can avoid using the atomic variables for closed and drained; we can just rely
on the shutting-down channel to make sure that close is idempotent.
  • Loading branch information
Aravind Srinivasan committed Apr 6, 2017
1 parent 9e68374 commit 82196a0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 51 deletions.
76 changes: 34 additions & 42 deletions client/cherami/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ type (
logger bark.Logger
reporter metrics.Reporter

lk sync.Mutex
opened int32
closed int32
drained int32
lk sync.Mutex
opened int32

sentMsgs int64 // no: of messages sent to the inputhost
sentAcks int64 // no: of messages successfully acked
Expand Down Expand Up @@ -176,12 +174,15 @@ func (conn *connection) stopWritePumpWithLockAndClose() {
conn.logger.Warn("writeMsgPumpWG timed out")
}
conn.logger.Info("stopped write pump")
atomic.StoreInt32(&conn.drained, 1)
// now make sure we spawn a routine to wait for drain and
// close the connection if needed
go conn.waitForDrainAndClose()
}
}

// close always stops the write pump first and then
// waits for some time to make sure we give a chance to
// drain inflight messages and then close the read pump
func (conn *connection) close() {
// First shutdown the write pump to make sure we don't leave any message without ack
conn.stopWritePumpAndClose()
Expand Down Expand Up @@ -336,11 +337,7 @@ func (conn *connection) isOpened() bool {
// the controller returns the same old extent which is draining,
// the server will anyway reject the connection
func (conn *connection) isClosed() bool {
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0)
}

func (conn *connection) isDraining() bool {
return atomic.LoadInt32(&conn.drained) != 0
return conn.isShuttingDown()
}

func (e *ackChannelClosedError) Error() string {
Expand Down Expand Up @@ -385,43 +382,40 @@ func (conn *connection) finalClose() {
conn.lk.Lock()
defer conn.lk.Unlock()

// XXX: do we even need this now. At this point we know for sure that this is going to be just one call
if atomic.LoadInt32(&conn.closed) == 0 {
// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("readAckPumpWG timed out")
}
// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("readAckPumpWG timed out")
}

// Both pumps are shutdown. Close the underlying stream.
if conn.cancel != nil {
conn.cancel()
}
// Both pumps are shutdown. Close the underlying stream.
if conn.cancel != nil {
conn.cancel()
}

if conn.inputHostStream != nil {
conn.inputHostStream.Done()
}
if conn.inputHostStream != nil {
conn.inputHostStream.Done()
}

atomic.StoreInt32(&conn.closed, 1)
conn.logger.WithFields(bark.Fields{
`sentMsgs`: conn.sentMsgs,
`sentAcks`: conn.sentAcks,
`sentNacks`: conn.sentNacks,
`sentThrottled`: conn.sentThrottled,
`failedMsgs`: conn.failedMsgs,
`writeFailedMsgs`: conn.writeFailedMsgs,
}).Info("Input host connection closed.")

// trigger a reconfiguration due to connection closed
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}:
default:
conn.logger.Info("Reconfigure channel is full. Drop reconfigure command due to connection close.")
}
conn.logger.WithFields(bark.Fields{
`sentMsgs`: conn.sentMsgs,
`sentAcks`: conn.sentAcks,
`sentNacks`: conn.sentNacks,
`sentThrottled`: conn.sentThrottled,
`failedMsgs`: conn.failedMsgs,
`writeFailedMsgs`: conn.writeFailedMsgs,
}).Info("Input host connection closed.")

// trigger a reconfiguration due to connection closed
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}:
default:
conn.logger.Info("Reconfigure channel is full. Drop reconfigure command due to connection close.")
}
}

func (conn *connection) waitForDrainAndClose() {
defer conn.finalClose()
// wait for some timeout period and close the connection
// this is needed because even though server closes the
// connection, we never call "stream.Read" until we have some
Expand All @@ -439,11 +433,9 @@ func (conn *connection) waitForDrainAndClose() {
// check if we got the acks for all sent messages
if conn.isAlreadyDrained() {
conn.logger.Infof("Inputhost connection drained completely")
conn.finalClose()
return
}
case <-drainTimer.C:
conn.finalClose()
return
}
}
Expand Down
5 changes: 1 addition & 4 deletions client/cherami/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package cherami
import (
"errors"
"io"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -224,9 +223,7 @@ func (s *ConnectionSuite) TestClientDrain() {

messagesCh <- putMessageRequest{message, requestDone}
<-time.After(10 * time.Millisecond)
// drain must be set
s.Equal(int32(1), atomic.LoadInt32(&conn.drained))
// closed must return true as well
// we must have drained and so closed must return true
s.True(conn.isClosed())
}

Expand Down
7 changes: 2 additions & 5 deletions client/cherami/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,8 @@ func (s *publisherImpl) reconfigurePublisher() {
for host, inputHostConn := range s.connections {
if _, ok := currentHosts[host]; !ok {
connLogger := s.logger.WithField(common.TagHostIP, common.FmtHostIP(inputHostConn.connKey))
// only close if we are not draining
if !inputHostConn.isDraining() {
connLogger.Info("Closing connection to InputHost after reconfiguration.")
inputHostConn.close()
}
connLogger.Info("Closing connection to InputHost after reconfiguration.")
inputHostConn.close()
}
}

Expand Down

0 comments on commit 82196a0

Please sign in to comment.