@@ -14,6 +14,9 @@ import (
1414 "time"
1515
1616 "github.com/cosi-project/runtime/pkg/state"
17+ "github.com/dustin/go-humanize"
18+ "github.com/stretchr/testify/require"
19+ "gopkg.in/yaml.v3"
1720
1821 "github.com/siderolabs/talos/internal/integration/base"
1922 "github.com/siderolabs/talos/pkg/machinery/client"
@@ -26,16 +29,8 @@ type OomSuite struct {
 	base.K8sSuite
 }
 
-var (
-	//go:embed testdata/oom.yaml
-	oomPodSpec []byte
-
-	//go:embed testdata/oom-50-replicas.yaml
-	oom50ReplicasPatch []byte
-
-	//go:embed testdata/oom-1-replica.yaml
-	oom1ReplicaPatch []byte
-)
+//go:embed testdata/oom.yaml
+var oomPodSpec []byte
 
 // SuiteName returns the name of the suite.
 func (suite *OomSuite) SuiteName() string {
@@ -72,40 +67,80 @@ func (suite *OomSuite) TestOom() {
 
 	suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 2))
 
-	// Scale to 50
-	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", oom50ReplicasPatch)
+	// Figure out the number of replicas: a ballpark estimate of 15 replicas per 2GiB of memory per worker node.
+	numWorkers := len(suite.DiscoverNodeInternalIPsByType(ctx, machine.TypeWorker))
+	suite.Require().Greaterf(numWorkers, 0, "at least one worker node is required for the test")
+
+	memInfo, err := suite.Client.Memory(client.WithNode(ctx, suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)))
+	suite.Require().NoError(err)
+
+	// MemTotal is reported in KiB, so scale to bytes; round up to whole 2GiB blocks below.
+	memoryBytes := memInfo.GetMessages()[0].GetMeminfo().GetMemtotal() * 1024
+	numReplicas := int((memoryBytes/1024/1024 + 2048 - 1) / 2048) * numWorkers * 15
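+	// e.g. on 4GiB workers: ceil(4096 MiB / 2048 MiB) = 2, so 2 * 15 = 30 replicas per worker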
+
+	suite.T().Logf("detected total memory: %s, workers %d => scaling to %d replicas",
+		humanize.IBytes(memoryBytes), numWorkers, numReplicas)
+
+	// Scale to the discovered number of replicas
+	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), numReplicas))
 
-	// Expect at least one OOM kill of stress-ng within 15 seconds
-	suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, "stress-ng"))
+	// Expect at least one OOM kill of stress-ng: check after 15 seconds, waiting up to 2 minutes in total
+	suite.Assert().True(suite.waitForOOMKilled(ctx, 15*time.Second, 2*time.Minute, "stress-ng"))
 
 	// Scale to 1, wait for deployment to scale down, proving system is operational
-	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", oom1ReplicaPatch)
+	suite.PatchK8sObject(ctx, "default", "apps", "Deployment", "v1", "stress-mem", patchToReplicas(suite.T(), 1))
 	suite.Require().NoError(suite.WaitForDeploymentAvailable(ctx, time.Minute, "default", "stress-mem", 1))
 
 	suite.APISuite.AssertClusterHealthy(ctx)
 }
 
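+// patchToReplicas renders a YAML patch that sets spec.replicas for a deployment.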
+func patchToReplicas(t *testing.T, replicas int) []byte {
+	spec := map[string]any{
+		"spec": map[string]any{
+			"replicas": replicas,
+		},
+	}
+
+	patch, err := yaml.Marshal(spec)
+	require.NoError(t, err)
+
+	return patch
+}
+
 // Waits for a period of time and returns whether OOM events containing a specified process have been observed.
-func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeout time.Duration, substr string) bool {
+//
+//nolint:gocyclo
+func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeToObserve, timeout time.Duration, substr string) bool {
 	startTime := time.Now()
 
 	watchCh := make(chan state.Event)
-	workerNode := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
-	workerCtx := client.WithNode(ctx, workerNode)
-
-	suite.Assert().NoError(suite.Client.COSI.WatchKind(
-		workerCtx,
-		runtime.NewOOMActionSpec(runtime.NamespaceName, "").Metadata(),
-		watchCh,
-	))
+	workerNodes := suite.DiscoverNodeInternalIPsByType(ctx, machine.TypeWorker)
+
+	// start watching OOM events on all worker nodes
+	for _, workerNode := range workerNodes {
+		suite.Assert().NoError(suite.Client.COSI.WatchKind(
+			client.WithNode(ctx, workerNode),
+			runtime.NewOOMActionSpec(runtime.NamespaceName, "").Metadata(),
+			watchCh,
+		))
+	}
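+	// watchCh now aggregates OOM events from every worker node into a single stream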
 
 	timeoutCh := time.After(timeout)
-	ret := false
+	timeToObserveCh := time.After(timeToObserve)
+	numOOMObserved := 0
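+	// timeToObserve is the minimum observation window: once it elapses, any events seen
+	// so far count as success; timeout is the hard deadline for the first event to appear.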
 
 	for {
 		select {
 		case <-timeoutCh:
-			return ret
+			suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
+
+			return numOOMObserved > 0
+		case <-timeToObserveCh:
+			if numOOMObserved > 0 {
+				// if we already observed some OOM events, consider it a success
+				suite.T().Logf("observed %d OOM events containing process substring %q", numOOMObserved, substr)
+
+				return true
+			}
 		case ev := <-watchCh:
 			if ev.Type != state.Created || ev.Resource.Metadata().Created().Before(startTime) {
 				continue
@@ -115,7 +150,7 @@ func (suite *OomSuite) waitForOOMKilled(ctx context.Context, timeout time.Duration, substr string) bool {
 
 			for _, proc := range res.Processes {
 				if strings.Contains(proc, substr) {
-					ret = true
+					numOOMObserved++
 				}
 			}
 		}