diff --git a/Makefile b/Makefile
index 6493d0d472..3392fc59ab 100644
--- a/Makefile
+++ b/Makefile
@@ -207,14 +207,9 @@ SCALABILITY_RUNNER := $(ARTIFACTS)/scalability-runner
 scalability-runner:
 	$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(SCALABILITY_RUNNER) test/scalability/runner/main.go
 
-SCALABILITY_RUN_DIR := $(ARTIFACTS)/run-scalability
-.PHONY: sclability-run-dir
-sclability-run-dir:
-	mkdir -p $(SCALABILITY_RUN_DIR)
-
 .PHONY: minimalkueue
-minimalkueue: sclability-run-dir
-	$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(SCALABILITY_RUN_DIR)/minimalkueue test/scalability/minimalkueue/main.go
+minimalkueue:
+	$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(ARTIFACTS)/minimalkueue test/scalability/minimalkueue/main.go
 
 ifdef SCALABILITY_CPU_PROFILE
 	SCALABILITY_EXTRA_ARGS += --withCPUProfile=true
@@ -228,14 +223,18 @@ ifdef SCALABILITY_KUEUE_LOGS
 	SCALABILITY_EXTRA_ARGS += --withLogs=true --logToFile=true
 endif
 
+SCALABILITY_GENERATOR_CONFIG ?= $(PROJECT_DIR)/test/scalability/default_generator_config.yaml
+
+SCALABILITY_RUN_DIR := $(ARTIFACTS)/run-scalability
 .PHONY: run-scalability
 run-scalability: envtest scalability-runner minimalkueue
+	mkdir -p $(SCALABILITY_RUN_DIR)
 	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
 	$(SCALABILITY_RUNNER) \
 		--o $(SCALABILITY_RUN_DIR) \
 		--crds=$(PROJECT_DIR)/config/components/crd/bases \
-		--generatorConfig=$(PROJECT_DIR)/test/scalability/default_generator_config.yaml \
-		$(SCALABILITY_EXTRA_ARGS)
+		--generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \
+		--minimalKueue=$(ARTIFACTS)/minimalkueue $(SCALABILITY_EXTRA_ARGS)
 
 .PHONY: test-scalability
 test-scalability: gotestsum run-scalability
@@ -244,6 +243,14 @@ test-scalability: gotestsum run-scalability
 		--cmdStats=$(SCALABILITY_RUN_DIR)/minimalkueue.stats.yaml \
 		--range=$(PROJECT_DIR)/test/scalability/default_rangespec.yaml
 
+.PHONY: run-scalability-in-cluster
+run-scalability-in-cluster: envtest scalability-runner
+	mkdir -p $(ARTIFACTS)/run-scalability-in-cluster
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
+	$(SCALABILITY_RUNNER) \
+		--o $(ARTIFACTS)/run-scalability-in-cluster \
+		--generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \
+		--qps=1000 --burst=2000 --timeout=15m
 
 .PHONY: ci-lint
 ci-lint: golangci-lint
diff --git a/test/scalability/README.md b/test/scalability/README.md
new file mode 100644
index 0000000000..25545359ef
--- /dev/null
+++ b/test/scalability/README.md
@@ -0,0 +1,90 @@
+# Scalability test
+
+A test meant to detect regressions in Kueue's overall scheduling capabilities.
+
+# Components
+In order to achieve this, the following components are used:
+
+## Runner
+
+An application able to:
+- generate a set of Kueue-specific objects based on a config following the schema of [default_generator_config](./default_generator_config.yaml) (see the sketch at the end of this section)
+- mimic the execution of the workloads
+- monitor the created objects and generate execution statistics based on the received events
+
+Optionally, it can run an instance of [minimalkueue](#minimalkueue) in a dedicated [envtest](https://book.kubebuilder.io/reference/envtest.html) environment.
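+
+For illustration, a `workloadsSets` entry in that config has the following shape (the values mirror the first entry of the default config; the comments are explanatory assumptions, not part of the schema):
+
+```yaml
+workloadsSets:
+- count: 350               # number of workloads created for this set
+  msBetweenWlCreation: 100 # delay in ms between two workload creations
+  workloads:
+  - className: small       # class of the generated queues the workload targets
+    msRunning: 200         # simulated running time in ms
+    priority: 50
+    request: 1             # resource request of the workload
+```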
+
+## MinimalKueue
+
+A light version of Kueue's controller manager, consisting only of the core controllers and the scheduler.
+
+It is designed to offer Kueue's scheduling capabilities without any additional components that could pollute the optional CPU profiles taken during its execution.
+
+## Checker
+
+Checks the results of a scalability run against a set of expected values defined in [default_rangespec](./default_rangespec.yaml).
+
+# Usage
+
+## Run in an existing cluster
+
+```bash
+make run-scalability-in-cluster
+```
+
+Will run a scalability scenario against an existing cluster (reachable via the host's default kubeconfig) and store the resulting artifacts in `$(PROJECT_DIR)/bin/run-scalability-in-cluster`.
+
+The generator config to be used can be set in `SCALABILITY_GENERATOR_CONFIG`; it defaults to `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml`.
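+
+For example, to use a custom config (the path below is hypothetical):
+
+```bash
+make SCALABILITY_GENERATOR_CONFIG=/tmp/my_generator_config.yaml run-scalability-in-cluster
+```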
+
+Check the [installation guide](https://kueue.sigs.k8s.io/docs/installation) for cluster setup and [observability](https://kueue.sigs.k8s.io/docs/installation/#add-metrics-scraping-for-prometheus-operator) for metrics scraping.
+
+## Run with minimalkueue
+
+```bash
+make run-scalability
+```
+
+Will run a scalability scenario against an [envtest](https://book.kubebuilder.io/reference/envtest.html) environment
+and an instance of minimalkueue.
+The resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability`.
+
+The generator config to be used can be set in `SCALABILITY_GENERATOR_CONFIG`; it defaults to `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml`.
+
+Setting `SCALABILITY_CPU_PROFILE=1` will generate a CPU profile of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.cpu.prof`.
+
+Setting `SCALABILITY_KUEUE_METRICS=1` will dump a set of kueue metrics in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.metrics.yaml`.
+
+Setting `SCALABILITY_KUEUE_LOGS=1` will save the logs of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.out.log` and `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.err.log`.
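+
+For example, to capture both a CPU profile and the metrics dump in a single run:
+
+```bash
+SCALABILITY_CPU_PROFILE=1 SCALABILITY_KUEUE_METRICS=1 make run-scalability
+```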
flag.String("o", "", "output directory") + crdsPath = flag.String("crds", "", "crds path") + generatorConfig = flag.String("generatorConfig", "", "generator config file") + timeout = flag.Duration("timeout", 10*time.Minute, "maximum record time") + qps = flag.Float64("qps", 0, "qps used by the runner clients, use default if 0") + burst = flag.Int("burst", 0, "qps used by the runner clients, use default if 0") + + // related to minimalkueue + minimalKueuePath = flag.String("minimalKueue", "", "path to minimalkueue, run in the hosts default cluster if empty") + withCpuProfile = flag.Bool("withCPUProfile", false, "generate a CPU profile for minimalkueue") + withKueueMetrics = flag.Bool("withKueueMetrics", false, "save minimalkueue metrics") + withLogs = flag.Bool("withLogs", false, "capture minimalkueue logs") + logLevel = flag.Int("withLogsLevel", 2, "set minimalkueue logs level") + logToFile = flag.Bool("logToFile", false, "capture minimalkueue logs to files") ) var ( @@ -77,16 +81,15 @@ func init() { } func main() { - flag.Parse() - opts := func(o *zap.Options) { - o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder - o.ZapOpts = []zaplog.Option{zaplog.AddCaller()} + opts := zap.Options{ + TimeEncoder: zapcore.RFC3339NanoTimeEncoder, + ZapOpts: []zaplog.Option{zaplog.AddCaller()}, + Development: true, + Level: zaplog.NewAtomicLevelAt(-2), } - log := zap.New( - zap.WriteTo(os.Stdout), - zap.UseDevMode(true), - zap.Level(zapcore.Level(-3)), - opts) + opts.BindFlags(flag.CommandLine) + flag.Parse() + log := zap.New(zap.UseFlagOptions(&opts)) ctrl.SetLogger(log) @@ -96,7 +99,7 @@ func main() { ctx, ctxCancel := context.WithCancel(ctrl.LoggerInto(context.Background(), log)) var cfg *rest.Config - if !*useExistingCluster { + if *minimalKueuePath != "" { testEnv := &envtest.Environment{ CRDDirectoryPaths: []string{*crdsPath}, ErrorIfCRDPathMissing: true, @@ -131,7 +134,7 @@ func main() { // start the minimal kueue manager process wg.Add(1) - err = runCommand(ctx, *outputDir, "minimalkueue", "kubeconfig", *withCpuProfile, *withLogs, *logToFile, *withKueueMetrics, errCh, wg) + err = runCommand(ctx, *outputDir, *minimalKueuePath, "kubeconfig", *withCpuProfile, *withLogs, *logToFile, *logLevel, *withKueueMetrics, errCh, wg) if err != nil { log.Error(err, "MinimalKueue start") os.Exit(1) @@ -162,7 +165,7 @@ func main() { } wg.Add(1) - recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh) + recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh, *timeout) if err != nil { log.Error(err, "Recorder start") os.Exit(1) @@ -209,33 +212,39 @@ func main() { log.Error(err, "Writing wl csv") os.Exit(1) } + + if *minimalKueuePath == "" { + c, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "Create cleanup client") + } else { + generator.Cleanup(context.Background(), c) + } + } } -func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUProf, withLogs, logToFile, withMetrics bool, errCh chan<- error, wg *sync.WaitGroup) error { +func runCommand(ctx context.Context, workDir, cmdPath, kubeconfig string, withCPUProf, withLogs, logToFile bool, logLevel int, withMetrics bool, errCh chan<- error, wg *sync.WaitGroup) error { defer wg.Done() log := ctrl.LoggerFrom(ctx).WithName("Run command") - cmdPath := path.Join(workDir, exe) - - args := []string{ - "--kubeconfig", path.Join(workDir, kubeconfig), + cmd := exec.CommandContext(ctx, cmdPath, "--kubeconfig", path.Join(workDir, kubeconfig)) + cmd.Cancel = func() error { + log.Info("Stop the 
command") + return cmd.Process.Signal(syscall.SIGINT) } + exe := path.Base(cmdPath) + if withCPUProf { - args = append(args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe))) + cmd.Args = append(cmd.Args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe))) } if withMetrics { - args = append(args, "--metricsfile", path.Join(workDir, fmt.Sprintf("%s.metrics.yaml", exe))) - } - - cmd := exec.CommandContext(ctx, cmdPath, args...) - cmd.Cancel = func() error { - log.Info("Stop the command") - return cmd.Process.Signal(syscall.SIGINT) + cmd.Args = append(cmd.Args, "--metricsfile", path.Join(workDir, fmt.Sprintf("%s.metrics.yaml", exe))) } if withLogs { + cmd.Args = append(cmd.Args, fmt.Sprintf("--zap-log-level=%d", logLevel)) outWriter := os.Stdout errWriter := os.Stderr if logToFile { @@ -336,11 +345,11 @@ func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string, return nil } -func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}) (*recorder.Recorder, error) { +func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}, recordTimeout time.Duration) (*recorder.Recorder, error) { defer wg.Done() log := ctrl.LoggerFrom(ctx).WithName("Start recorder") //TODO: make the timeout an arg - recorder := recorder.New(30 * time.Minute) + recorder := recorder.New(recordTimeout) wg.Add(1) go func() { defer wg.Done() @@ -353,7 +362,7 @@ func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, errCh <- err }() - log.Info("Recorder started ") + log.Info("Recorder started", "timeout", recordTimeout) return recorder, nil }