-
Notifications
You must be signed in to change notification settings - Fork 9
RSDK-11248 RSDK-11266 RSDK-11900 RSDK-11901 Add restart checking logic #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3e2a1e4
733f22e
a26c1ad
db833fc
68dae75
15f9525
94af599
6d38b08
31e82d4
ce061b8
3297b56
2d4252b
5c184be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/url" | ||
"os" | ||
"regexp" | ||
|
@@ -21,13 +22,18 @@ import ( | |
"github.com/viamrobotics/agent/subsystems/viamserver" | ||
"github.com/viamrobotics/agent/utils" | ||
pb "go.viam.com/api/app/agent/v1" | ||
apppb "go.viam.com/api/app/v1" | ||
"go.viam.com/rdk/logging" | ||
goutils "go.viam.com/utils" | ||
"go.viam.com/utils/rpc" | ||
) | ||
|
||
const ( | ||
minimalCheckInterval = time.Second * 5 | ||
// The minimal (and default) interval for checking for config updates via DeviceAgentConfig. | ||
minimalDeviceAgentConfigCheckInterval = time.Second * 5 | ||
// The minimal (and default) interval for checking whether agent needs to be restarted. | ||
minimalNeedsRestartCheckInterval = time.Second * 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now have two background check goroutines: one checks for a new config every 5s (existing), and another checks for a restart every 1s (new). Each check can receive a new, different interval from the app call, so they need to be running at different cadences in different goroutines. You'll also notice that I renamed some |
||
|
||
defaultNetworkTimeout = time.Second * 15 | ||
// stopAllTimeout must be lower than systemd subsystems/viamagent/viam-agent.service timeout of 4mins | ||
// and higher than subsystems/viamserver/viamserver.go timeout of 2mins. | ||
|
@@ -42,7 +48,6 @@ type Manager struct { | |
|
||
connMu sync.RWMutex | ||
conn rpc.ClientConn | ||
client pb.AgentDeviceServiceClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO this was a pointless field to store on the manager; creating a gRPC client on top of |
||
cloudConfig *logging.CloudConfig | ||
|
||
logger logging.Logger | ||
|
@@ -209,7 +214,7 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) { | |
m.logger.Warn(err) | ||
} | ||
if m.viamAgentNeedsRestart { | ||
m.Exit() | ||
m.Exit(fmt.Sprintf("A new version of %s has been installed", SubsystemName)) | ||
return | ||
} | ||
} else { | ||
|
@@ -221,17 +226,19 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) { | |
needRestartConfigChange := m.viamServer.Update(ctx, m.cfg) | ||
|
||
if needRestart || needRestartConfigChange || m.viamServerNeedsRestart || m.viamAgentNeedsRestart { | ||
if m.viamServer.(viamserver.RestartCheck).SafeToRestart(ctx) { | ||
if m.viamServer.Property(ctx, viamserver.RestartPropertyRestartAllowed) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the |
||
m.logger.Infof("%s has allowed a restart; will restart", viamserver.SubsysName) | ||
if err := m.viamServer.Stop(ctx); err != nil { | ||
m.logger.Warn(err) | ||
} else { | ||
m.viamServerNeedsRestart = false | ||
} | ||
if m.viamAgentNeedsRestart { | ||
m.Exit() | ||
m.Exit(fmt.Sprintf("A new version of %s has been installed", SubsystemName)) | ||
return | ||
} | ||
} else { | ||
m.logger.Warnf("%s has NOT allowed a restart; will NOT restart", viamserver.SubsysName) | ||
m.viamServerNeedsRestart = true | ||
} | ||
} | ||
|
@@ -280,26 +287,26 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) { | |
// CheckUpdates retrieves an updated config from the cloud, and then passes it to SubsystemUpdates(). | ||
func (m *Manager) CheckUpdates(ctx context.Context) time.Duration { | ||
defer utils.Recover(m.logger, nil) | ||
m.logger.Debug("Checking cloud for update") | ||
interval, err := m.GetConfig(ctx) | ||
m.logger.Debug("Checking cloud for device agent config updates") | ||
deviceAgentConfigCheckInterval, err := m.GetConfig(ctx) | ||
|
||
if interval < minimalCheckInterval { | ||
interval = minimalCheckInterval | ||
if deviceAgentConfigCheckInterval < minimalDeviceAgentConfigCheckInterval { | ||
deviceAgentConfigCheckInterval = minimalDeviceAgentConfigCheckInterval | ||
} | ||
|
||
// randomly fuzz the interval by +/- 5% | ||
interval = utils.FuzzTime(interval, 0.05) | ||
deviceAgentConfigCheckInterval = utils.FuzzTime(deviceAgentConfigCheckInterval, 0.05) | ||
|
||
// we already log in all error cases inside GetConfig, so | ||
// no need to log again. | ||
if err != nil { | ||
return interval | ||
return deviceAgentConfigCheckInterval | ||
} | ||
|
||
// update and (re)start subsystems | ||
m.SubsystemUpdates(ctx) | ||
|
||
return interval | ||
return deviceAgentConfigCheckInterval | ||
} | ||
|
||
func (m *Manager) setDebug(debug bool) { | ||
|
@@ -380,13 +387,51 @@ func (m *Manager) SubsystemHealthChecks(ctx context.Context) { | |
} | ||
} | ||
|
||
// CheckIfNeedsRestart returns the check restart interval and whether the agent (and | ||
// therefore all its subsystems) has been forcibly restarted by app. | ||
func (m *Manager) CheckIfNeedsRestart(ctx context.Context) (time.Duration, bool) { | ||
m.logger.Debug("Checking cloud for forced restarts") | ||
if m.cloudConfig == nil { | ||
m.logger.Warn("can't CheckIfNeedsRestart until successful config load") | ||
return minimalNeedsRestartCheckInterval, false | ||
} | ||
|
||
// Only continue this check if viam-server does not handle restart checking itself | ||
// (return early if viamserver _does_ handle restart checking). | ||
if !m.viamServer.Property(ctx, viamserver.RestartPropertyDoesNotHandleNeedsRestart) { | ||
return minimalNeedsRestartCheckInterval, false | ||
} | ||
|
||
m.logger.Debug("Checking cloud for forced restarts") | ||
timeoutCtx, cancelFunc := context.WithTimeout(ctx, defaultNetworkTimeout) | ||
defer cancelFunc() | ||
|
||
if err := m.dial(timeoutCtx); err != nil { | ||
m.logger.Warn(errw.Wrapf(err, "dialing to check if restart needed")) | ||
return minimalNeedsRestartCheckInterval, false | ||
} | ||
|
||
robotServiceClient := apppb.NewRobotServiceClient(m.conn) | ||
req := &apppb.NeedsRestartRequest{Id: m.cloudConfig.ID} | ||
res, err := robotServiceClient.NeedsRestart(timeoutCtx, req) | ||
if err != nil { | ||
m.logger.Warn(errw.Wrapf(err, "checking if restart needed")) | ||
return minimalNeedsRestartCheckInterval, false | ||
} | ||
|
||
return res.GetRestartCheckInterval().AsDuration(), res.GetMustRestart() | ||
} | ||
|
||
// CloseAll stops all subsystems and closes the cloud connection. | ||
func (m *Manager) CloseAll() { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
// Use a slow goroutine watcher to log and continue if shutdown is taking too long. | ||
slowWatcher, slowWatcherCancel := goutils.SlowGoroutineWatcher( | ||
stopAllTimeout, "Agent is taking a while to shut down,", m.logger) | ||
stopAllTimeout, | ||
fmt.Sprintf("Viam agent subsystems and/or background workers failed to shut down within %v", stopAllTimeout), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [drive-by] This log was getting output after agent shutdown timed out, so the message was slightly inaccurate. |
||
m.logger, | ||
) | ||
|
||
slowTicker := time.NewTicker(10 * time.Second) | ||
defer slowTicker.Stop() | ||
|
@@ -430,7 +475,6 @@ func (m *Manager) CloseAll() { | |
} | ||
} | ||
|
||
m.client = nil | ||
m.conn = nil | ||
}) | ||
|
||
|
@@ -479,7 +523,8 @@ func (m *Manager) CloseAll() { | |
} | ||
} | ||
|
||
// StartBackgroundChecks kicks off a go routine that loops on a timer to check for updates and health checks. | ||
// StartBackgroundChecks kicks off goroutines that loop on a timer to check for updates, | ||
// health checks, and restarts. | ||
func (m *Manager) StartBackgroundChecks(ctx context.Context) { | ||
if ctx.Err() != nil { | ||
return | ||
|
@@ -495,18 +540,18 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) { | |
}) | ||
defer m.activeBackgroundWorkers.Done() | ||
|
||
checkInterval := minimalCheckInterval | ||
deviceAgentConfigCheckInterval := minimalDeviceAgentConfigCheckInterval | ||
m.cfgMu.RLock() | ||
wait := m.cfg.AdvancedSettings.WaitForUpdateCheck.Get() | ||
m.cfgMu.RUnlock() | ||
if wait { | ||
checkInterval = m.CheckUpdates(ctx) | ||
deviceAgentConfigCheckInterval = m.CheckUpdates(ctx) | ||
} else { | ||
// preemptively start things before we go into the regular update/check/restart | ||
m.SubsystemHealthChecks(ctx) | ||
} | ||
|
||
timer := time.NewTimer(checkInterval) | ||
timer := time.NewTimer(deviceAgentConfigCheckInterval) | ||
defer timer.Stop() | ||
for { | ||
if ctx.Err() != nil { | ||
|
@@ -516,9 +561,39 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) { | |
case <-ctx.Done(): | ||
return | ||
case <-timer.C: | ||
checkInterval = m.CheckUpdates(ctx) | ||
deviceAgentConfigCheckInterval = m.CheckUpdates(ctx) | ||
m.SubsystemHealthChecks(ctx) | ||
timer.Reset(checkInterval) | ||
timer.Reset(deviceAgentConfigCheckInterval) | ||
} | ||
} | ||
}() | ||
|
||
m.activeBackgroundWorkers.Add(1) | ||
go func() { | ||
defer m.activeBackgroundWorkers.Done() | ||
|
||
timer := time.NewTimer(minimalNeedsRestartCheckInterval) | ||
defer timer.Stop() | ||
for { | ||
if ctx.Err() != nil { | ||
return | ||
} | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-timer.C: | ||
needsRestartCheckInterval, needsRestart := m.CheckIfNeedsRestart(ctx) | ||
if needsRestartCheckInterval < minimalNeedsRestartCheckInterval { | ||
needsRestartCheckInterval = minimalNeedsRestartCheckInterval | ||
} | ||
if needsRestart { | ||
// Do not mark m.agentNeedsRestart and instead Exit immediately; we do not want | ||
// to wait for viam-server to allow a restart as it may be in a bad state. | ||
m.Exit(fmt.Sprintf("A restart of %s was requested from app", SubsystemName)) | ||
} | ||
// As with the device agent config check interval, randomly fuzz the interval by | ||
// +/- 5%. | ||
timer.Reset(utils.FuzzTime(needsRestartCheckInterval, 0.05)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We were doing this for the config check interval, too... I'm not sure why; anyone know? |
||
} | ||
} | ||
}() | ||
|
@@ -531,11 +606,11 @@ func (m *Manager) dial(ctx context.Context) error { | |
return ctx.Err() | ||
} | ||
if m.cloudConfig == nil { | ||
return errors.New("cannot dial() until successful LoadConfig") | ||
return errors.New("cannot dial() until successful config load") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [drive-by] |
||
} | ||
m.connMu.Lock() | ||
defer m.connMu.Unlock() | ||
if m.client != nil { | ||
if m.conn != nil { | ||
return nil | ||
} | ||
|
||
|
@@ -564,7 +639,6 @@ func (m *Manager) dial(ctx context.Context) error { | |
return err | ||
} | ||
m.conn = conn | ||
m.client = pb.NewAgentDeviceServiceClient(m.conn) | ||
|
||
if m.netAppender != nil { | ||
m.netAppender.SetConn(conn, true) | ||
|
@@ -577,27 +651,28 @@ func (m *Manager) dial(ctx context.Context) error { | |
// GetConfig retrieves the configuration from the cloud. | ||
func (m *Manager) GetConfig(ctx context.Context) (time.Duration, error) { | ||
if m.cloudConfig == nil { | ||
err := errors.New("can't GetConfig until successful LoadConfig") | ||
err := errors.New("can't GetConfig until successful config load") | ||
m.logger.Warn(err) | ||
return minimalCheckInterval, err | ||
return minimalDeviceAgentConfigCheckInterval, err | ||
} | ||
timeoutCtx, cancelFunc := context.WithTimeout(ctx, defaultNetworkTimeout) | ||
defer cancelFunc() | ||
|
||
if err := m.dial(timeoutCtx); err != nil { | ||
m.logger.Warn(errw.Wrapf(err, "fetching %s config", SubsystemName)) | ||
return minimalCheckInterval, err | ||
m.logger.Warn(errw.Wrapf(err, "dialing to fetch %s config", SubsystemName)) | ||
return minimalDeviceAgentConfigCheckInterval, err | ||
} | ||
|
||
agentDeviceServiceClient := pb.NewAgentDeviceServiceClient(m.conn) | ||
req := &pb.DeviceAgentConfigRequest{ | ||
Id: m.cloudConfig.ID, | ||
HostInfo: m.getHostInfo(), | ||
VersionInfo: m.getVersions(), | ||
} | ||
resp, err := m.client.DeviceAgentConfig(timeoutCtx, req) | ||
resp, err := agentDeviceServiceClient.DeviceAgentConfig(timeoutCtx, req) | ||
if err != nil { | ||
m.logger.Warn(errw.Wrapf(err, "fetching %s config", SubsystemName)) | ||
return minimalCheckInterval, err | ||
return minimalDeviceAgentConfigCheckInterval, err | ||
} | ||
fixWindowsPaths(resp) | ||
|
||
|
@@ -699,7 +774,7 @@ func (m *Manager) getVersions() *pb.VersionInfo { | |
return vers | ||
} | ||
|
||
func (m *Manager) Exit() { | ||
m.logger.Info("A new viam-agent has been installed. Will now exit to be restarted by service manager.") | ||
func (m *Manager) Exit(reason string) { | ||
m.logger.Infof("%s. %s will now exit to be restarted by service manager", reason, SubsystemName) | ||
m.globalCancel() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,9 @@ type Subsystem interface { | |
|
||
// HealthCheck reports if a subsystem is running correctly (it is restarted if not) | ||
HealthCheck(ctx context.Context) error | ||
|
||
// Property gets an arbitrary property about the running subsystem. | ||
Property(ctx context.Context, property string) bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here's the new method I added. I realize the interface here is meant to reveal a limited API from manager -> subsystems, but I found that the manager truly needed to know a couple "properties" of the running viamserver subsystem (whether restart was currently allowed and whether viamserver was already handling restart checking logic), so I thought this was worth adding despite it opening a pretty generic API to subsystems. |
||
} | ||
|
||
// Dummy is a fake subsystem for when a particular OS doesn't (yet) have support. | ||
|
@@ -39,3 +42,7 @@ func (d *Dummy) Update(_ context.Context, _ utils.AgentConfig) bool { | |
func (d *Dummy) HealthCheck(_ context.Context) error { | ||
return nil | ||
} | ||
|
||
func (d *Dummy) Property(_ context.Context, _ string) bool { | ||
return false | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change, and the addition and usage of
reason
for theExit
method inmanager.go
are for RSDK-11266. Hopefully these messages will be useful for debugging.