Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart crashed plugins #1236

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ func run(ctx context.Context) (err error) {
err = reportFatalError("while waiting for goroutines to finish gracefully", multiErr.ErrorOrNil())
}()

schedulerChan := make(chan string)
collector := plugin.NewCollector(logger)
enabledPluginExecutors, enabledPluginSources := collector.GetAllEnabledAndUsedPlugins(conf)
pluginManager := plugin.NewManager(logger, conf.Settings.Log, conf.Plugins, enabledPluginExecutors, enabledPluginSources)
pluginManager := plugin.NewManager(logger, conf.Settings.Log, conf.Plugins, enabledPluginExecutors, enabledPluginSources, schedulerChan)

err = pluginManager.Start(ctx)
if err != nil {
Expand Down Expand Up @@ -391,7 +392,7 @@ func run(ctx context.Context) (err error) {
actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory)

sourcePluginDispatcher := source.NewDispatcher(logger, conf.Settings.ClusterName, bots, sinkNotifiers, pluginManager, actionProvider, reporter, auditReporter, kubeConfig)
scheduler := source.NewScheduler(logger, conf, sourcePluginDispatcher)
scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan)
err = scheduler.Start(ctx)
if err != nil {
return fmt.Errorf("while starting source plugin event dispatcher: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions cmd/executor/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (*EchoExecutor) Execute(_ context.Context, in executor.ExecuteInput) (execu
return executor.ExecuteOutput{}, errors.New("The @fail label was specified. Failing execution.")
}

if strings.Contains(data, "@panic") {
panic("The @panic label was specified. Panicking.")
}

if cfg.ChangeResponseToUpperCase != nil && *cfg.ChangeResponseToUpperCase {
data = strings.ToUpper(data)
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/source/cm-watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func listenEvents(ctx context.Context, kubeConfig []byte, obj Object, sink chan
infiniteWatch := func(event watch.Event) (bool, error) {
if event.Type == obj.Event {
cm := event.Object.(*corev1.ConfigMap)

if cm.Annotations["die"] == "true" {
exitOnError(fmt.Errorf("die annotation set to true"))
}

msg := fmt.Sprintf("Plugin %s detected `%s` event on `%s/%s`", pluginName, obj.Event, cm.Namespace, cm.Name)
sink <- source.Event{
Message: api.NewPlaintextMessage(msg, true),
Expand Down
74 changes: 38 additions & 36 deletions helm/botkube/README.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions helm/botkube/e2e-test-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ plugins:
repositories:
botkube:
url: http://host.k3d.internal:3000/botkube.yaml
restartPolicy:
type: "DeactivatePlugin"
threshold: 5
healthCheckInterval: 10s

actions:
'get-created-resource':
Expand Down
7 changes: 7 additions & 0 deletions helm/botkube/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,13 @@ plugins:
enabled: true
port: 2115
targetPort: 2115
# -- Botkube Restart Policy on plugin failure.
restartPolicy:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, to better understand: it will do the health check every 10s, and if it restarts 5 times, the plugin will be disabled with this setup — am I correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. However, I think we can bump the disable threshold to ~10, because for now the counter applies for the whole life of the plugin, until the Botkube pod is restarted. I would like to change that, but probably in follow-up PRs.

# -- Restart policy type. Allowed values: "RestartAgent", "DeactivatePlugin".
type: "DeactivatePlugin"
# -- Number of restarts before the policy takes effect.
threshold: 10
healthCheckInterval: 10s

# -- Configuration for synchronizing Botkube configuration.
config:
Expand Down
129 changes: 129 additions & 0 deletions internal/plugin/health_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package plugin

import (
"context"
"time"

"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/pkg/api/executor"
"github.com/kubeshop/botkube/pkg/api/source"
"github.com/kubeshop/botkube/pkg/config"
)

// HealthMonitor restarts a failed plugin process and informs the scheduler to
// start its dispatching loop again with the new client that was generated.
type HealthMonitor struct {
	log       logrus.FieldLogger
	logConfig config.Logger
	// sourceSupervisorChan receives metadata of source plugins whose process failed.
	sourceSupervisorChan chan pluginMetadata
	// executorSupervisorChan receives metadata of executor plugins whose process failed.
	executorSupervisorChan chan pluginMetadata
	// schedulerChan is used to tell the scheduler that a restarted source plugin
	// needs its dispatching loop started again.
	schedulerChan chan string
	executorsStore *store[executor.Executor]
	sourcesStore   *store[source.Source]
	// policy decides whether a crashed plugin is restarted, deactivated, or the
	// whole agent is terminated once the restart threshold is reached.
	policy config.PluginRestartPolicy
	// pluginRestartStats counts restarts per plugin key.
	// NOTE(review): this map is read and written from both monitor goroutines
	// started in Start() without synchronization — confirm whether a mutex is needed.
	pluginRestartStats  map[string]int
	healthCheckInterval time.Duration
}

// NewHealthMonitor creates a HealthMonitor that supervises the given executor
// and source plugin stores according to the provided restart policy.
func NewHealthMonitor(logger logrus.FieldLogger, logCfg config.Logger, policy config.PluginRestartPolicy, schedulerChan chan string, sourceSupervisorChan, executorSupervisorChan chan pluginMetadata, executorsStore *store[executor.Executor], sourcesStore *store[source.Source], healthCheckInterval time.Duration) *HealthMonitor {
	monitor := HealthMonitor{
		log:                    logger,
		logConfig:              logCfg,
		policy:                 policy,
		healthCheckInterval:    healthCheckInterval,
		pluginRestartStats:     map[string]int{},
		schedulerChan:          schedulerChan,
		sourceSupervisorChan:   sourceSupervisorChan,
		executorSupervisorChan: executorSupervisorChan,
		sourcesStore:           sourcesStore,
		executorsStore:         executorsStore,
	}
	return &monitor
}

// Start spawns one supervising goroutine per plugin kind (sources and
// executors). Both loops exit when ctx is cancelled.
func (m *HealthMonitor) Start(ctx context.Context) {
	monitors := []func(context.Context){
		m.monitorExecutorPluginHealth,
		m.monitorSourcePluginHealth,
	}
	for _, monitor := range monitors {
		go monitor(ctx)
	}
}

// monitorSourcePluginHealth consumes crash notifications for source plugins,
// releases the failed instance, and — unless the restart policy says the
// plugin must be deactivated — recreates its gRPC client and notifies the
// scheduler to resume dispatching for it. It runs until ctx is cancelled.
func (m *HealthMonitor) monitorSourcePluginHealth(ctx context.Context) {
	m.log.Info("Starting source plugin supervisor...")
	for {
		select {
		case <-ctx.Done():
			return
		case plugin := <-m.sourceSupervisorChan:
			m.log.Infof("Restarting source plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginRestartStats[plugin.pluginKey]+1, m.policy.Threshold)

			// `src` (not `source`) so we don't shadow the imported `source`
			// package, which is still needed below for createGRPCClient.
			if src, ok := m.sourcesStore.EnabledPlugins.Get(plugin.pluginKey); ok && src.Cleanup != nil {
				m.log.Debugf("Releasing resources of source plugin %q...", plugin.pluginKey)
				src.Cleanup()
			}

			m.sourcesStore.EnabledPlugins.Delete(plugin.pluginKey)

			if ok := m.shouldRestartPlugin(plugin.pluginKey); !ok {
				m.log.Warnf("Plugin %q has been restarted too many times. Deactivating...", plugin.pluginKey)
				continue
			}

			p, err := createGRPCClient[source.Source](ctx, m.log, m.logConfig, plugin, TypeSource, m.sourceSupervisorChan, m.healthCheckInterval)
			if err != nil {
				m.log.WithError(err).Errorf("Failed to restart plugin %q.", plugin.pluginKey)
				continue
			}

			m.sourcesStore.EnabledPlugins.Insert(plugin.pluginKey, p)
			// Sources are push-based: the scheduler must restart the
			// dispatching loop with the freshly created client.
			m.schedulerChan <- plugin.pluginKey
		}
	}
}

// monitorExecutorPluginHealth consumes crash notifications for executor
// plugins, releases the failed instance, and — unless the restart policy says
// the plugin must be deactivated — recreates its gRPC client. Unlike sources,
// executors are invoked on demand, so no scheduler notification is sent.
// It runs until ctx is cancelled.
func (m *HealthMonitor) monitorExecutorPluginHealth(ctx context.Context) {
	m.log.Info("Starting executor plugin supervisor...")
	for {
		select {
		case <-ctx.Done():
			return
		case plugin := <-m.executorSupervisorChan:
			m.log.Infof("Restarting executor plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginRestartStats[plugin.pluginKey]+1, m.policy.Threshold)

			// `ex` (not `executor`) so we don't shadow the imported `executor`
			// package, which is still needed below for createGRPCClient.
			if ex, ok := m.executorsStore.EnabledPlugins.Get(plugin.pluginKey); ok && ex.Cleanup != nil {
				m.log.Debugf("Releasing resources of executor plugin %q...", plugin.pluginKey)
				ex.Cleanup()
			}

			m.executorsStore.EnabledPlugins.Delete(plugin.pluginKey)
			if ok := m.shouldRestartPlugin(plugin.pluginKey); !ok {
				m.log.Warnf("Plugin %q has been restarted too many times. Deactivating...", plugin.pluginKey)
				continue
			}

			p, err := createGRPCClient[executor.Executor](ctx, m.log, m.logConfig, plugin, TypeExecutor, m.executorSupervisorChan, m.healthCheckInterval)
			if err != nil {
				m.log.WithError(err).Errorf("Failed to restart plugin %q.", plugin.pluginKey)
				continue
			}

			m.executorsStore.EnabledPlugins.Insert(plugin.pluginKey, p)
		}
	}
}

// shouldRestartPlugin records one more restart attempt for the given plugin
// key and reports whether the configured restart policy still allows
// restarting it. For the "restart agent" policy it terminates the whole
// process (Fatalf) once the threshold is reached; an unknown policy type is
// logged and treated as "do not restart".
// NOTE(review): pluginRestartStats is updated here from both monitor
// goroutines without synchronization — confirm whether a mutex is needed.
func (m *HealthMonitor) shouldRestartPlugin(plugin string) bool {
	restarts := m.pluginRestartStats[plugin]
	m.pluginRestartStats[plugin]++

	// Normalize once instead of lowering the policy type per comparison.
	policyType := m.policy.Type.ToLower()

	if policyType == config.KeepAgentRunningWhenThresholdReached.ToLower() {
		return restarts < m.policy.Threshold
	}

	if policyType == config.RestartAgentWhenThresholdReached.ToLower() {
		if restarts >= m.policy.Threshold {
			m.log.Fatalf("Plugin %q has been restarted %d times and selected restartPolicy is %q. Exiting...", plugin, restarts, m.policy.Type)
		}
		return true
	}

	m.log.Errorf("Unknown restart policy %q.", m.policy.Type)
	return false
}
15 changes: 9 additions & 6 deletions internal/plugin/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"regexp"
"runtime"
"strings"
"time"

"github.com/hashicorp/go-getter"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -173,23 +174,25 @@ func (*IndexBuilder) dependenciesForBinary(bin pluginBinariesIndex, deps map[str
return out
}

func (i *IndexBuilder) getPluginMetadata(dir string, bins []pluginBinariesIndex) (*api.MetadataOutput, error) {
func (i *IndexBuilder) getPluginMetadata(dir string, index []pluginBinariesIndex) (*api.MetadataOutput, error) {
os, arch := runtime.GOOS, runtime.GOARCH

for _, item := range bins {
for _, item := range index {
if item.Arch != arch || item.OS != os {
continue
}

bins := map[string]string{
item.Type.String(): filepath.Join(dir, item.BinaryPath),
bins := map[string]pluginMetadata{
item.Type.String(): {
binPath: filepath.Join(dir, item.BinaryPath),
},
}
clients, err := createGRPCClients[metadataGetter](i.log, config.Logger{}, bins, item.Type)
clients, err := createGRPCClients[metadataGetter](context.Background(), i.log, config.Logger{}, bins, item.Type, make(chan pluginMetadata), 10*time.Second)
if err != nil {
return nil, fmt.Errorf("while creating gRPC client: %w", err)
}

cli := clients[item.Type.String()]
cli := clients.data[item.Type.String()]
meta, err := cli.Client.Metadata(context.Background())
if err != nil {
return nil, fmt.Errorf("while calling metadata RPC: %w", err)
Expand Down
Loading
Loading