From de07d3a434a354d72c5cea1c6afd40d9d26c62f3 Mon Sep 17 00:00:00 2001 From: aravindvs Date: Fri, 31 Mar 2017 16:07:42 -0700 Subject: [PATCH] Graceful drain on client (#16) * Add support for DRAIN command on the client This patch makes sure the client can handle the DRAIN command from the inputhost. As soon as it receives the command, it will stop it's write pump and will mark itself as "closed" so that if we get the same inputhost as a result of reconfiguration, we open a new connection object. Add a unit test to test the same. --- client/cherami/connection.go | 82 +++++++++++++++++++++++++------ client/cherami/connection_test.go | 43 ++++++++++++++++ common/metrics/names.go | 2 + glide.lock | 16 +++--- 4 files changed, 119 insertions(+), 24 deletions(-) diff --git a/client/cherami/connection.go b/client/cherami/connection.go index df0c295..c929c67 100644 --- a/client/cherami/connection.go +++ b/client/cherami/connection.go @@ -31,10 +31,10 @@ import ( "time" - "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/metrics" "github.com/uber/cherami-client-go/stream" + "github.com/uber/cherami-thrift/.generated/go/cherami" ) type ( @@ -56,9 +56,10 @@ type ( logger bark.Logger reporter metrics.Reporter - lk sync.Mutex - opened int32 - closed int32 + lk sync.Mutex + opened int32 + closed int32 + drained int32 } // This struct is created by writePump after writing message to stream. @@ -140,17 +141,42 @@ func (conn *connection) open() error { return nil } -func (conn *connection) close() { +func (conn *connection) isShuttingDown() bool { + select { + case <-conn.shuttingDownCh: + // already shutdown + return true + default: + } + + return false +} + +// stopWritePump should drain the pump after getting the lock +func (conn *connection) stopWritePump() { conn.lk.Lock() - defer conn.lk.Unlock() + conn.stopWritePumpWithLock() + conn.lk.Unlock() +} - if atomic.LoadInt32(&conn.closed) == 0 { - // First shutdown the write pump to make sure we don't leave any message without ack +// Note: this needs to be called with the conn lock held! +func (conn *connection) stopWritePumpWithLock() { + if !conn.isShuttingDown() { close(conn.shuttingDownCh) if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok { conn.logger.Warn("writeMsgPumpWG timed out") } + conn.logger.Info("stopped write pump") + atomic.StoreInt32(&conn.drained, 1) + } +} +func (conn *connection) close() { + conn.lk.Lock() + defer conn.lk.Unlock() + if atomic.LoadInt32(&conn.closed) == 0 { + // First shutdown the write pump to make sure we don't leave any message without ack + conn.stopWritePumpWithLock() // Now shutdown the read pump and drain all inflight messages close(conn.closeCh) if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok { @@ -216,6 +242,14 @@ func (conn *connection) writeMessagesPump() { } } +func (conn *connection) handleReconfigCmd(reconfigInfo *cherami.ReconfigureInfo) { + select { + case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: + default: + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") + } +} + func (conn *connection) readAcksPump() { defer conn.readAckPumpWG.Done() @@ -283,14 +317,22 @@ func (conn *connection) readAcksPump() { } } else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE { conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1) - reconfigInfo := cmd.Reconfigure - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.") - select { - case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: - default: - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") - } + rInfo := cmd.Reconfigure + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.") + conn.handleReconfigCmd(rInfo) + } else if cmd.GetType() == cherami.InputHostCommandType_DRAIN { + // drain all inflight messages + // reconfigure to pick up new extents if any + conn.reporter.IncCounter(metrics.PublishDrainRate, nil, 1) + // start draining by just stopping the write pump. + // this makes sure, we don't send any new messages. + // the read pump will exit when the server completes the drain + go conn.stopWritePump() + rInfo := cmd.Reconfigure + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.") + conn.handleReconfigCmd(rInfo) } + } } } @@ -300,8 +342,16 @@ func (conn *connection) isOpened() bool { return atomic.LoadInt32(&conn.opened) != 0 } +// isClosed should return true if either closed it set +// or if "drain" is set, which means the connection has already +// stopped the write pump. +// This is needed to make sure if reconfigure gives the same +// inputhost, we should open up a new connection object (most +// likely for a new extent). In the worst case, if for some reason +// the controller returns the same old extent which is draining, +// the server will anyway reject the connection func (conn *connection) isClosed() bool { - return atomic.LoadInt32(&conn.closed) != 0 + return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0) } func (e *ackChannelClosedError) Error() string { diff --git a/client/cherami/connection_test.go b/client/cherami/connection_test.go index 9179287..271985e 100644 --- a/client/cherami/connection_test.go +++ b/client/cherami/connection_test.go @@ -23,12 +23,14 @@ package cherami import ( "errors" "io" + "sync/atomic" "testing" "time" _ "fmt" _ "strconv" + "github.com/pborman/uuid" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/metrics" mc "github.com/uber/cherami-client-go/mocks/clients/cherami" @@ -195,6 +197,39 @@ func (s *ConnectionSuite) TestAckClosedByInputHost() { //inputHostClient.AssertExpectations(s.T()) } +func (s *ConnectionSuite) TestClientDrain() { + conn, inputHostClient, messagesCh := createConnection() + + appendTicker := time.NewTicker(5 * time.Millisecond) + defer appendTicker.Stop() + + // setup inputhost to send a DRAIN command and then an EOF + inputHostClient.On("Write", mock.Anything).Return(nil) + inputHostClient.On("Flush").Return(nil) + inputHostClient.On("Read").Return(wrapDrainInCommand(&cherami.ReconfigureInfo{ + UpdateUUID: common.StringPtr(uuid.New()), + }), nil).WaitUntil(appendTicker.C).Once() + inputHostClient.On("Read").Return(nil, io.EOF).Once() + inputHostClient.On("Done").Return(nil) + + conn.open() + s.True(conn.isOpened(), "Connection not opened.") + + message := &cherami.PutMessage{ + ID: common.StringPtr("1"), + Data: []byte("test"), + } + + requestDone := make(chan *PublisherReceipt, 1) + + messagesCh <- putMessageRequest{message, requestDone} + <-time.After(10 * time.Millisecond) + // drain must be set + s.Equal(int32(1), atomic.LoadInt32(&conn.drained)) + // closed must return true as well + s.True(conn.isClosed()) +} + func (s *ConnectionSuite) TestClientClosed() { conn, inputHostClient, messagesCh := createConnection() @@ -359,3 +394,11 @@ func wrapAckInCommand(ack *cherami.PutMessageAck) *cherami.InputHostCommand { return cmd } + +func wrapDrainInCommand(reconfigure *cherami.ReconfigureInfo) *cherami.InputHostCommand { + cmd := cherami.NewInputHostCommand() + cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_DRAIN) + cmd.Reconfigure = reconfigure + + return cmd +} diff --git a/common/metrics/names.go b/common/metrics/names.go index 19286ab..6ac7916 100644 --- a/common/metrics/names.go +++ b/common/metrics/names.go @@ -49,6 +49,8 @@ const ( PublishAckRate = "cherami.publish.ack.rate" // PublishReconfigureRate is the rate of reconfiguration happening PublishReconfigureRate = "cherami.publish.reconfigure.rate" + // PublishDrainRate is the rate of drain happening + PublishDrainRate = "cherami.publish.drain.rate" // PublishNumConnections is the number of connections with input PublishNumConnections = "cherami.publish.connections" // PublishNumInflightMessagess is the number of inflight messages hold locally by publisher diff --git a/glide.lock b/glide.lock index c37638d..9f68efc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ hash: 57d13a71d768c6f1054eaf65aeedd138d1f50b98b3a4b9722bdee5acf5839f07 -updated: 2017-02-20T14:20:30.465282917-08:00 +updated: 2017-03-30T13:09:37.238077112-07:00 imports: - name: github.com/apache/thrift - version: 2d6060d882069ed3e3d6302aa63ea7eb4bb155ad + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/cactus/go-statsd-client @@ -21,13 +21,13 @@ imports: - ext - log - name: github.com/pborman/uuid - version: 1b00554d822231195d1babd97ff4a781231955c9 + version: a97ce2ca70fa5a848076093f05e639a89ca34d06 - name: github.com/pmezard/go-difflib version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: - difflib - name: github.com/Sirupsen/logrus - version: 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d + version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f - name: github.com/stretchr/objx version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 - name: github.com/stretchr/testify @@ -42,11 +42,11 @@ imports: - name: github.com/uber-go/atomic version: 3b8db5e93c4c02efbc313e17b2e796b0914a01fb - name: github.com/uber/cherami-thrift - version: 2e84419711eb57a3be7412c18de80f9fe7a0cc60 + version: 8cf6af068f1f1afd930c6a43ecdd9d0dca2289d5 subpackages: - .generated/go/cherami - name: github.com/uber/tchannel-go - version: 79387824978f91318be3bfb43ae12e04c38cfe97 + version: 0b7f160817553b0bacb5b108dd84a5022dbdd1c4 subpackages: - hyperbahn - hyperbahn/gen-go/hyperbahn @@ -59,11 +59,11 @@ imports: - trand - typed - name: golang.org/x/net - version: 6b27048ae5e6ad1ef927e72e437531493de612fe + version: a6577fac2d73be281a500b310739095313165611 subpackages: - context - name: golang.org/x/sys - version: d75a52659825e75fff6158388dddc6a5b04f9ba5 + version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 subpackages: - unix testImports: []