Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Reduce high-latency log threshold to 1 second and add throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Jul 24, 2017
1 parent a4c04f2 commit d67895e
Showing 1 changed file with 49 additions and 30 deletions.
79 changes: 49 additions & 30 deletions services/inputhost/pubconnection.go
Expand Up @@ -28,13 +28,17 @@ import (
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/common/throttler"
"github.com/uber/cherami-server/services/inputhost/load"
serverStream "github.com/uber/cherami-server/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"
)

// timeLatencyToLog is the time threshold for logging
const timeLatencyToLog = 70 * time.Second
// logHighLatencyThreshold is the time threshold for logging
const logHighLatencyThreshold = time.Second

var logThrottler = throttler.New(1, 5*time.Second) // log only once every 5 seconds at max
var logsThrottled int32

type (
pubConnection struct {
Expand Down Expand Up @@ -481,15 +485,20 @@ func (conn *pubConnection) failInflightMessages(inflightMessages map[string]resp
d = time.Since(resp.putMsgRecvTime)
conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, d)
conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, d)
if d > timeLatencyToLog {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(id),
`duration`: d,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Error(`failInflightMessages: publish message latency`)
if d > logHighLatencyThreshold {
if logThrottler.Allow() {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(id),
`duration`: d,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
`throttled`: atomic.SwapInt32(&logsThrottled, 0),
}).Error(`failInflightMessages: publish message latency`)
} else {
atomic.AddInt32(&logsThrottled, 1)
}
}
// Record the number of failed messages
conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures)
Expand Down Expand Up @@ -608,15 +617,20 @@ func (conn *pubConnection) updateInflightMap(inflightMessages map[string]respons
conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))
conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime))

if d > timeLatencyToLog {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(ackID),
`d`: d,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Info(`publish message latency at updateInflightMap`)
if d > logHighLatencyThreshold {
if logThrottler.Allow() {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(ackID),
`d`: d,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
`throttled`: atomic.SwapInt32(&logsThrottled, 0),
}).Info(`publish message latency at updateInflightMap`)
} else {
atomic.AddInt32(&logsThrottled, 1)
}
}
delete(inflightMessages, ackID)
return ok
Expand All @@ -631,16 +645,21 @@ func (conn *pubConnection) updateEarlyReplyAcks(resCh response, earlyReplyAcks m
d := time.Since(resCh.putMsgRecvTime)
ack, _ := earlyReplyAcks[resCh.ackID]
actualDuration := d - time.Since(ack.ackSentTime)
if d > timeLatencyToLog {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID),
`d`: d,
`actualDuration`: actualDuration,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)
if d > logHighLatencyThreshold {
if logThrottler.Allow() {
conn.logger.WithFields(bark.Fields{
common.TagDstPth: common.FmtDstPth(conn.destinationPath),
common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID),
`d`: d,
`actualDuration`: actualDuration,
`putMsgChanLen`: len(conn.putMsgCh),
`putMsgAckChanLen`: len(conn.ackChannel),
`replyChanLen`: len(conn.replyCh),
`throttled`: atomic.SwapInt32(&logsThrottled, 0),
}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`)
} else {
atomic.AddInt32(&logsThrottled, 1)
}
}
conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, actualDuration)
conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, actualDuration)
Expand Down

0 comments on commit d67895e

Please sign in to comment.