Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Report per destination bytes in counters (#91)
Browse files Browse the repository at this point in the history
This patch makes sure we report the bytes in counters for the following:
* per destination
* per extent
* per host

We only report the delta from the last round and so we should reset it
once we report it to the controller.
  • Loading branch information
aravindvs authored Mar 10, 2017
1 parent a5b36da commit c9d5ceb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
8 changes: 7 additions & 1 deletion services/inputhost/exthost.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ func (conn *extHost) sendMessage(pr *inPutMessage, extSendTimer *common.Timer, w
// just get the size of the actual data here
// ignoring other headers for the size computation
// Note: size of will not work here
conn.currSizeBytes += int64(len(pr.putMsg.Data))
msgLength := int64(len(pr.putMsg.Data))
conn.currSizeBytes += msgLength
conn.extMetrics.Add(load.ExtentMetricBytesIn, msgLength)

// If we reach the max sequence number or reach the max allowed size for this extent,
// notify the extent controller but keep the pumps open.
Expand Down Expand Up @@ -802,9 +804,13 @@ func (conn *extHost) Report(reporter common.LoadReporter) {
}

msgsInPerSec := conn.extMetrics.GetAndReset(load.ExtentMetricMsgsIn) / intervalSecs
// Note: we just report the delta from the last round, which means we can
// reset the counter after this report and report the value we got this time
bytesInSinceLastReport := conn.extMetrics.GetAndReset(load.ExtentMetricBytesIn)

metric := controller.DestinationExtentMetrics{
IncomingMessagesCounter: common.Int64Ptr(msgsInPerSec),
IncomingBytesCounter: common.Int64Ptr(bytesInSinceLastReport),
}
reporter.ReportDestinationExtentMetric(conn.destUUID, conn.extUUID, metric)
}
Expand Down
10 changes: 10 additions & 0 deletions services/inputhost/inputhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func (h *InputHost) PutMessageBatch(ctx thrift.Context, request *cherami.PutMess
path := request.GetDestinationPath()
messages := request.GetMessages()
lclLg := h.logger.WithField(common.TagDstPth, common.FmtDstPth(path))
var msgLength int64

// If we are already shutting down, no need to do anything here
if atomic.AddInt32(&h.loadShutdownRef, 1) <= 0 {
Expand Down Expand Up @@ -483,6 +484,11 @@ func (h *InputHost) PutMessageBatch(ctx thrift.Context, request *cherami.PutMess
inflightMsgMap := make(map[string]struct{})

for _, msg := range messages {
msgLength = int64(len(msg.Data))
// make sure we increment the bytes in counter for this path and this host
pathCache.dstMetrics.Add(load.DstMetricBytesIn, msgLength)
pathCache.hostMetrics.Add(load.HostMetricBytesIn, msgLength)

inMsg := &inPutMessage{
putMsg: msg,
putMsgAckCh: ackChannel,
Expand Down Expand Up @@ -761,11 +767,15 @@ func (h *InputHost) Report(reporter common.LoadReporter) {
}

msgsInPerSec := h.hostMetrics.GetAndReset(load.HostMetricMsgsIn) / intervalSecs
// We just report the delta for the bytes in counter. so get the value and
// reset it.
bytesInSinceLastReport := h.hostMetrics.GetAndReset(load.HostMetricBytesIn)

hostMetrics := controller.NodeMetrics{
NumberOfActiveExtents: common.Int64Ptr(h.hostMetrics.Get(load.HostMetricNumOpenExtents)),
NumberOfConnections: common.Int64Ptr(h.hostMetrics.Get(load.HostMetricNumOpenConns)),
IncomingMessagesCounter: common.Int64Ptr(msgsInPerSec),
IncomingBytesCounter: common.Int64Ptr(bytesInSinceLastReport),
}

reporter.ReportHostMetric(hostMetrics)
Expand Down
4 changes: 4 additions & 0 deletions services/inputhost/pathCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,15 @@ func (pathCache *inPathCache) Report(reporter common.LoadReporter) {
numConnections := pathCache.dstMetrics.Get(load.DstMetricNumOpenConns)
numExtents := pathCache.dstMetrics.Get(load.DstMetricNumOpenExtents)
numMsgsInPerSec := pathCache.dstMetrics.GetAndReset(load.DstMetricMsgsIn) / diffSecs
// We just report the delta for the bytes in counter. so get the value and
// reset it.
bytesInSinceLastReport := pathCache.dstMetrics.GetAndReset(load.DstMetricBytesIn)

metric := controller.DestinationMetrics{
NumberOfConnections: common.Int64Ptr(numConnections),
NumberOfActiveExtents: common.Int64Ptr(numExtents),
IncomingMessagesCounter: common.Int64Ptr(numMsgsInPerSec),
IncomingBytesCounter: common.Int64Ptr(bytesInSinceLastReport),
}

pathCache.lastDstLoadReportedTime = now
Expand Down
7 changes: 7 additions & 0 deletions services/inputhost/pubconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (conn *pubConnection) close() {
// the writeAcksStream by sending a message to the intermediary "replyCh"
func (conn *pubConnection) readRequestStream() {
defer conn.waitWG.Done()
var msgLength int64

// Setup the connIdleTimer
connIdleTimer := common.NewTimer(conn.cacheTimeout)
Expand Down Expand Up @@ -218,6 +219,12 @@ func (conn *pubConnection) readRequestStream() {
conn.pathCache.destM3Client.IncCounter(metrics.PubConnectionScope, metrics.InputhostDestMessageReceived)
conn.pathCache.dstMetrics.Increment(load.DstMetricOverallNumMsgs)

// Note: we increment the destination bytes in counter here because we could throttle this message
// even before it reaches any of the extents (which increments the extent specific bytes in counter)
msgLength = int64(len(msg.Data))
conn.pathCache.dstMetrics.Add(load.DstMetricBytesIn, msgLength)
conn.pathCache.hostMetrics.Add(load.HostMetricBytesIn, msgLength)

conn.recvMsgs++

inMsg := &inPutMessage{
Expand Down

0 comments on commit c9d5ceb

Please sign in to comment.