Skip to content

Commit

Permalink
Allocate inventory queue (#70)
Browse files Browse the repository at this point in the history
* fix: inventory error count increasing on ingest error not 429
* refactor: extract sendInventory
* allocate configurable inventory queue for 10 payloads
  • Loading branch information
varas committed Aug 14, 2020
1 parent 68a8dd3 commit a7b17c6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 36 deletions.
76 changes: 40 additions & 36 deletions internal/agent/agent.go
Expand Up @@ -66,6 +66,7 @@ type registerableSender interface {
}

type Agent struct {
inv inventoryState
plugins []Plugin // Slice of registered plugins
oldPlugins []ids.PluginID // Deprecated plugins whose cached data must be removed, if existing
agentDir string // Base data directory for the agent
Expand All @@ -87,6 +88,11 @@ type Agent struct {
notificationHandler *ctl.NotificationHandlerWithCancellation // Handle ipc messaging.
}

type inventoryState struct {
readyToReap bool
sendErrorCount uint32
}

// inventory holds the reaper and sender for the inventories of a given entity (local or remote), as well as their status
type inventory struct {
reaper *patchReaper
Expand Down Expand Up @@ -442,7 +448,7 @@ func New(
}

// Create input channel for plugins to feed data back to the agent
a.Context.ch = make(chan PluginOutput)
a.Context.ch = make(chan PluginOutput, a.Context.cfg.InventoryQueueLen)
a.Context.activeEntities = make(chan string, activeEntitiesBufferLength)

if cfg.RegisterEnabled {
Expand Down Expand Up @@ -767,10 +773,6 @@ func (a *Agent) Run() (err error) {
}
}

// State variables
var readyToReap bool // Do we need to execute a reap phase?
var sendErrorCount uint32 = 0 // Send error counter

// Timers
reapTimer := time.NewTicker(cfg.FirstReapInterval)
sendTimer := time.NewTimer(cfg.SendInterval) // Send any deltas every X seconds
Expand Down Expand Up @@ -876,10 +878,10 @@ func (a *Agent) Run() (err error) {
case <-reapTimer.C:
{
for _, inventory := range a.inventories {
if !readyToReap {
if !a.inv.readyToReap {
if len(distinctPlugins) <= len(idsReporting) {
alog.Debug("Signalling initial reap.")
readyToReap = true
a.inv.readyToReap = true
inventory.needsCleanup = true
} else {
pluginIds := make([]ids.PluginID, 0)
Expand All @@ -891,7 +893,7 @@ func (a *Agent) Run() (err error) {
alog.WithField("pluginIds", pluginIds).Debug("Still waiting on plugins.")
}
}
if readyToReap && inventory.needsReaping {
if a.inv.readyToReap && inventory.needsReaping {
reapTimer.Stop()
reapTimer = time.NewTicker(cfg.ReapInterval)
inventory.reaper.Reap()
Expand All @@ -905,40 +907,15 @@ func (a *Agent) Run() (err error) {
}
case <-initialReapTimeout.C:
// If we've waited too long and still not received data from all plugins, we can just send what we have.
if !readyToReap {
if !a.inv.readyToReap {
alog.Debug("Maximum initial reap delay exceeded - marking inventory as ready to report.")
readyToReap = true
a.inv.readyToReap = true
for _, inventory := range a.inventories {
inventory.needsCleanup = true
}
}
case <-sendTimer.C:
{
backoffMax := config.MAX_BACKOFF
for _, inventory := range a.inventories {
err := inventory.sender.Process()
if err != nil {
if ingestError, ok := err.(*inventoryapi.IngestError); ok {
if ingestError.StatusCode == http.StatusTooManyRequests {
alog.Warn("server is rate limiting inventory for this Infrastructure Agent")
backoffMax = config.RATE_LIMITED_BACKOFF
sendErrorCount = helpers.MaxBackoffErrorCount
}
} else {
sendErrorCount++
}
alog.WithError(err).WithField("errorCount", sendErrorCount).
Debug("Inventory sender can't process after retrying.")
break // Assuming break will try to send later the data from the missing inventory senders
} else {
sendErrorCount = 0
}
}
sendTimerVal := helpers.ExpBackoff(cfg.SendInterval,
time.Duration(backoffMax)*time.Second,
sendErrorCount)
sendTimer.Reset(sendTimerVal)
}
a.sendInventory(sendTimer)
case <-debugTimer:
{
debugInfo, err := a.debugProvide()
Expand All @@ -957,6 +934,33 @@ func (a *Agent) Run() (err error) {
}
}

func (a *Agent) sendInventory(sendTimer *time.Timer) {
backoffMax := config.MAX_BACKOFF
for _, i := range a.inventories {
err := i.sender.Process()
if err != nil {
if ingestError, ok := err.(*inventoryapi.IngestError); ok &&
ingestError.StatusCode == http.StatusTooManyRequests {
alog.Warn("server is rate limiting inventory submission")
backoffMax = config.RATE_LIMITED_BACKOFF
a.inv.sendErrorCount = helpers.MaxBackoffErrorCount
} else {
a.inv.sendErrorCount++
}
alog.WithError(err).WithField("errorCount", a.inv.sendErrorCount).
Debug("Inventory sender can't process after retrying.")
// Assuming break will try to send later the data from the missing inventory senders
break
} else {
a.inv.sendErrorCount = 0
}
}
sendTimerVal := helpers.ExpBackoff(a.Context.cfg.SendInterval,
time.Duration(backoffMax)*time.Second,
a.inv.sendErrorCount)
sendTimer.Reset(sendTimerVal)
}

func (a *Agent) removeOutdatedEntities(reportedEntities map[string]bool) {
alog.Debug("Triggered periodic removal of outdated entities.")
// The entities to remove are those entities that haven't reported activity in the last period and
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Expand Up @@ -592,6 +592,11 @@ type Config struct {
// Public: No
BatchQueueDepth int `yaml:"batch_queue_depth" envconfig:"batch_queue_depth" public:"false"` // See event_sender.go

// InventoryQueue sets the inventory processing queue size.
// Default: 10
// Public: Yes
InventoryQueueLen int

// EnableWinUpdatePlugin enables the windows updates plugin which retrieves the lists of hotfix that are installed
// on the host.
// Default: False
Expand Down Expand Up @@ -1245,6 +1250,7 @@ func NewConfig() *Config {
SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit,
DefaultIntegrationsTempDir: defaultIntegrationsTempDir,
IncludeMetricsMatchers: defaultMetricsMatcherConfig,
InventoryQueueLen: DefaultInventoryQueue,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/defaults.go
Expand Up @@ -43,6 +43,7 @@ var (
DefaultStripCommandLine = true
DefaultSmartVerboseModeEntryLimit = 1000
DefaultIntegrationsDir = "newrelic-integrations"
DefaultInventoryQueue = 10

// private
defaultAppDataDir = ""
Expand Down

0 comments on commit a7b17c6

Please sign in to comment.