Skip to content

Commit

Permalink
Allow agent to monitor endpoint (elastic#4789)
Browse files Browse the repository at this point in the history
* first attempt

* still tinkering

* first draft change

* cleanup

* clean up

* add changelog

* format

* fix component lookup

* fix state tests

* complete integration tests

* basic cleanup

* update metrics setup

* remove mage changes

* spelling

* fix test

* figured out weird test failures

* still fixing unit tests

* major test improvements

* finish up tests

* remove log line

* update name

* fix comments

* fix bool logic

* fix tests

* fix merge

* add warning and mock output to inspect
  • Loading branch information
fearful-symmetry committed Jun 16, 2024
1 parent 2b2e8d0 commit c929f79
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 146 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1716320508-add-endpoint-monitoring.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add monitoring of endpoint usage metrics

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4789

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4083
46 changes: 41 additions & 5 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -82,7 +83,12 @@ type MonitorManager interface {
Reload(rawConfig *config.Config) error

// MonitoringConfig injects monitoring configuration into resolved ast tree.
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string) (map[string]interface{}, error)
// args:
// - the existing config policy
// - a list of the expected running components
// - a map of component IDs to binary names
// - a map of component IDs to the PIDs of the running components.
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error)
}

// Runner provides interface to run a manager and receive running errors.
Expand Down Expand Up @@ -284,6 +290,14 @@ type Coordinator struct {
// loop in runLoopIteration() is active and listening.
// Should only be interacted with via CoordinatorActive() or runLoopIteration()
heartbeatChan chan struct{}

// if a component (mostly endpoint) has a new PID, we need to update
// the monitoring components so they have a PID to monitor
// however, if endpoint is in some kind of restart loop,
// we could DOS the config system. Instead,
// run a ticker that checks to see if we have a new PID.
componentPIDTicker *time.Ticker
componentPidRequiresUpdate *atomic.Bool
}

// The channels Coordinator reads to receive updates from the various managers.
Expand Down Expand Up @@ -374,10 +388,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
// synchronization in the subscriber API, just set the input buffer to 0.
stateBroadcaster: broadcaster.New(state, 64, 32),

logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
heartbeatChan: make(chan struct{}),
logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
heartbeatChan: make(chan struct{}),
componentPIDTicker: time.NewTicker(time.Second * 30),
componentPidRequiresUpdate: &atomic.Bool{},
}
// Setup communication channels for any non-nil components. This pattern
// lets us transparently accept nil managers / simulated events during
Expand Down Expand Up @@ -926,6 +942,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

defer c.componentPIDTicker.Stop()

// We run nil checks before starting the various managers so that unit tests
// only have to initialize / mock the specific components they're testing.
// If a manager is nil, we prebuffer its return channel with nil also so
Expand Down Expand Up @@ -1038,6 +1056,18 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

case c.heartbeatChan <- struct{}{}:

case <-c.componentPIDTicker.C:
// if we hit the ticker and we've got a new PID,
// reload the component model
if c.componentPidRequiresUpdate.Swap(false) {
err := c.refreshComponentModel(ctx)
if err != nil {
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
c.setConfigManagerError(err)
c.logger.Errorf("%s", err)
}
}

case componentState := <-c.managerChans.runtimeManagerUpdate:
// New component change reported by the runtime manager via
// Coordinator.watchRuntimeComponents(), merge it with the
Expand Down Expand Up @@ -1277,11 +1307,17 @@ func (c *Coordinator) generateComponentModel() (err error) {
configInjector = c.monitorMgr.MonitoringConfig
}

var existingCompState = make(map[string]uint64, len(c.state.Components))
for _, comp := range c.state.Components {
existingCompState[comp.Component.ID] = comp.State.Pid
}

comps, err := c.specs.ToComponents(
cfg,
configInjector,
c.state.LogLevel,
c.agentInfo,
existingCompState,
)
if err != nil {
return fmt.Errorf("failed to render components: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,24 @@ func (c *Coordinator) refreshState() {
// Coordinator state and sets stateNeedsRefresh.
// Must be called on the main Coordinator goroutine.
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) {

// check for any component updates to the known PID, so we can update the component monitoring
found := false
for i, other := range c.state.Components {
if other.Component.ID == state.Component.ID {
if other.State.Pid != state.State.Pid {
c.componentPidRequiresUpdate.Store(true)
}
c.state.Components[i] = state
found = true
break
}
}
if !found {
c.state.Components = append(c.state.Components, state)
if state.State.Pid != 0 {
c.componentPidRequiresUpdate.Store(true)
}
}

// In the case that the component has stopped, it is now removed.
Expand All @@ -160,6 +168,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
}

c.stateNeedsRefresh = true

}

// generateReportableState aggregates the internal state of the Coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,6 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
resultChan <- ctx.Err()
return
case state := <-subChan:
t.Logf("%+v", state)
if len(state.Components) == 3 {
compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0")
compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1")
Expand All @@ -599,6 +598,11 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
(unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") {
resultChan <- nil
return
} else if unit0.State == client.UnitStateFailed && unit1.State == client.UnitStateFailed {
// if you get a really strange failed state, check to make sure the mock binaries in
// elastic-agent/pkg/component/fake/ are updated
t.Fail()
t.Logf("got units with failed state: %#v / %#v", unit1, unit0)
}
}
}
Expand Down Expand Up @@ -1007,7 +1011,7 @@ func (*testMonitoringManager) Prepare(_ string) error
func (*testMonitoringManager) Cleanup(string) error { return nil }
func (*testMonitoringManager) Enabled() bool { return false }
func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil }
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string) (map[string]interface{}, error) {
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) {
return nil, nil
}

Expand Down
39 changes: 25 additions & 14 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestVarsManagerError(t *testing.T) {
managerChans: managerChans{
varsManagerError: varsErrorChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}
// Send an error via the vars manager channel, and let Coordinator update
const errorStr = "force error"
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestCoordinatorReportsUnhealthyComponents(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

unhealthyComponent := runtime.ComponentComponentState{
Expand Down Expand Up @@ -186,6 +188,7 @@ func TestCoordinatorComponentStatesAreSeparate(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

comp1 := runtime.ComponentComponentState{
Expand Down Expand Up @@ -256,6 +259,7 @@ func TestCoordinatorReportsUnhealthyUnits(t *testing.T) {
managerChans: managerChans{
runtimeManagerUpdate: runtimeChan,
},
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a healthy component with healthy input and output units
Expand Down Expand Up @@ -375,8 +379,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
runtimeMgr: &fakeRuntimeManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
ast: emptyAST(t),
vars: emptyVars(t),
ast: emptyAST(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Send an invalid config update and confirm that Coordinator reports
Expand All @@ -389,7 +394,6 @@ agent.download.sourceURI:
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)

assert.True(t, cfgChange.failed, "Policy with invalid field should have reported failed config change")
require.ErrorContainsf(t,
cfgChange.err,
Expand Down Expand Up @@ -420,6 +424,7 @@ agent.download.sourceURI:
// (This check is based on a previous bug in which a vars update could
// discard active policy errors.)
varsChan <- emptyVars(t)
t.Logf("after emptyVars statement")
coord.runLoopIteration(ctx)

assert.Error(t, coord.configErr, "Vars update shouldn't affect configErr")
Expand Down Expand Up @@ -489,8 +494,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) {
runtimeMgr: &fakeRuntimeManager{},

// Set valid but empty initial values for ast and vars
vars: emptyVars(t),
ast: emptyAST(t),
vars: emptyVars(t),
ast: emptyAST(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// This configuration produces a valid AST but its EQL condition is
Expand Down Expand Up @@ -583,8 +589,9 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}
coord.RegisterMonitoringServer(monitoringReloader)

Expand Down Expand Up @@ -711,8 +718,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with one input and one output
Expand Down Expand Up @@ -798,8 +806,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
// manager, so it receives the update result.
runtimeManagerError: updateErrChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Send an empty policy which should forward an empty component model to
Expand Down Expand Up @@ -860,8 +869,9 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) {
configManagerUpdate: configChan,
varsManagerUpdate: varsChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
runtimeMgr: runtimeManager,
vars: emptyVars(t),
componentPIDTicker: time.NewTicker(time.Second * 30),
}

// Create a policy with one input and one output
Expand Down Expand Up @@ -936,7 +946,8 @@ func TestCoordinatorReportsOverrideState(t *testing.T) {
stateBroadcaster: &broadcaster.Broadcaster[State]{
InputChan: stateChan,
},
overrideStateChan: overrideStateChan,
overrideStateChan: overrideStateChan,
componentPIDTicker: time.NewTicker(time.Second * 30),
}
// Send an error via the vars manager channel, and let Coordinator update
overrideStateChan <- &coordinatorOverrideState{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ log_level: "warning"
components:
- id: "comp-1"
state:
pid: 0
state: 3
message: "degraded message"
features_idx: 0
Expand Down Expand Up @@ -570,6 +571,7 @@ log_level: "warning"
components:
- id: "comp-1"
state:
pid: 0
state: 3
message: "degraded message"
features_idx: 0
Expand Down
Loading

0 comments on commit c929f79

Please sign in to comment.