Unblock read acks pump from inputhost
If the server is draining gracefully, it sends a DRAIN command. Previously, the DRAIN command simply stopped the write pump and waited for the server to close the stream.

Even though the server will close the stream once the DRAIN finishes, the readAcks pump never sees the EOF, because we don't call stream.Read() unless we have some messages inflight.

This patch solves that by waiting for a default of one minute and then explicitly closing the connection. It also adds some logs and metrics so we can track retries and failures on publish.
Aravind Srinivasan committed Apr 5, 2017
1 parent de07d3a commit 98855f3
Showing 3 changed files with 67 additions and 14 deletions.
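The heart of the fix is the new waitForDrain routine in client/cherami/connection.go, shown in the first diff below. As a rough standalone sketch of that pattern (the closeCh/closeFn parameters, the shortened timeout, and the main driver are illustrative stand-ins, not the patch's actual types), a goroutine races a drain timer against the connection's close signal and forces the close if the timer fires first:

```go
package main

import (
	"fmt"
	"time"
)

// drainTimeout stands in for the patch's defaultDrainTimeout of one minute;
// it is shortened here so the example finishes quickly.
const drainTimeout = 2 * time.Second

// waitForDrain sketches the pattern the patch adds: once the write pump has
// stopped, race a drain timer against the connection's close signal and
// force the close if the timer fires first, since the read-acks pump may
// never observe the server's EOF on its own.
func waitForDrain(closeCh <-chan struct{}, closeFn func()) {
	drainTimer := time.NewTimer(drainTimeout)
	defer drainTimer.Stop()
	select {
	case <-closeCh:
		// the connection was already closed through the normal path
		return
	case <-drainTimer.C:
		// drain timed out without an explicit close; close the
		// connection ourselves so nothing blocks forever
		closeFn()
	}
}

func main() {
	closeCh := make(chan struct{}) // never closed here, so the timer path wins
	done := make(chan struct{})
	go func() {
		waitForDrain(closeCh, func() {
			fmt.Println("drain timed out; force-closing the connection")
		})
		close(done)
	}()
	<-done
}
```

In the patch itself, stopWritePumpWithLock spawns go conn.waitForDrain() right after stopping the write pump, and the timer is raced against conn.closeCh so a normal close skips the forced one.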
58 changes: 51 additions & 7 deletions client/cherami/connection.go
@@ -60,6 +60,13 @@ type (
opened int32
closed int32
drained int32

sentMsgs int64 // number of messages sent to the inputhost
sentAcks int64 // number of messages successfully acked
sentNacks int64 // number of messages nacked (STATUS_FAILED)
sentThrottled int64 // number of messages throttled
failedMsgs int64 // number of messages that failed *after* we successfully wrote to the stream
writeFailedMsgs int64 // number of messages that failed to even be written to the stream
}

// This struct is created by writePump after writing a message to the stream.
@@ -80,6 +87,7 @@ type (
const (
defaultMaxInflightMessages = 1000
defaultWGTimeout = time.Minute
defaultDrainTimeout = time.Minute
)

func newConnection(client cherami.TChanBIn, wsConnector WSConnector, path string, messages <-chan putMessageRequest,
@@ -168,6 +176,9 @@ func (conn *connection) stopWritePumpWithLock() {
}
conn.logger.Info("stopped write pump")
atomic.StoreInt32(&conn.drained, 1)
// now make sure we spawn a routine to wait for drain and
// close the connection if needed
go conn.waitForDrain()
}
}
func (conn *connection) close() {
@@ -193,7 +204,14 @@ func (conn *connection) close() {
}

atomic.StoreInt32(&conn.closed, 1)
conn.logger.Info("Input host connection closed.")
conn.logger.WithFields(bark.Fields{
`sentMsgs`: conn.sentMsgs,
`sentAcks`: conn.sentAcks,
`sentNacks`: conn.sentNacks,
`sentThrottled`: conn.sentThrottled,
`failedMsgs`: conn.failedMsgs,
`writeFailedMsgs`: conn.writeFailedMsgs,
}).Info("Input host connection closed.")

// trigger a reconfiguration due to connection closed
select {
Expand Down Expand Up @@ -222,6 +240,7 @@ func (conn *connection) writeMessagesPump() {

if err == nil {
conn.replyCh <- &messageResponse{pr.message.GetID(), pr.messageAck, pr.message.GetUserContext()}
conn.sentMsgs++
} else {
conn.reporter.IncCounter(metrics.PublishMessageFailedRate, nil, 1)
conn.logger.WithField(common.TagMsgID, common.FmtMsgID(pr.message.GetID())).Infof("Error writing message to stream: %v", err)
@@ -231,6 +250,7 @@
Error: err,
UserContext: pr.message.GetUserContext(),
}
conn.writeFailedMsgs++

// Write failed, rebuild connection
go conn.close()
@@ -256,7 +276,7 @@ func (conn *connection) readAcksPump() {
inflightMessages := make(map[string]*messageResponse)
// This map is needed when we receive a reply out of order before the inflightMessages is populated
earlyReplyAcks := make(map[string]*PublisherReceipt)
defer failInflightMessages(inflightMessages)
defer conn.failInflightMessages(inflightMessages)

// Flag which is set when AckStream is closed by InputHost
isEOF := false
@@ -310,10 +330,10 @@
// Probably we received the ack even before the inflightMessages map is populated.
// Let's put it in the earlyReplyAcks map so we can immediately complete the response when seen by this pump.
//conn.logger.WithField(common.TagAckID, common.FmtAckID(ack.GetID())).Debug("Received Ack before populating inflight messages.")
earlyReplyAcks[ack.GetID()] = processMessageAck(ack)
earlyReplyAcks[ack.GetID()] = conn.processMessageAck(ack)
} else {
delete(inflightMessages, ack.GetID())
messageResp.completion <- processMessageAck(ack)
messageResp.completion <- conn.processMessageAck(ack)
}
} else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1)
@@ -358,21 +378,44 @@ func (e *ackChannelClosedError) Error() string {
return "Ack channel closed."
}

func processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt {
func (conn *connection) processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt {
ret := &PublisherReceipt{
ID: messageAck.GetID(),
UserContext: messageAck.GetUserContext(),
}

if messageAck.GetStatus() != cherami.Status_OK {
stat := messageAck.GetStatus()
if stat != cherami.Status_OK {
if stat == cherami.Status_THROTTLED {
conn.sentThrottled++
} else {
conn.sentNacks++
}
ret.Error = errors.New(messageAck.GetMessage())
} else {
conn.sentAcks++
ret.Receipt = messageAck.GetReceipt()
}

return ret
}

func (conn *connection) waitForDrain() {
// wait for some timeout period and then close the connection.
// this is needed because even though the server closes the
// connection, we never call "stream.Read" until we have some
// messages in-flight, which will not be the case when we have already drained
drainTimer := time.NewTimer(defaultDrainTimeout)
defer drainTimer.Stop()
select {
case <-conn.closeCh:
// already closed
return
case <-drainTimer.C:
go conn.close()
}
}

// populateInflightMapUtil is used to populate the inflightMessages Map,
// based on the acks we received as well.
func populateInflightMapUtil(inflightMessages map[string]*messageResponse, earlyReplyAcks map[string]*PublisherReceipt, resCh *messageResponse) {
@@ -388,12 +431,13 @@ func populateInflightMapUtil(inflightMessages map[string]*messageResponse, early
}
}

func failInflightMessages(inflightMessages map[string]*messageResponse) {
func (conn *connection) failInflightMessages(inflightMessages map[string]*messageResponse) {
for id, messageResp := range inflightMessages {
messageResp.completion <- &PublisherReceipt{
ID: id,
Error: &ackChannelClosedError{},
UserContext: messageResp.userContext,
}
conn.failedMsgs++
}
}
16 changes: 9 additions & 7 deletions client/cherami/publisher.go
@@ -26,10 +26,10 @@ import (
"sync/atomic"
"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/backoff"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"

@@ -70,12 +70,12 @@ var _ Publisher = (*publisherImpl)(nil)
// NewPublisher constructs a new Publisher object
func NewPublisher(client *clientImpl, path string, maxInflightMessagesPerConnection int) Publisher {
base := basePublisher{
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: client.options.MetricsReporter,
reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: client.options.MetricsReporter,
reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
}
publisher := &publisherImpl{
basePublisher: base,
@@ -182,8 +182,10 @@ func (s *publisherImpl) Publish(message *PublisherMessage) *PublisherReceipt {

select {
case receipt = <-srCh:
s.reporter.IncCounter(metrics.PublisherMessageFailed, nil, 1)
return receipt.Error
case <-timeoutTimer.C:
s.reporter.IncCounter(metrics.PublisherMessageTimedout, nil, 1)
return ErrMessageTimedout
}
}
7 changes: 7 additions & 0 deletions common/metrics/names.go
@@ -55,6 +55,10 @@ const (
PublishNumConnections = "cherami.publish.connections"
// PublishNumInflightMessagess is the number of inflight messages held locally by the publisher
PublishNumInflightMessagess = "cherami.publish.message.inflights"
// PublisherMessageFailed is the number of failed messages on the publisher
PublisherMessageFailed = "cherami.publisher.message.failed"
// PublisherMessageTimedout is the number of messages timed out on the publisher
PublisherMessageTimedout = "cherami.publisher.message.timedout"

// ConsumeMessageRate is the rate of message got from output
ConsumeMessageRate = "cherami.consume.message.rate"
@@ -91,8 +95,11 @@ var MetricDefs = map[MetricName]MetricType{
PublishMessageLatency: Timer,
PublishAckRate: Counter,
PublishReconfigureRate: Counter,
PublishDrainRate: Counter,
PublishNumConnections: Gauge,
PublishNumInflightMessagess: Gauge,
PublisherMessageFailed: Counter,
PublisherMessageTimedout: Counter,

ConsumeMessageRate: Counter,
ConsumeCreditRate: Counter,