Commit 4e12df8

test: integration test for OOM controller
- Improve docs
- Disable OOM controller in container mode
- Log OOM events
- Add an integration test verifying the system can sustain an OOM event

Signed-off-by: Dmitrii Sharshakov <dmitry.sharshakov@siderolabs.com>
1 parent 7e498fa

File tree: 21 files changed, +739 −75 lines

api/resource/definitions/runtime/runtime.proto

Lines changed: 7 additions & 0 deletions

@@ -123,6 +123,13 @@ message MountStatusSpec {
   repeated string encryption_providers = 6;
 }
 
+// OOMActionSpec describes the OOM action record resource properties.
+message OOMActionSpec {
+  string trigger_context = 1;
+  double score = 2;
+  repeated string processes = 3;
+}
+
 // PlatformMetadataSpec describes platform metadata properties.
 message PlatformMetadataSpec {
   string platform = 1;
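
For reference, the Go resource spec backing this message (runtimeres.OOMActionSpec in pkg/machinery/resources/runtime, used by the controller change below) carries the same three fields. A minimal sketch of its shape, with serialization tags omitted; the comments map fields to the proto numbers above, and the exact generated code may differ:

type OOMActionSpec struct {
	TriggerContext string   // trigger_context = 1: JSON-encoded trigger evaluation context
	Score          float64  // score = 2: score of the cgroup selected for the kill
	Processes      []string // processes = 3: command lines of the reaped processes
}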

internal/app/machined/pkg/controllers/runtime/oom.go

Lines changed: 80 additions & 8 deletions

@@ -5,11 +5,14 @@
 package runtime
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"os"
 	"path/filepath"
+	"strconv"
 	"sync"
 	"syscall"
 	"time"
@@ -22,16 +25,27 @@ import (
 	"golang.org/x/sys/unix"
 
 	"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/runtime/internal/oom"
+	"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
 	"github.com/siderolabs/talos/pkg/machinery/cel"
 	"github.com/siderolabs/talos/pkg/machinery/cel/celenv"
 	"github.com/siderolabs/talos/pkg/machinery/constants"
 	"github.com/siderolabs/talos/pkg/machinery/resources/config"
+	runtimeres "github.com/siderolabs/talos/pkg/machinery/resources/runtime"
 )
 
+type actionLogItem struct {
+	runtimeres.OOMActionSpec
+
+	ID int
+}
+
 // OOMController is a controller that monitors memory PSI and handles near-OOM situations.
 type OOMController struct {
 	CgroupRoot      string
 	ActionTriggered time.Time
+	V1Alpha1Mode    runtime.Mode
+	actionLog       []actionLogItem
+	idSeq           int
 }
 
 // Name implements controller.Controller interface.
@@ -53,7 +67,12 @@ func (ctrl *OOMController) Inputs() []controller.Input {
 
 // Outputs implements controller.Controller interface.
 func (ctrl *OOMController) Outputs() []controller.Output {
-	return nil
+	return []controller.Output{
+		{
+			Type: runtimeres.OOMActionType,
+			Kind: controller.OutputExclusive,
+		},
+	}
 }
 
 var defaultTriggerExpr = sync.OnceValue(func() cel.Expression {
@@ -91,6 +110,10 @@ const defaultSampleInterval = 500 * time.Millisecond
 //
 //nolint:gocyclo
 func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
+	if ctrl.V1Alpha1Mode.InContainer() {
+		return nil
+	}
+
 	triggerExpr := defaultTriggerExpr()
 	scoringExpr := defaultScoringExpr()
 	sampleInterval := defaultSampleInterval
@@ -134,10 +157,48 @@ func (ctrl *OOMController) Run(ctx context.Context, r controller.Runtime, logger
 			continue
 		}
 
+		r.StartTrackingOutputs()
+
+		for _, action := range ctrl.actionLog {
+			if err := safe.WriterModify(ctx, r, runtimeres.NewOOMActionSpec(runtimeres.NamespaceName, strconv.Itoa(action.ID)),
+				func(item *runtimeres.OOMAction) error {
+					*item.TypedSpec() = action.OOMActionSpec
+
+					return nil
+				}); err != nil {
+				return fmt.Errorf("failed to create OOM action log: %w", err)
+			}
+		}
+
+		if err = safe.CleanupOutputs[*runtimeres.OOMAction](ctx, r); err != nil {
+			return err
+		}
+
 		// TODO: evaluate on different cgroups, not only root. E.g. action when podruntime experiences high PSI.
 		if trigger {
+			score, processes := ctrl.OomAction(logger, ctrl.CgroupRoot, scoringExpr)
+
+			ctxString, err := json.Marshal(evalContext)
+			if err != nil {
+				return fmt.Errorf("failed to marshal trigger context: %w", err)
+			}
+
+			ctrl.actionLog = append(ctrl.actionLog, actionLogItem{
+				ID: ctrl.idSeq,
+				OOMActionSpec: runtimeres.OOMActionSpec{
+					TriggerContext: string(ctxString),
+					Processes:      processes,
+					Score:          score,
+				},
+			})
+
+			ctrl.idSeq++
+
+			if len(ctrl.actionLog) > 10 {
+				ctrl.actionLog = ctrl.actionLog[len(ctrl.actionLog)-10:]
+			}
+
 			ctrl.ActionTriggered = time.Now()
-			ctrl.OomAction(logger, ctrl.CgroupRoot, scoringExpr)
 		}
 	}
 }
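
The action log written above is a bounded, in-memory ring: only the 10 most recent entries are retained, so the number of exported OOMAction resources stays capped, and safe.CleanupOutputs removes resources that have fallen out of the window. A self-contained sketch of the trimming step; the generic helper and its name are illustrative, not part of the commit, which inlines this logic:

// appendBounded appends item to log and keeps only the last limit entries
// (the commit uses a limit of 10).
func appendBounded[T any](log []T, item T, limit int) []T {
	log = append(log, item)

	if len(log) > limit {
		log = log[len(log)-limit:]
	}

	return log
}
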
@@ -177,13 +238,13 @@ func (*OOMController) getConfig(cfg *config.MachineConfig) (cel.Expression, cel.
 }
 
 // OomAction handles out of memory conditions by selecting and killing cgroups based on memory usage data.
-func (ctrl *OOMController) OomAction(logger *zap.Logger, root string, scoringExpr cel.Expression) {
+func (ctrl *OOMController) OomAction(logger *zap.Logger, root string, scoringExpr cel.Expression) (float64, []string) {
 	logger.Info("OOM controller triggered")
 
 	ranking := oom.RankCgroups(logger, root, scoringExpr)
 
 	if len(ranking) == 0 {
-		return
+		return 0, []string{}
 	}
 
 	var (
@@ -198,13 +259,15 @@ func (ctrl *OOMController) OomAction(logger *zap.Logger, root string, scoringExp
 		}
 	}
 
-	err := reapCg(logger, cgroupToKill.Path)
+	processes, err := reapCg(logger, cgroupToKill.Path)
 	if err != nil {
 		logger.Error("cannot reap cgroup", zap.String("cgroup", cgroupToKill.Path), zap.Error(err))
 	}
+
+	return maxScore, processes
 }
 
-func reapCg(logger *zap.Logger, cgroupPath string) error {
+func reapCg(logger *zap.Logger, cgroupPath string) ([]string, error) {
 	logger.Warn("Sending SIGKILL to cgroup", zap.String("cgroup", cgroupPath))
 
 	processes := oom.ListCgroupProcs(cgroupPath)
@@ -213,8 +276,17 @@ func reapCg(logger *zap.Logger, cgroupPath string) error {
 	// Open pidfd's of all the processes in cgroup to accelerate kernel
 	// garbage-collecting those processes via mrelease.
 	pidfds := []int{}
+	cmdlines := []string{}
 
 	for _, pid := range processes {
+		cmdBytes, err := os.ReadFile(filepath.Join("/proc", strconv.Itoa(pid), "cmdline"))
+		if err == nil {
+			cmdlines = append(
+				cmdlines,
+				string(bytes.ReplaceAll(bytes.TrimRight(cmdBytes, "\x00"), []byte{0}, []byte{' '})),
+			)
+		}
+
 		// pidfd is always opened with CLOEXEC:
 		// https://github.com/torvalds/linux/blob/bf40f4b87761e2ec16efc8e49b9ca0d81f4115d8/kernel/pid.c#L637
 		pidfd, err := unix.PidfdOpen(pid, 0)
@@ -232,7 +304,7 @@ func reapCg(logger *zap.Logger, cgroupPath string) error {
 	if err != nil {
 		logger.Error("failed to send SIGKILL", zap.String("cgroup", cgroupPath), zap.Error(err))
 
-		return err
+		return cmdlines, err
 	}
 
 	for _, pidfd := range pidfds {
@@ -245,5 +317,5 @@ func reapCg(logger *zap.Logger, cgroupPath string) error {
 		}
 	}
 
-	return nil
+	return cmdlines, nil
 }
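
/proc/<pid>/cmdline stores the process arguments separated by NUL bytes with a trailing NUL, so the logging code above trims the trailing NUL and replaces the remaining separators with spaces. A standalone sketch of the same transformation, runnable on Linux; readCmdline is an illustrative name, not a function from this commit:

package main

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// readCmdline returns the command line of a process with NUL separators
// replaced by spaces, mirroring the logic added to reapCg above.
func readCmdline(pid int) (string, error) {
	cmdBytes, err := os.ReadFile(filepath.Join("/proc", strconv.Itoa(pid), "cmdline"))
	if err != nil {
		return "", err
	}

	return string(bytes.ReplaceAll(bytes.TrimRight(cmdBytes, "\x00"), []byte{0}, []byte{' '})), nil
}

func main() {
	// Inspect our own command line as a quick demonstration.
	cmdline, err := readCmdline(os.Getpid())
	if err != nil {
		panic(err)
	}

	fmt.Println(cmdline)
}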

internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_controller.go

Lines changed: 3 additions & 1 deletion

@@ -444,7 +444,9 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
 			&runtimecontrollers.VersionController{},
 			&runtimecontrollers.WatchdogTimerConfigController{},
 			&runtimecontrollers.WatchdogTimerController{},
-			&runtimecontrollers.OOMController{},
+			&runtimecontrollers.OOMController{
+				V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(),
+			},
 			&secrets.APICertSANsController{},
 			&secrets.APIController{},
 			&secrets.EncryptionSaltController{},

internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go

Lines changed: 1 addition & 0 deletions

@@ -225,6 +225,7 @@ func NewState() (*State, error) {
 		&runtime.MetaKey{},
 		&runtime.MetaLoaded{},
 		&runtime.MountStatus{},
+		&runtime.OOMAction{},
 		&runtime.PlatformMetadata{},
 		&runtime.SBOMItem{},
 		&runtime.SecurityState{},

internal/integration/base/k8s.go

Lines changed: 36 additions & 0 deletions

@@ -25,6 +25,7 @@ import (
 	"github.com/siderolabs/gen/xslices"
 	"github.com/siderolabs/go-pointer"
 	"github.com/siderolabs/go-retry/retry"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	eventsv1 "k8s.io/api/events/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -514,6 +515,41 @@ func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout tim
 	}
 }
 
+// WaitForDeploymentAvailable waits for the deployment with the given namespace and name to be running with the requested replicas.
+func (k8sSuite *K8sSuite) WaitForDeploymentAvailable(ctx context.Context, timeout time.Duration, namespace, deplName string, replicas int32) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	watcher, err := k8sSuite.Clientset.AppsV1().Deployments(namespace).Watch(ctx, metav1.ListOptions{
+		FieldSelector: fields.OneTermEqualSelector("metadata.name", deplName).String(),
+	})
+	if err != nil {
+		return err
+	}
+
+	defer watcher.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case event := <-watcher.ResultChan():
+			if event.Type == watch.Error {
+				return fmt.Errorf("error watching deployment: %v", event.Object)
+			}
+
+			deployment, ok := event.Object.(*appsv1.Deployment)
+			if !ok {
+				continue
+			}
+
+			if deployment.Name == deplName && deployment.Status.AvailableReplicas == replicas {
+				return nil
+			}
+		}
+	}
+}
+
 // LogPodLogsByLabel logs the logs of the pod with the given namespace and label.
 func (k8sSuite *K8sSuite) LogPodLogsByLabel(ctx context.Context, namespace, label, value string) {
 	ctx, cancel := context.WithTimeout(ctx, time.Minute)
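
Usage, as exercised by the OOM integration test below: block until the deployment reports the expected number of available replicas, failing the test on timeout (names and values are taken from that test):

	// Wait up to a minute for the "stress-mem" deployment in the "default"
	// namespace to report 2 available replicas.
	suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 2))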

internal/integration/k8s/oom.go

Lines changed: 127 additions & 0 deletions

@@ -0,0 +1,127 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+//go:build integration_k8s
+
+package k8s
+
+import (
+	"context"
+	_ "embed"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cosi-project/runtime/pkg/state"
+
+	"github.com/siderolabs/talos/internal/integration/base"
+	"github.com/siderolabs/talos/pkg/machinery/client"
+	"github.com/siderolabs/talos/pkg/machinery/config/machine"
+	"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
+)
+
+// OomSuite verifies that the userspace OOM handler will kill excessive replicas of a heavy memory consumer deployment.
+type OomSuite struct {
+	base.K8sSuite
+}
+
+var (
+	//go:embed testdata/oom.yaml
+	oomPodSpec []byte
+
+	//go:embed testdata/oom-50-replicas.yaml
+	oom50ReplicasPatch []byte
+
+	//go:embed testdata/oom-1-replica.yaml
+	oom1ReplicaPatch []byte
+)
+
+// SuiteName returns the name of the suite.
+func (suite *OomSuite) SuiteName() string {
+	return "k8s.OomSuite"
+}
+
+// TestOom verifies that the system remains stable after handling an OOM event.
+func (suite *OomSuite) TestOom() {
+	if suite.Cluster == nil {
+		suite.T().Skip("without full cluster state reaching out to the node IP is not reliable")
+	}
+
+	if testing.Short() {
+		suite.T().Skip("skipping in short mode")
+	}
+
+	if suite.Cluster.Provisioner() != base.ProvisionerQEMU {
+		suite.T().Skip("skipping OOM test since provisioner is not qemu")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	suite.T().Cleanup(cancel)
+
+	oomPodManifest := suite.ParseManifests(oomPodSpec)
+
+	suite.T().Cleanup(func() {
+		cleanUpCtx, cleanupCancel := context.WithTimeout(context.Background(), time.Minute)
+		defer cleanupCancel()
+
+		suite.DeleteManifests(cleanUpCtx, oomPodManifest)
+	})
+
+	suite.ApplyManifests(ctx, oomPodManifest)
+
+	suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 2))
+
+	// Scale to 50
+	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", oom50ReplicasPatch)
+
+	// Expect at least one OOM kill of stress-ng within 15 seconds
+	suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, "stress-ng"))
+
+	// Scale to 1, wait for the deployment to scale down, proving the system is operational
+	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", oom1ReplicaPatch)
+	suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))
+
+	suite.APISuite.AssertClusterHealthy(ctx)
+}
+
+// waitForOOMKilled waits for the given period and returns whether OOM events containing the specified process have been observed.
+func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeout time.Duration, substr string) bool {
+	startTime := time.Now()
+
+	watchCh := make(chan state.Event)
+	workerNode := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
+	workerCtx := client.WithNode(ctx, workerNode)
+
+	suite.Assert().NoError(suite.Client.COSI.WatchKind(
+		workerCtx,
+		runtime.NewOOMActionSpec(runtime.NamespaceName, "").Metadata(),
+		watchCh,
+	))
+
+	timeoutCh := time.After(timeout)
+	ret := false
+
+	for {
+		select {
+		case <-timeoutCh:
+			return ret
+		case ev := <-watchCh:
+			if ev.Type != state.Created || ev.Resource.Metadata().Created().Before(startTime) {
+				continue
+			}
+
+			res := ev.Resource.(*runtime.OOMAction).TypedSpec()
+
+			for _, proc := range res.Processes {
+				if strings.Contains(proc, substr) {
+					ret = true
+				}
+			}
+		}
+	}
+}
+
+func init() {
+	allSuites = append(allSuites, new(OomSuite))
+}
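
The embedded testdata manifests (testdata/oom.yaml and the two replica patches) are among the 21 changed files but are not included in this excerpt. Given how PatchK8sObject is used above, a plausible shape for oom-50-replicas.yaml is a bare replica-count patch; the contents below are an assumption, not the actual file:

# Assumed contents of testdata/oom-50-replicas.yaml (not shown in this diff).
spec:
  replicas: 50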
