Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

const routineKey config.ContextKey = "routine"

const restartBackendChanSize = 5

// Agent is the interface that all agents must implement
type Agent interface {
Start(ctx context.Context, cancelFunc context.CancelFunc) error
Expand All @@ -32,14 +34,15 @@ type orbAgent struct {
logger *slog.Logger
config config.Config
backends map[string]backend.Backend
backendState map[string]*backend.State
backendsCommon config.BackendCommons
ctx context.Context
cancelFunction context.CancelFunc

policyManager policymgr.PolicyManager
configManager configmgr.Manager
secretsManager secretsmgr.Manager
policyManager policymgr.PolicyManager
configManager configmgr.Manager
secretsManager secretsmgr.Manager
backendStateManager backend.StateManager
restartBackendChan chan string
}

var _ Agent = (*orbAgent)(nil)
Expand All @@ -57,17 +60,22 @@ func New(logger *slog.Logger, c config.Config) (Agent, error) {
return nil, err
}

restartBackendChan := make(chan string, restartBackendChanSize)

backendStateManager := backend.NewStateManager(c.OrbAgent.ConfigManager.Active, logger, restartBackendChan)
// Pass a background context to the config manager at construction time. The
// manager keeps its own copy and later derives child contexts from the
// runtime context supplied in Agent.Start.
cm := configmgr.New(logger, pm, c.OrbAgent.ConfigManager.Active)
cm := configmgr.New(logger, pm, c.OrbAgent.ConfigManager.Active, backendStateManager)

return &orbAgent{
logger: logger,
config: c,
policyManager: pm,
configManager: cm,
secretsManager: sm,
logger: logger,
config: c,
policyManager: pm,
configManager: cm,
secretsManager: sm,
backendStateManager: backendStateManager,
restartBackendChan: restartBackendChan,
}, nil
}

Expand All @@ -78,7 +86,6 @@ func (a *orbAgent) startBackends(agentCtx context.Context, cfgBackends map[strin
}
a.ctx = agentCtx
a.backends = make(map[string]backend.Backend, len(cfgBackends))
a.backendState = make(map[string]*backend.State)

var commonConfig config.BackendCommons
if v, prs := cfgBackends["common"]; prs {
Expand Down Expand Up @@ -119,31 +126,35 @@ func (a *orbAgent) startBackends(agentCtx context.Context, cfgBackends map[strin
backendCtx := context.WithValue(agentCtx, routineKey, name)
backendCtx = a.configManager.GetContext(backendCtx)
a.backends[name] = be
initialState := be.GetInitialState()
a.backendState[name] = &backend.State{
Status: initialState,
LastRestartTS: time.Now(),
}
// Create a cancellable context for the backend and ensure we pass both
// the context and its cancel function to Start, matching the Backend
// interface.
runCtx, cancel := context.WithCancel(backendCtx)
if err := be.Start(runCtx, cancel); err != nil {
var errMessage string
if initialState == backend.BackendError {
if be.GetInitialState() == backend.BackendError {
errMessage = err.Error()
}
a.backendState[name] = &backend.State{
Status: initialState,
LastError: errMessage,
LastRestartTS: time.Now(),
}
a.backendStateManager.RegisterError(name, errMessage)
return err
}
a.backendStateManager.StartBackendMonitor(name, be)

go a.waitForRestartRequests()
}
return nil
}

func (a *orbAgent) waitForRestartRequests() {
for name := range a.restartBackendChan {
a.logger.Info("restarting backend", slog.String("backend", name))
err := a.RestartBackend(a.ctx, name, "restart requested by fleet")
if err != nil {
a.logger.Error("failed to restart backend", slog.String("backend", name), slog.Any("error", err))
}
}
}

func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
startTime := time.Now()
defer func(t time.Time) {
Expand Down Expand Up @@ -198,9 +209,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin

be := a.backends[name]
a.logger.Info("restarting backend", slog.String("backend", name), slog.String("reason", reason))
a.backendState[name].RestartCount++
a.backendState[name].LastRestartTS = time.Now()
a.backendState[name].LastRestartReason = reason
a.backendStateManager.RegisterRestart(name, reason)
a.logger.Info("removing policies", slog.String("backend", name))
if err := a.policyManager.RemoveBackendPolicies(be, true); err != nil {
a.logger.Error("failed to remove policies", slog.String("backend", name), slog.Any("error", err))
Expand All @@ -219,8 +228,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin
a.logger.Info("resetting backend", slog.String("backend", name))

if err := be.FullReset(ctx); err != nil {
a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err)
a.logger.Error("failed to reset backend", slog.String("backend", name), slog.Any("error", err))
a.backendStateManager.RegisterError(name, fmt.Sprintf("failed to reset backend: %v", err))
}

return nil
Expand Down
140 changes: 140 additions & 0 deletions agent/backend/backend_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package backend

import (
"fmt"
"log/slog"
"sync"
"time"
)

// MinRestartTime is the minimum time to wait between restarts
const MinRestartTime = 5 * time.Minute

// BackendMonitorInterval is the interval at which to monitor backends
const BackendMonitorInterval = 10 * time.Second

// StateRetriever provides an interface for accessing backend state information
type StateRetriever interface {
Get() map[string]*State
}

// StateManager provides an interface for managing backend state information
type StateManager interface {
StateRetriever
StartBackendMonitor(name string, be Backend)
RegisterError(name string, errMessage string)
RegisterRestart(name string, reason string)
}

// StateManager manages the state and monitoring of backends
type stateManager struct {
backendState map[string]*State
mu sync.RWMutex
ticker *time.Ticker
logger *slog.Logger
restartBackendChan chan string
}

// NewStateManager creates a new StateManager with the given logger and restart channel
func NewStateManager(activeConfigMgr string, logger *slog.Logger, restartBackendChan chan string) StateManager {
if configMgrSupportsStateMonitoring(activeConfigMgr) {
return &stateManager{
backendState: make(map[string]*State),
ticker: time.NewTicker(BackendMonitorInterval),
logger: logger,
restartBackendChan: restartBackendChan,
}
}
return nullStateManager{}
}

func configMgrSupportsStateMonitoring(activeConfigMgr string) bool {
return activeConfigMgr == "fleet"
}

type nullStateManager struct{}

var _ StateManager = nullStateManager{}

func (n nullStateManager) Get() map[string]*State {
return make(map[string]*State)
}

func (n nullStateManager) StartBackendMonitor(_ string, _ Backend) {}

func (n nullStateManager) RegisterError(_ string, _ string) {}

func (n nullStateManager) RegisterRestart(_ string, _ string) {}

// StartBackendMonitor starts monitoring a backend and manages its state
func (manager *stateManager) StartBackendMonitor(name string, be Backend) {
manager.mu.Lock()
manager.backendState[name] = &State{
Status: be.GetInitialState(),
LastRestartTS: time.Now(),
}
manager.mu.Unlock()

go func() {
for range manager.ticker.C {
manager.mu.Lock()
backendStatus, errMsg, err := be.GetRunningStatus()
manager.backendState[name].Status = backendStatus
if backendStatus != Running {
if err != nil {
manager.backendState[name].LastError = fmt.Sprintf("failed to retrieve backend status: %v", err)
} else if errMsg != "" {
manager.backendState[name].LastError = errMsg
}

// status is not running so we have a current error
if time.Since(be.GetStartTime()) >= MinRestartTime {
manager.restartBackendChan <- name
if err != nil {
manager.logger.Error("failed to restart backend", "error", err, "backend", name)
}
} else {
remainingSecondsUntilRestart := MinRestartTime - time.Since(be.GetStartTime())
manager.logger.Info("waiting to attempt backend restart due to failed status", "remaining_secs", remainingSecondsUntilRestart)
}
}
manager.mu.Unlock()
}
}()
}

// RegisterError registers an error for a backend and updates its state
func (manager *stateManager) RegisterError(name string, errMessage string) {
manager.logger.Error(errMessage, slog.String("backend", name))
manager.mu.Lock()
defer manager.mu.Unlock()
manager.backendState[name] = &State{
Status: BackendError,
LastError: errMessage,
LastRestartTS: time.Now(),
}
}

// RegisterRestart registers a restart event for a backend
func (manager *stateManager) RegisterRestart(name string, reason string) {
manager.mu.Lock()
defer manager.mu.Unlock()
manager.backendState[name].RestartCount++
manager.backendState[name].LastRestartTS = time.Now()
manager.backendState[name].LastRestartReason = reason
}

// Get returns the current state of all backends
func (manager *stateManager) Get() map[string]*State {
manager.mu.RLock()
defer manager.mu.RUnlock()

// Return a copy of the map to prevent external modification
result := make(map[string]*State, len(manager.backendState))
for k, v := range manager.backendState {
// Copy the state to prevent external modification
stateCopy := *v
result[k] = &stateCopy
}
return result
}
Loading
Loading