Skip to content

Commit

Permalink
report queue metrics with ticker. Instrument events sent/errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenruizdegauna committed Mar 10, 2022
1 parent 6538043 commit d76acb9
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
4 changes: 4 additions & 0 deletions internal/agent/agent.go
Expand Up @@ -1018,6 +1018,9 @@ func (c *context) ActiveEntitiesChannel() chan string {
}

func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
_, txn := instrumentation.SelfInstrumentation.StartTransaction(context2.Background(), "agent.queue_event")
defer txn.End()

if c.eventSender == nil {
aclog.
WithField("entity_key", entityKey.String()).
Expand Down Expand Up @@ -1050,6 +1053,7 @@ func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
}

if err := c.eventSender.QueueEvent(event, entityKey); err != nil {
txn.NoticeError(err)
alog.WithField(
"entityKey", entityKey,
).WithError(err).Error("could not queue event")
Expand Down
14 changes: 12 additions & 2 deletions internal/agent/connect_service.go
Expand Up @@ -3,15 +3,16 @@
package agent

import (
goContext "context"
"errors"
"time"

"github.com/newrelic/infrastructure-agent/pkg/log"

"github.com/newrelic/infrastructure-agent/internal/agent/instrumentation"
"github.com/newrelic/infrastructure-agent/pkg/backend/backoff"
"github.com/newrelic/infrastructure-agent/pkg/backend/identityapi"
"github.com/newrelic/infrastructure-agent/pkg/entity"
"github.com/newrelic/infrastructure-agent/pkg/helpers/fingerprint"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/trace"
)

Expand All @@ -36,6 +37,9 @@ func NewIdentityConnectService(client identityapi.IdentityConnectClient, fingerp
func (ic *identityConnectService) Connect() entity.Identity {
var retryBO *backoff.Backoff

_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.connect")
defer txn.End()

for {
f, err := ic.fingerprintHarvest.Harvest()
if err != nil {
Expand Down Expand Up @@ -81,6 +85,9 @@ func (ic *identityConnectService) Connect() entity.Identity {
// ConnectUpdate will check for system fingerprint changes and will update it if it's the case.
// It returns the same ID provided as argument if there is an error
func (ic *identityConnectService) ConnectUpdate(agentIdn entity.Identity) (entityIdn entity.Identity, err error) {
_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.connect_update")
defer txn.End()

if agentIdn.ID.IsEmpty() {
logger.Warn(ErrEmptyEntityID.Error())
}
Expand Down Expand Up @@ -127,6 +134,9 @@ func (ic *identityConnectService) ConnectUpdate(agentIdn entity.Identity) (entit

// Disconnect is used to signal the backend that the agent will stop.
func (ic *identityConnectService) Disconnect(agentID entity.ID, state identityapi.DisconnectReason) error {
_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.disconnect")
defer txn.End()

logger.WithField("state", state).Info("calling disconnect")

if agentID.IsEmpty() {
Expand Down
30 changes: 22 additions & 8 deletions internal/agent/event_sender.go
Expand Up @@ -140,7 +140,12 @@ func (sender *metricsIngestSender) Start() (err error) {
sender.stopChannel = make(chan bool)

// Wait for reportEventQueueMetrics, accumulateBatches and sendBatches to complete
sender.internalRoutineWaits.Add(2)
sender.internalRoutineWaits.Add(3)

go func() {
defer sender.internalRoutineWaits.Done()
reportEventQueueMetrics(sender.eventQueue, sender.stopChannel)
}()

go func() {
defer sender.internalRoutineWaits.Done()
Expand Down Expand Up @@ -195,19 +200,28 @@ func (sender *metricsIngestSender) QueueEvent(event sample.Event, key entity.Key

select {
case sender.eventQueue <- queuedEvent:
reportEventQueueMetrics(sender.eventQueue)
return nil
default:
reportEventQueueMetrics(sender.eventQueue)
return fmt.Errorf("could not queue event: queue is full")
}
}

func reportEventQueueMetrics(queue chan eventData) {
metric := instrumentation.NewGauge("agent.eventQueueSize", float64(len(queue)))
instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
metric = instrumentation.NewGauge("agent.eventQueueCapacity", float64(cap(queue)))
instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
// reportEventQueueMetrics records event queue gauges every 500ms until a
// receive on stopChannel succeeds. It blocks, so callers run it in its own
// goroutine.
//
// On every tick it records three gauges:
//   - agent.eventQueueSize: number of events currently buffered in queue.
//   - agent.eventQueueCapacity: buffer capacity of queue.
//   - agent.eventQueueUtilization: size as a whole-number percentage of
//     capacity (0 when the queue has no buffer).
func reportEventQueueMetrics(queue chan eventData, stopChannel chan bool) {
	sendTimer := time.NewTicker(time.Millisecond * 500)
	// Release the ticker on every exit path.
	defer sendTimer.Stop()

	for {
		select {
		case <-sendTimer.C:
			// Read length and capacity once so the three gauges describe the
			// same snapshot of the queue.
			size, capacity := len(queue), cap(queue)

			// Integer division by a zero capacity (unbuffered channel) would
			// panic and take the agent down; report 0% in that case.
			utilization := 0
			if capacity > 0 {
				utilization = (size * 100) / capacity
			}

			metric := instrumentation.NewGauge("agent.eventQueueSize", float64(size))
			instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
			metric = instrumentation.NewGauge("agent.eventQueueCapacity", float64(capacity))
			instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
			metric = instrumentation.NewGauge("agent.eventQueueUtilization", float64(utilization))
			instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
		case <-stopChannel:
			return
		}
	}
}

// Collect events from the queue and accumulate them into batches which can be sent up to metrics ingest.
Expand Down

0 comments on commit d76acb9

Please sign in to comment.