Skip to content

Commit

Permalink
feat: log truncated or dropped events
Browse files Browse the repository at this point in the history
  • Loading branch information
varas committed Jul 9, 2021
1 parent 73f04e6 commit cb27aca
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions internal/agent/agent.go
Expand Up @@ -96,7 +96,10 @@ type inventory struct {
needsCleanup bool
}

var alog = log.WithComponent("Agent")
var (
alog = log.WithComponent("Agent")
aclog = log.WithComponent("AgentContext")
)

// AgentContext defines the interfaces between plugins and the agent
type AgentContext interface {
Expand Down Expand Up @@ -1002,20 +1005,38 @@ func (c *context) ActiveEntitiesChannel() chan string {
}

func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
if c.eventSender != nil {
// limits any string field larger than 4095 chars
if c.cfg.TruncTextValues {
event = metric.TruncateLength(event, metric.NRDBLimit)
}
if c.eventSender == nil {
aclog.
WithField("entity_key", entityKey.String()).
Warn("cannot send, event sender not set")
return
}

includeSample := c.shouldIncludeEvent(event)
if includeSample {
if err := c.eventSender.QueueEvent(event, entityKey); err != nil {
alog.WithField(
"entityKey", entityKey,
).WithError(err).Error("could not queue event")
}
}
// truncates string fields larger than 4095 chars
if c.cfg.TruncTextValues {
orig := event
event = metric.TruncateLength(event, metric.NRDBLimit)
aclog.
WithField("entity_key", entityKey.String()).
WithField("length", metric.NRDBLimit).
WithField("original", fmt.Sprintf("+%v", orig)).
WithField("truncated", fmt.Sprintf("+%v", event)).
Warn("event truncated to NRDB limit")
}

includeSample := c.shouldIncludeEvent(event)
if !includeSample {
aclog.
WithField("entity_key", entityKey.String()).
WithField("event", fmt.Sprintf("+%v", event)).
Debug("event excluded by metric matcher")
return
}

if err := c.eventSender.QueueEvent(event, entityKey); err != nil {
alog.WithField(
"entityKey", entityKey,
).WithError(err).Error("could not queue event")
}
}

Expand Down Expand Up @@ -1069,15 +1090,14 @@ func (c *context) AddReconnecting(p Plugin) {

// Reconnect invokes again all the plugins that have been registered with the AddReconnecting function
func (c *context) Reconnect() {
aclog := log.WithComponent("AgentContext")
aclog.Debug("Invoking Run() on all the plugins registered for reconnection.")
c.reconnecting.Range(triggerAddReconnecting(aclog))
}

// triggerAddReconnecting is used with sync.Map.Range to iterate through all plugins and reconnect them
func triggerAddReconnecting(aclog log.Entry) func(pluginID interface{}, plugin interface{}) bool {
func triggerAddReconnecting(l log.Entry) func(pluginID interface{}, plugin interface{}) bool {
return func(pluginID, plugin interface{}) bool {
aclog.WithField("plugin", pluginID).Debug("Reconnecting plugin.")
l.WithField("plugin", pluginID).Debug("Reconnecting plugin.")
func(p Plugin) {
go p.Run()
}(plugin.(Plugin))
Expand Down

0 comments on commit cb27aca

Please sign in to comment.