From d76acb99735761f28698f530f71927e1999e5adb Mon Sep 17 00:00:00 2001
From: Ruben Ruiz de Gauna
Date: Thu, 10 Mar 2022 11:58:26 +0100
Subject: [PATCH] report queue metrics with ticker. Instrument events sent/errors

---
Review notes (not part of the commit message):
* reportEventQueueMetrics stops its ticker via defer, samples len/cap once per
  tick, and skips agent.eventQueueUtilization when the queue capacity is zero,
  because the integer division would otherwise panic.
* The wait-group comment in Start now names all three goroutines.

 internal/agent/agent.go           |  4 ++++
 internal/agent/connect_service.go | 14 ++++++++++++--
 internal/agent/event_sender.go    | 44 +++++++++++++++++++++++++++++++++++---------
 3 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/internal/agent/agent.go b/internal/agent/agent.go
index 1bd58d88a..d0547921e 100644
--- a/internal/agent/agent.go
+++ b/internal/agent/agent.go
@@ -1018,6 +1018,9 @@ func (c *context) ActiveEntitiesChannel() chan string {
 }
 
 func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
+	_, txn := instrumentation.SelfInstrumentation.StartTransaction(context2.Background(), "agent.queue_event")
+	defer txn.End()
+
 	if c.eventSender == nil {
 		aclog.
 			WithField("entity_key", entityKey.String()).
@@ -1050,6 +1053,7 @@ func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
 	}
 
 	if err := c.eventSender.QueueEvent(event, entityKey); err != nil {
+		txn.NoticeError(err)
 		alog.WithField(
 			"entityKey", entityKey,
 		).WithError(err).Error("could not queue event")
diff --git a/internal/agent/connect_service.go b/internal/agent/connect_service.go
index 7e9d7d029..44e872930 100644
--- a/internal/agent/connect_service.go
+++ b/internal/agent/connect_service.go
@@ -3,15 +3,16 @@
 package agent
 
 import (
+	goContext "context"
 	"errors"
 	"time"
 
-	"github.com/newrelic/infrastructure-agent/pkg/log"
-
+	"github.com/newrelic/infrastructure-agent/internal/agent/instrumentation"
 	"github.com/newrelic/infrastructure-agent/pkg/backend/backoff"
 	"github.com/newrelic/infrastructure-agent/pkg/backend/identityapi"
 	"github.com/newrelic/infrastructure-agent/pkg/entity"
 	"github.com/newrelic/infrastructure-agent/pkg/helpers/fingerprint"
+	"github.com/newrelic/infrastructure-agent/pkg/log"
 	"github.com/newrelic/infrastructure-agent/pkg/trace"
 )
 
@@ -36,6 +37,9 @@ func NewIdentityConnectService(client identityapi.IdentityConnectClient, fingerp
 func (ic *identityConnectService) Connect() entity.Identity {
 	var retryBO *backoff.Backoff
 
+	_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.connect")
+	defer txn.End()
+
 	for {
 		f, err := ic.fingerprintHarvest.Harvest()
 		if err != nil {
@@ -81,6 +85,9 @@ func (ic *identityConnectService) Connect() entity.Identity {
 // ConnectUpdate will check for system fingerprint changes and will update it if it's the case.
 // It returns the same ID provided as argument if there is an error
 func (ic *identityConnectService) ConnectUpdate(agentIdn entity.Identity) (entityIdn entity.Identity, err error) {
+	_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.connect_update")
+	defer txn.End()
+
 	if agentIdn.ID.IsEmpty() {
 		logger.Warn(ErrEmptyEntityID.Error())
 	}
@@ -127,6 +134,9 @@ func (ic *identityConnectService) ConnectUpdate(agentIdn entity.Identity) (entit
 
 // Disconnect is used to signal the backend that the agent will stop.
 func (ic *identityConnectService) Disconnect(agentID entity.ID, state identityapi.DisconnectReason) error {
+	_, txn := instrumentation.SelfInstrumentation.StartTransaction(goContext.Background(), "agent.disconnect")
+	defer txn.End()
+
 	logger.WithField("state", state).Info("calling disconnect")
 
 	if agentID.IsEmpty() {
diff --git a/internal/agent/event_sender.go b/internal/agent/event_sender.go
index 26892ba15..e15ed5a85 100644
--- a/internal/agent/event_sender.go
+++ b/internal/agent/event_sender.go
@@ -140,7 +140,12 @@ func (sender *metricsIngestSender) Start() (err error) {
 	sender.stopChannel = make(chan bool)
 
-	// Wait for accumulateBatches and sendBatches to complete
-	sender.internalRoutineWaits.Add(2)
+	// Wait for reportEventQueueMetrics, accumulateBatches and sendBatches to complete
+	sender.internalRoutineWaits.Add(3)
+
+	go func() {
+		defer sender.internalRoutineWaits.Done()
+		reportEventQueueMetrics(sender.eventQueue, sender.stopChannel)
+	}()
 
 	go func() {
 		defer sender.internalRoutineWaits.Done()
@@ -195,19 +200,40 @@ func (sender *metricsIngestSender) QueueEvent(event sample.Event, key entity.Key
 
 	select {
 	case sender.eventQueue <- queuedEvent:
-		reportEventQueueMetrics(sender.eventQueue)
 		return nil
 	default:
-		reportEventQueueMetrics(sender.eventQueue)
 		return fmt.Errorf("could not queue event: queue is full")
 	}
 }
 
-func reportEventQueueMetrics(queue chan eventData) {
-	metric := instrumentation.NewGauge("agent.eventQueueSize", float64(len(queue)))
-	instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
-	metric = instrumentation.NewGauge("agent.eventQueueCapacity", float64(cap(queue)))
-	instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
+// reportEventQueueMetrics records the size, capacity and utilization of queue
+// as gauges every 500ms. It returns once stopChannel yields a value or is closed.
+func reportEventQueueMetrics(queue chan eventData, stopChannel chan bool) {
+	ticker := time.NewTicker(time.Millisecond * 500)
+	// Release the ticker on every exit path.
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			// Sample once so all gauges of a tick describe the same instant.
+			size, capacity := len(queue), cap(queue)
+
+			metric := instrumentation.NewGauge("agent.eventQueueSize", float64(size))
+			instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
+			metric = instrumentation.NewGauge("agent.eventQueueCapacity", float64(capacity))
+			instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
+
+			// Integer division by a zero capacity (unbuffered queue) panics,
+			// so utilization is only reported for buffered queues.
+			if capacity > 0 {
+				metric = instrumentation.NewGauge("agent.eventQueueUtilization", float64((size*100)/capacity))
+				instrumentation.SelfInstrumentation.RecordMetric(goContext.Background(), metric)
+			}
+		case <-stopChannel:
+			return
+		}
+	}
 }
 
 // Collect events from the queue and accumulate them into batches which can be sent up to metrics ingest.