Skip to content

Commit

Permalink
Add runtime flag to allow disabling resource monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed May 10, 2024
1 parent 8cd8fea commit b6783fe
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 14 deletions.
2 changes: 2 additions & 0 deletions config/runtime_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ var (

// RuntimeFlagLegacyIdentityManager enables the legacy identity manager
RuntimeFlagLegacyIdentityManager = runtimeFlag("legacy_identity_manager", false)

RuntimeFlagEnvoyResourceManagerEnabled = runtimeFlag("envoy_resource_manager_enabled", true)
)

// RuntimeFlag is a runtime flag that can flip on/off certain features
Expand Down
2 changes: 1 addition & 1 deletion pkg/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Buil
}
go srv.runProcessCollector(ctx)

if rm, err := NewSharedResourceMonitor(srv.wd); err == nil {
if rm, err := NewSharedResourceMonitor(ctx, src, srv.wd); err == nil {
srv.resourceMonitor = rm
} else {
log.Error(ctx).Err(err).Str("service", "envoy").Msg("not starting resource monitor")
Expand Down
37 changes: 28 additions & 9 deletions pkg/envoy/resource_monitor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
)
Expand Down Expand Up @@ -150,7 +151,7 @@ func WithCgroupDriver(driver CgroupDriver) ResourceMonitorOption {
// memory saturation to envoy as an injected resource. This allows envoy to
// react to actual memory pressure in the cgroup, taking into account memory
// usage from pomerium itself.
func NewSharedResourceMonitor(tempDir string, opts ...ResourceMonitorOption) (ResourceMonitor, error) {
func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string, opts ...ResourceMonitorOption) (ResourceMonitor, error) {
options := ResourceMonitorOptions{}
options.apply(opts...)
if options.driver == nil {
Expand Down Expand Up @@ -179,6 +180,13 @@ func NewSharedResourceMonitor(tempDir string, opts ...ResourceMonitorOption) (Re
cgroup: selfCgroup,
tempDir: filepath.Join(tempDir, "resource_monitor"),
}
readInitialConfig := make(chan struct{})
src.OnConfigChange(ctx, func(ctx context.Context, c *config.Config) {
<-readInitialConfig
s.onConfigChange(ctx, c)
})
s.onConfigChange(ctx, src.GetConfig())
close(readInitialConfig)

if err := s.writeMetricFile(groupMemory, metricCgroupMemorySaturation, "0", 0o644); err != nil {
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
Expand All @@ -190,6 +198,15 @@ type sharedResourceMonitor struct {
ResourceMonitorOptions
cgroup string
tempDir string
enabled atomic.Bool
}

func (s *sharedResourceMonitor) onConfigChange(_ context.Context, cfg *config.Config) {
if cfg == nil || cfg.Options == nil {
s.enabled.Store(config.DefaultRuntimeFlags()[config.RuntimeFlagEnvoyResourceManagerEnabled])
return
}
s.enabled.Store(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagEnvoyResourceManagerEnabled))
}

func (s *sharedResourceMonitor) metricFilename(group, name string) string {
Expand Down Expand Up @@ -253,7 +270,7 @@ func (s *sharedResourceMonitor) ApplyBootstrapConfig(bootstrap *envoy_config_boo

var (
monitorInitialTickDelay = 1 * time.Second
monitorMaxTickInterval = 5 * time.Second
monitorMaxTickInterval = 10 * time.Second
monitorMinTickInterval = 250 * time.Millisecond
)

Expand Down Expand Up @@ -301,14 +318,16 @@ func (s *sharedResourceMonitor) Run(ctx context.Context, envoyPid int) error {
tick.Stop()
return ctx.Err()
case <-tick.C:
usage, err := s.driver.MemoryUsage(s.cgroup)
if err != nil {
log.Error(ctx).Err(err).Msg("failed to get memory saturation")
continue
}
var saturation float64
if limit := limitWatcher.Value(); limit > 0 {
saturation = float64(usage) / float64(limit)
if s.enabled.Load() {
if limit := limitWatcher.Value(); limit > 0 {
usage, err := s.driver.MemoryUsage(s.cgroup)
if err != nil {
log.Error(ctx).Err(err).Msg("failed to get memory saturation")
continue
}
saturation = float64(usage) / float64(limit)
}
}

saturationStr := fmt.Sprintf("%.6f", saturation)
Expand Down
9 changes: 7 additions & 2 deletions pkg/envoy/resource_monitor_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

package envoy

import "errors"
import (
"context"
"errors"

func NewSharedResourceMonitor(tempDir string) (ResourceMonitor, error) {
"github.com/pomerium/pomerium/config"
)

func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string) (ResourceMonitor, error) {
return nil, errors.New("unsupported platform")
}
30 changes: 28 additions & 2 deletions pkg/envoy/resource_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ func TestSharedResourceMonitor(t *testing.T) {
},
}

monitor, err := NewSharedResourceMonitor(tempDir, WithCgroupDriver(driver))
configSrc := config.NewStaticSource(&config.Config{})
monitor, err := NewSharedResourceMonitor(context.Background(), configSrc, tempDir, WithCgroupDriver(driver))
require.NoError(t, err)

readMemorySaturation := func(t assert.TestingT) string {
Expand Down Expand Up @@ -650,6 +651,31 @@ func TestSharedResourceMonitor(t *testing.T) {
assert.Equal(c, "1.000000", readMemorySaturation(c))
}, timeout, interval)

configSrc.SetConfig(ctx, &config.Config{
Options: &config.Options{
RuntimeFlags: config.RuntimeFlags{
config.RuntimeFlagEnvoyResourceManagerEnabled: false,
},
},
})

assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, "0.000000", readMemorySaturation(c))
}, timeout, interval)

configSrc.SetConfig(ctx, &config.Config{
Options: &config.Options{
RuntimeFlags: config.RuntimeFlags{
config.RuntimeFlagEnvoyResourceManagerEnabled: true,
},
},
})

updateMemoryMax("150")
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, "1.000000", readMemorySaturation(c))
}, timeout, interval)

ca()
assert.ErrorIs(t, <-errC, context.Canceled)
}
Expand All @@ -658,7 +684,7 @@ func TestBootstrapConfig(t *testing.T) {
b := envoyconfig.New("localhost:1111", "localhost:2222", "localhost:3333", filemgr.NewManager(), nil)
testEnvoyPid := 99
tempDir := t.TempDir()
monitor, err := NewSharedResourceMonitor(tempDir, WithCgroupDriver(&cgroupV2Driver{
monitor, err := NewSharedResourceMonitor(context.Background(), config.NewStaticSource(nil), tempDir, WithCgroupDriver(&cgroupV2Driver{
root: "sys/fs/cgroup",
fs: &hybridTestFS{
base: with(v2Fs, fstest.MapFS{
Expand Down

0 comments on commit b6783fe

Please sign in to comment.