From b7fd4a30eabb20a1a9e61e9cf9b3d18247385836 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Thu, 4 Apr 2024 14:58:12 +0300
Subject: [PATCH 1/5] [testing] Make RestConfigToKubeConfig a util function.

---
 pkg/util/testing/kubeconfig.go            | 50 +++++++++++++++++++++++
 test/integration/multikueue/suite_test.go | 28 +------------
 2 files changed, 52 insertions(+), 26 deletions(-)
 create mode 100644 pkg/util/testing/kubeconfig.go

diff --git a/pkg/util/testing/kubeconfig.go b/pkg/util/testing/kubeconfig.go
new file mode 100644
index 0000000000..ac53340340
--- /dev/null
+++ b/pkg/util/testing/kubeconfig.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+)
+
+func RestConfigToKubeConfig(restConfig *rest.Config) ([]byte, error) {
+	cfg := clientcmdapi.Config{
+		Kind:       "config",
+		APIVersion: "v1",
+		Clusters: map[string]*clientcmdapi.Cluster{
+			"default-cluster": {
+				Server:                   restConfig.Host,
+				CertificateAuthorityData: restConfig.CAData,
+			},
+		},
+		AuthInfos: map[string]*clientcmdapi.AuthInfo{
+			"default-user": {
+				ClientCertificateData: restConfig.CertData,
+				ClientKeyData:         restConfig.KeyData,
+			},
+		},
+		Contexts: map[string]*clientcmdapi.Context{
+			"default-context": {
+				Cluster:  "default-cluster",
+				AuthInfo: "default-user",
+			},
+		},
+		CurrentContext: "default-context",
+	}
+	return clientcmd.Write(cfg)
+}
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go
index 8a021bd0b6..470b2d7da4 100644
--- a/test/integration/multikueue/suite_test.go
+++ b/test/integration/multikueue/suite_test.go
@@ -27,8 +27,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/clientcmd"
-	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
@@ -41,6 +39,7 @@ import (
 	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
 	"sigs.k8s.io/kueue/pkg/queue"
+	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	"sigs.k8s.io/kueue/pkg/webhooks"
 	"sigs.k8s.io/kueue/test/integration/framework"
 	// +kubebuilder:scaffold:imports
@@ -58,30 +57,7 @@ type cluster struct {
 }
 
 func (c *cluster) kubeConfigBytes() ([]byte, error) {
-	cfg := clientcmdapi.Config{
-		Kind:       "config",
-		APIVersion: "v1",
-		Clusters: map[string]*clientcmdapi.Cluster{
-			"default-cluster": {
-				Server:                   c.cfg.Host,
-				CertificateAuthorityData: c.cfg.CAData,
-			},
-		},
-		AuthInfos: map[string]*clientcmdapi.AuthInfo{
-			"default-user": {
-				ClientCertificateData: c.cfg.CertData,
-				ClientKeyData:         c.cfg.KeyData,
-			},
-		},
-		Contexts: map[string]*clientcmdapi.Context{
-			"default-context": {
-				Cluster:  "default-cluster",
-				AuthInfo: "default-user",
-			},
-		},
-		CurrentContext: "default-context",
-	}
-	return 
clientcmd.Write(cfg) + return utiltesting.RestConfigToKubeConfig(c.cfg) } var ( From 4f2338cdc544c7b5f26866eb49da5472e8321df0 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Thu, 4 Apr 2024 15:01:07 +0300 Subject: [PATCH 2/5] [scalability] Initial implementation. --- Makefile | 47 ++- pkg/util/testing/wrappers.go | 8 + test/scalability/README.md | 64 +++ test/scalability/checker/checker_test.go | 80 ++++ .../scalability/default_generator_config.yaml | 31 ++ test/scalability/default_rangespec.yaml | 7 + test/scalability/minimalkueue/main.go | 175 ++++++++ .../runner/controller/controller.go | 174 ++++++++ .../scalability/runner/generator/generator.go | 235 +++++++++++ .../runner/generator/generator_test.go | 137 +++++++ test/scalability/runner/main.go | 388 ++++++++++++++++++ test/scalability/runner/recorder/recorder.go | 106 +++++ test/scalability/runner/stats/stats.go | 24 ++ 13 files changed, 1475 insertions(+), 1 deletion(-) create mode 100644 test/scalability/README.md create mode 100644 test/scalability/checker/checker_test.go create mode 100644 test/scalability/default_generator_config.yaml create mode 100644 test/scalability/default_rangespec.yaml create mode 100644 test/scalability/minimalkueue/main.go create mode 100644 test/scalability/runner/controller/controller.go create mode 100644 test/scalability/runner/generator/generator.go create mode 100644 test/scalability/runner/generator/generator_test.go create mode 100644 test/scalability/runner/main.go create mode 100644 test/scalability/runner/recorder/recorder.go create mode 100644 test/scalability/runner/stats/stats.go diff --git a/Makefile b/Makefile index 5d4f0015ab..86b6a1db1b 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,7 @@ ifdef IMAGE_EXTRA_TAG IMAGE_BUILD_EXTRA_OPTS += -t $(IMAGE_EXTRA_TAG) endif +PROJECT_DIR := $(shell dirname $(abspath $(lastword $(MAKEFILE_LIST)))) ARTIFACTS ?= $(PROJECT_DIR)/bin # Use distroless as minimal base image to package the manager binary # Refer to https://github.com/GoogleContainerTools/distroless for more details @@ -212,6 +213,51 @@ run-test-multikueue-e2e-%: FORCE @echo Running multikueue e2e for k8s ${K8S_VERSION} E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) ./hack/multikueue-e2e-test.sh +SCALABILITY_RUNNER := $(ARTIFACTS)/scalability-runner +.PHONY: scalability-runner +scalability-runner: + $(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(SCALABILITY_RUNNER) test/scalability/runner/main.go + +.PHONY: minimalkueue +minimalkueue: + $(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(ARTIFACTS)/minimalkueue test/scalability/minimalkueue/main.go + +ifdef SCALABILITY_CPU_PROFILE +SCALABILITY_EXTRA_ARGS += --withCPUProfile=true +endif + +ifdef SCALABILITY_KUEUE_LOGS +SCALABILITY_EXTRA_ARGS += --withLogs=true --logToFile=true +endif + +SCALABILITY_GENERATOR_CONFIG ?= $(PROJECT_DIR)/test/scalability/default_generator_config.yaml + +SCALABILITY_RUN_DIR := $(ARTIFACTS)/run-scalability +.PHONY: run-scalability +run-scalability: envtest scalability-runner minimalkueue + mkdir -p $(SCALABILITY_RUN_DIR) + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \ + $(SCALABILITY_RUNNER) \ + --o $(SCALABILITY_RUN_DIR) \ + --crds=$(PROJECT_DIR)/config/components/crd/bases \ + --generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \ + 
--minimalKueue=$(ARTIFACTS)/minimalkueue $(SCALABILITY_EXTRA_ARGS)
+
+.PHONY: test-scalability
+test-scalability: gotestsum run-scalability
+	$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- $(GO_TEST_FLAGS) ./test/scalability/checker \
+		--cmdStats=$(SCALABILITY_RUN_DIR)/minimalkueue.stats.yaml \
+		--range=$(PROJECT_DIR)/test/scalability/default_rangespec.yaml
+
+.PHONY: run-scalability-in-cluster
+run-scalability-in-cluster: envtest scalability-runner
+	mkdir -p $(ARTIFACTS)/run-scalability-in-cluster
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
+	$(SCALABILITY_RUNNER) \
+		--o $(ARTIFACTS)/run-scalability-in-cluster \
+		--generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \
+		--qps=1000 --burst=2000 --timeout=15m
+
 .PHONY: ci-lint
 ci-lint: golangci-lint
 	$(GOLANGCI_LINT) run --timeout 15m0s
@@ -359,7 +405,6 @@ importer-image: PLATFORMS=linux/amd64
 importer-image: PUSH=--load
 importer-image: importer-image-build
 
-PROJECT_DIR := $(shell dirname $(abspath $(lastword $(MAKEFILE_LIST))))
 GOLANGCI_LINT = $(PROJECT_DIR)/bin/golangci-lint
 .PHONY: golangci-lint
 golangci-lint: ## Download golangci-lint locally if necessary.
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 8653eae92b..2ea91fe961 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -627,6 +627,14 @@ func (c *ClusterQueueWrapper) StopPolicy(p kueue.StopPolicy) *ClusterQueueWrappe
 	return c
 }
 
+func (c *ClusterQueueWrapper) Label(k, v string) *ClusterQueueWrapper {
+	if c.Labels == nil {
+		c.Labels = make(map[string]string)
+	}
+	c.Labels[k] = v
+	return c
+}
+
 // Condition sets a condition on the ClusterQueue.
 func (c *ClusterQueueWrapper) Condition(conditionType string, status metav1.ConditionStatus, reason, message string) *ClusterQueueWrapper {
 	apimeta.SetStatusCondition(&c.Status.Conditions, metav1.Condition{
diff --git a/test/scalability/README.md b/test/scalability/README.md
new file mode 100644
index 0000000000..267d8b8f39
--- /dev/null
+++ b/test/scalability/README.md
@@ -0,0 +1,64 @@
+# Scalability test
+
+Is a test meant to detect regressions int the Kueue's overall scheduling capabilities.
+
+# Components
+In order to achieve this the following components are used:
+
+## Runner
+
+An application able to:
+- generate a set of Kueue-specific objects based on a config following the schema of [default_generator_config](./default_generator_config.yaml)
+- mimic the execution of the workloads
+- monitor the created objects and generate execution statistics based on the received events
+
+Optionally, it is able to run an instance of [minimalkueue](#MinimalKueue) in a dedicated [envtest](https://book.kubebuilder.io/reference/envtest.html) environment.
+
+## MinimalKueue
+
+A light version of the Kueue controller manager consisting only of the core controllers and the scheduler.
+
+It is designed to offer the Kueue scheduling capabilities without additional components that may pollute the optional CPU profiles taken during its execution.
+
+
+## Checker
+
+Checks the results of a scalability run against a set of expected values defined in [default_rangespec](./default_rangespec.yaml).
+
+# Usage
+
+## Run in an existing cluster
+
+```bash
+make run-scalability-in-cluster
+```
+
+Will run a scalability scenario against an existing cluster (connectable by the hosts default kubeconfig), and store the resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability-in-cluster`.
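+
+For example, to run with a custom generator config (the `/tmp/my_generator_config.yaml` path below is hypothetical):
+
+```bash
+SCALABILITY_GENERATOR_CONFIG=/tmp/my_generator_config.yaml make run-scalability-in-cluster
+```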
+
+The generator config to be used can be set in `SCALABILITY_GENERATOR_CONFIG`; by default `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml` is used.
+
+Check the [installation guide](https://kueue.sigs.k8s.io/docs/installation) for cluster setup and [observability](https://kueue.sigs.k8s.io/docs/installation/#add-metrics-scraping-for-prometheus-operator) for metrics scraping.
+
+## Run with minimalkueue
+
+```bash
+make run-scalability
+```
+
+Will run a scalability scenario against an [envtest](https://book.kubebuilder.io/reference/envtest.html) environment
+and an instance of minimalkueue.
+The resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability`.
+
+The generator config to be used can be set in `SCALABILITY_GENERATOR_CONFIG`; by default `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml` is used.
+
+Setting `SCALABILITY_CPU_PROFILE=1` will generate a CPU profile of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.cpu.prof`
+
+Setting `SCALABILITY_KUEUE_LOGS=1` will save the logs of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.out.log` and `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.err.log`
+
+## Run scalability test
+
+```bash
+make test-scalability
+```
+
+Runs the scalability test with minimalkueue and checks the results against `$(PROJECT_DIR)/test/scalability/default_rangespec.yaml`
diff --git a/test/scalability/checker/checker_test.go b/test/scalability/checker/checker_test.go
new file mode 100644
index 0000000000..e5db019986
--- /dev/null
+++ b/test/scalability/checker/checker_test.go
@@ -0,0 +1,80 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package checker + +import ( + "flag" + "os" + "testing" + + "sigs.k8s.io/yaml" + + "sigs.k8s.io/kueue/test/scalability/runner/stats" +) + +var ( + cmdStatsFile = flag.String("cmdStats", "", "command stats yaml file") + rangeFile = flag.String("range", "", "expectations range file") +) + +type RangeSpec struct { + Cmd struct { + MaxWallMs int64 `json:"maxWallMs"` + MaxUserMs int64 `json:"maxUserMs"` + MaxSysMs int64 `json:"maxSysMs"` + Maxrss uint64 `json:"maxrss"` + } `json:"cmd"` +} + +func TestScalability(t *testing.T) { + cmdStatsBytes, err := os.ReadFile(*cmdStatsFile) + if err != nil { + t.Fatalf("Unable to read command stats: %s", err) + } + + cmdStats := stats.CmdStats{} + err = yaml.UnmarshalStrict(cmdStatsBytes, &cmdStats) + if err != nil { + t.Fatalf("Unable to unmarshal command stats: %s", err) + } + + rangeBytes, err := os.ReadFile(*rangeFile) + if err != nil { + t.Fatalf("Unable to read range spec: %s", err) + } + + rangeSpec := RangeSpec{} + err = yaml.UnmarshalStrict(rangeBytes, &rangeSpec) + if err != nil { + t.Fatalf("Unable to unmarshal range spec: %s", err) + } + + t.Run("CommandStats", func(t *testing.T) { + if cmdStats.WallMs > rangeSpec.Cmd.MaxWallMs { + t.Errorf("Wall time %dms is grater than maximum expected %dms", cmdStats.WallMs, rangeSpec.Cmd.MaxWallMs) + } + if cmdStats.UserMs > rangeSpec.Cmd.MaxUserMs { + t.Errorf("User time %dms is grater than maximum expected %dms", cmdStats.UserMs, rangeSpec.Cmd.MaxUserMs) + } + if cmdStats.SysMs > rangeSpec.Cmd.MaxSysMs { + t.Errorf("Sys time %dms is grater than maximum expected %dms", cmdStats.SysMs, rangeSpec.Cmd.MaxSysMs) + } + if cmdStats.Maxrss > int64(rangeSpec.Cmd.Maxrss) { + t.Errorf("Maxrss %dKib is grater than maximum expected %dKib", cmdStats.Maxrss, rangeSpec.Cmd.Maxrss) + } + }) +} diff --git a/test/scalability/default_generator_config.yaml b/test/scalability/default_generator_config.yaml new file mode 100644 index 0000000000..007e27b6f9 --- /dev/null +++ b/test/scalability/default_generator_config.yaml @@ -0,0 +1,31 @@ +- className: cohort + count: 5 + queuesSets: + - className: cq + count: 6 + nominalQuota: 20 + borrowingLimit: 100 + reclaimWithinCohort: Any + withinClusterQueue: LowerPriority + workloadsSets: + - count: 350 + creationIntervalMs: 100 + workloads: + - className: small + runtimeMs: 200 + priority: 50 + request: 1 + - count: 100 + creationIntervalMs: 500 + workloads: + - className: medium + runtimeMs: 500 + priority: 100 + request: 5 + - count: 50 + creationIntervalMs: 1200 + workloads: + - className: large + runtimeMs: 1000 + priority: 200 + request: 20 diff --git a/test/scalability/default_rangespec.yaml b/test/scalability/default_rangespec.yaml new file mode 100644 index 0000000000..27dbb9afd9 --- /dev/null +++ b/test/scalability/default_rangespec.yaml @@ -0,0 +1,7 @@ +# Until we have a clear picture on how the setup +# performs in CI keep the values "very relaxed" +cmd: + maxWallMs: 3600_000 #ih + maxUserMs: 3600_000 + maxSysMs: 3600_000 + maxrss: 1024_000 #1000MiB diff --git a/test/scalability/minimalkueue/main.go b/test/scalability/minimalkueue/main.go new file mode 100644 index 0000000000..1ba148560a --- /dev/null +++ b/test/scalability/minimalkueue/main.go @@ -0,0 +1,175 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "os" + "os/signal" + "runtime/pprof" + "syscall" + + zaplog "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + crconfig "sigs.k8s.io/controller-runtime/pkg/config" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" +) + +var ( + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(kueue.AddToScheme(scheme)) + utilruntime.Must(kueuealpha.AddToScheme(scheme)) + utilruntime.Must(configapi.AddToScheme(scheme)) +} + +func main() { + os.Exit(mainWithExitCode()) +} + +func mainWithExitCode() int { + opts := zap.Options{ + TimeEncoder: zapcore.RFC3339NanoTimeEncoder, + ZapOpts: []zaplog.Option{zaplog.AddCaller()}, + Development: true, + Level: zaplog.NewAtomicLevelAt(zapcore.ErrorLevel), + } + opts.BindFlags(flag.CommandLine) + flag.Parse() + log := zap.New(zap.UseFlagOptions(&opts)) + + ctrl.SetLogger(log) + log.Info("Start") + + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Error(err, "Could not create CPU profile") + } + defer f.Close() // error handling omitted for example + if err := pprof.StartCPUProfile(f); err != nil { + log.Error(err, "Could not start CPU profile") + } + defer func() { + log.Info("Stop CPU profile") + pprof.StopCPUProfile() + }() + } + + kubeConfig, err := ctrl.GetConfig() + if err != nil { + log.Error(err, "get kubeconfig") + return 1 + } + + // based on the default config + kubeConfig.QPS = 50 + kubeConfig.Burst = 100 + log.Info("K8S Client", "Host", kubeConfig.Host, "qps", kubeConfig.QPS, "burst", kubeConfig.Burst) + + // based on the default config + options := ctrl.Options{ + Scheme: scheme, + Controller: crconfig.Controller{ + GroupKindConcurrency: map[string]int{ + kueue.GroupVersion.WithKind("Workload").GroupKind().String(): 5, + kueue.GroupVersion.WithKind("LocalQueue").GroupKind().String(): 1, + kueue.GroupVersion.WithKind("ClusterQueue").GroupKind().String(): 1, + kueue.GroupVersion.WithKind("ResourceFlavor").GroupKind().String(): 1, + }, + }, + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + } + mgr, err := ctrl.NewManager(kubeConfig, options) + if err != nil { + log.Error(err, "Unable to created manager") + return 1 + } + + ctx, cancel := context.WithCancel(ctrl.LoggerInto(context.Background(), log)) + defer cancel() + go func() { + done := make(chan os.Signal, 2) + signal.Notify(done, 
syscall.SIGINT, syscall.SIGTERM) + <-done + log.Info("Cancel the manager's context") + cancel() + }() + + err = indexer.Setup(ctx, mgr.GetFieldIndexer()) + if err != nil { + log.Error(err, "Indexer setup") + return 1 + } + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + go queues.CleanUpOnContext(ctx) + go cCache.CleanUpOnContext(ctx) + + if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &configapi.Configuration{}); err != nil { + log.Error(err, "Unable to create controller", "controller", failedCtrl) + return 1 + } + + sched := scheduler.New( + queues, + cCache, + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.AdmissionName), + ) + + if err := mgr.Add(sched); err != nil { + log.Error(err, "Unable to add scheduler to manager") + return 1 + } + + log.Info("Starting manager") + if err := mgr.Start(ctx); err != nil { + log.Error(err, "Could not run manager") + return 1 + } + + log.Info("Done") + return 0 +} diff --git a/test/scalability/runner/controller/controller.go b/test/scalability/runner/controller/controller.go new file mode 100644 index 0000000000..b0ae36f4fd --- /dev/null +++ b/test/scalability/runner/controller/controller.go @@ -0,0 +1,174 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "strconv" + "sync" + "time" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/workload" + "sigs.k8s.io/kueue/test/scalability/runner/generator" + "sigs.k8s.io/kueue/test/scalability/runner/recorder" +) + +type reconciler struct { + client client.Client + // todo add the recorder here + atLock sync.RWMutex + admissionTime map[types.UID]time.Time + recorder *recorder.Recorder +} + +func (r *reconciler) getAdmittedTime(uid types.UID) (time.Time, bool) { + r.atLock.RLock() + defer r.atLock.RUnlock() + t, ok := r.admissionTime[uid] + return t, ok +} + +func (r *reconciler) setAdmittedTime(uid types.UID, admitted bool) { + _, found := r.getAdmittedTime(uid) + if found != admitted { + r.atLock.Lock() + defer r.atLock.Unlock() + if admitted { + r.admissionTime[uid] = time.Now() + } else { + delete(r.admissionTime, uid) + } + } +} + +var _ reconcile.Reconciler = (*reconciler)(nil) +var _ predicate.Predicate = (*reconciler)(nil) + +func (r *reconciler) Create(ev event.CreateEvent) bool { + _, isWl := (ev.Object).(*kueue.Workload) + return !isWl +} + +func (r *reconciler) Delete(_ event.DeleteEvent) bool { + // ignore delete + return false +} + +func (r *reconciler) Update(ev event.UpdateEvent) bool { + wl, isWl := (ev.ObjectNew).(*kueue.Workload) + if !isWl { + return true + } + admitted := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) + r.setAdmittedTime(wl.UID, admitted) + + return admitted && !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) +} + +func (r *reconciler) Generic(_ event.GenericEvent) bool { + // ignore generic + return false +} + +func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + var wl kueue.Workload + if err := r.client.Get(ctx, req.NamespacedName, &wl); err != nil { + // we'll ignore not-found errors, since there is nothing to do. + return ctrl.Result{}, client.IgnoreNotFound(err) + } + log := ctrl.LoggerFrom(ctx) + // this should only: + // 1. finish the workloads eviction + if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) { + _ = workload.UnsetQuotaReservationWithCondition(&wl, "Pending", "Evicted by the test runner") + err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + if err == nil { + log.V(5).Info("Finish eviction") + } + return reconcile.Result{}, err + } + + // 2. 
finish the workload once its time is up
+	if cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadAdmitted); cond != nil && cond.Status == metav1.ConditionTrue {
+		admittedTime, ok := r.getAdmittedTime(wl.UID)
+		if !ok {
+			admittedTime = cond.LastTransitionTime.Time
+		}
+		runningMsLabel := wl.Labels[generator.RunningTimeLabel]
+		runningMs, err := strconv.Atoi(runningMsLabel)
+		if err != nil {
+			log.V(2).Error(err, "cannot parse running time", "using", runningMs)
+		}
+		runningDuration := time.Millisecond * time.Duration(runningMs)
+		activeFor := time.Since(admittedTime)
+		remaining := runningDuration - activeFor
+		if remaining > 0 {
+			return reconcile.Result{RequeueAfter: remaining}, nil
+		} else {
+			err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName)
+			if err == nil {
+				log.V(5).Info("Finish Workload")
+			}
+			return reconcile.Result{}, err
+		}
+	}
+	return reconcile.Result{}, nil
+}
+
+func NewReconciler(c client.Client, r *recorder.Recorder) *reconciler {
+	return &reconciler{
+		client:        c,
+		admissionTime: map[types.UID]time.Time{},
+		recorder:      r,
+	}
+}
+
+func (r *reconciler) SetupWithManager(mgr ctrl.Manager) error {
+	cqHandler := handler.Funcs{
+		CreateFunc: func(_ context.Context, ev event.CreateEvent, _ workqueue.RateLimitingInterface) {
+			if cq, isCq := ev.Object.(*kueue.ClusterQueue); isCq {
+				r.recorder.RecordCQStatus(cq)
+			}
+		},
+		UpdateFunc: func(_ context.Context, ev event.UpdateEvent, _ workqueue.RateLimitingInterface) {
+			if cq, isCq := ev.ObjectNew.(*kueue.ClusterQueue); isCq {
+				r.recorder.RecordCQStatus(cq)
+			}
+		},
+	}
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&kueue.Workload{}).
+		WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
+		Watches(&kueue.ClusterQueue{}, cqHandler).
+		WithEventFilter(r).
+		Complete(r)
+}
diff --git a/test/scalability/runner/generator/generator.go b/test/scalability/runner/generator/generator.go
new file mode 100644
index 0000000000..b0914ee5e4
--- /dev/null
+++ b/test/scalability/runner/generator/generator.go
@@ -0,0 +1,235 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package generator + +import ( + "context" + "errors" + "fmt" + "os" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +const ( + resourceFlavorName = "rf" + RunningTimeLabel = "kueue.x-k8s.io/scalability-running-ms" + ClassLabel = "kueue.x-k8s.io/scalability-class" + CleanupLabel = "kueue.x-k8s.io/scalability-cleanup" +) + +type WorkloadTemplate struct { + ClassName string `json:"className"` + RuntimeMs uint `json:"runtimeMs"` + Priority int32 `json:"priority"` + Request string `json:"request"` +} + +type WorkloadsSet struct { + Count int `json:"count"` + CreationIntervalMs uint `json:"creationIntervalMs"` + Workloads []WorkloadTemplate `json:"workloads"` +} + +type QueuesSet struct { + ClassName string `json:"className"` + Count int `json:"count"` + NominalQuota string `json:"nominalQuota"` + BorrowingLimit string `json:"borrowingLimit"` + ReclaimWithinCohort kueue.PreemptionPolicy `json:"reclaimWithinCohort"` + WithinClusterQueue kueue.PreemptionPolicy `json:"withinClusterQueue"` + WorkloadsSets []WorkloadsSet `json:"workloadsSets"` +} + +type CohortSet struct { + ClassName string `json:"className"` + Count int `json:"count"` + QueuesSets []QueuesSet `json:"queuesSets"` +} + +func LoadConfig(mappingFile string) ([]CohortSet, error) { + cohorts := []CohortSet{} + yamlFile, err := os.ReadFile(mappingFile) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(yamlFile, &cohorts) + if err != nil { + return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) + } + return cohorts, nil +} + +func concurrent[T any](set T, count func(T) int, call func(int) error) error { + wg := sync.WaitGroup{} + errCh := make(chan error) + var errs []error + go func() { + for err := range errCh { + errs = append(errs, err) + } + }() + total := count(set) + wg.Add(total) + for i := 0; i < total; i++ { + go func() { + defer wg.Done() + err := call(i) + if err != nil { + errCh <- err + } + }() + } + wg.Wait() + close(errCh) + return errors.Join(errs...) +} +func generateWlSet(ctx context.Context, c client.Client, wlSet WorkloadsSet, namespace string, localQueue string, wlSetIdx int) error { + delay := time.Duration(wlSet.CreationIntervalMs) * time.Millisecond + log := ctrl.LoggerFrom(ctx).WithName("generate workload group").WithValues("namespace", namespace, "localQueue", localQueue, "delay", delay) + log.Info("Start generation") + defer log.Info("End generation") + + for si := 0; si < wlSet.Count; si++ { + for i, wlt := range wlSet.Workloads { + <-time.After(delay) + wl := utiltesting.MakeWorkload(fmt.Sprintf("%s-%d-%d-%d", wlt.ClassName, wlSetIdx, si, i), namespace). + Queue(localQueue). + Request(corev1.ResourceCPU, wlt.Request). + Label(RunningTimeLabel, fmt.Sprintf("%d", wlt.RuntimeMs)). + Label(ClassLabel, wlt.ClassName). + Priority(wlt.Priority). + Obj() + err := c.Create(ctx, wl) + if err != nil { + return err + } + } + } + return nil +} + +func generateQueue(ctx context.Context, c client.Client, qSet QueuesSet, cohortName string, queueSetIdx int, queueIndex int) error { + log := ctrl.LoggerFrom(ctx).WithName("generate queue").WithValues("idx", queueIndex, "prefix", qSet.ClassName) + log.Info("Start generation") + defer log.Info("End generation") + cq := utiltesting.MakeClusterQueue(fmt.Sprintf("%s-%d-%d-%s", qSet.ClassName, queueSetIdx, queueIndex, cohortName)). 
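+		// Note: the generated name encodes the class name, the queue set and queue
+		// indices, and the cohort, keeping each ClusterQueue name unique per run.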
+ Cohort(cohortName). + ResourceGroup(*utiltesting.MakeFlavorQuotas(resourceFlavorName). + Resource(corev1.ResourceCPU, qSet.NominalQuota, qSet.BorrowingLimit).Obj()). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: qSet.ReclaimWithinCohort, + WithinClusterQueue: qSet.WithinClusterQueue, + }). + Label(ClassLabel, qSet.ClassName). + Label(CleanupLabel, "true"). + Obj() + err := c.Create(ctx, cq) + if err != nil { + return err + } + + ns := &corev1.Namespace{} + ns.Name = cq.Name + ns.Labels = map[string]string{CleanupLabel: "true"} + err = c.Create(ctx, ns) + if err != nil { + return err + } + + lq := utiltesting.MakeLocalQueue(cq.Name, ns.Name).ClusterQueue(cq.Name).Obj() + err = c.Create(ctx, lq) + if err != nil { + return err + } + + return concurrent(qSet.WorkloadsSets, func(wlSets []WorkloadsSet) int { return len(wlSets) }, func(wlSetIdx int) error { + return generateWlSet(ctx, c, qSet.WorkloadsSets[wlSetIdx], ns.Name, lq.Name, wlSetIdx) + }) +} + +func generateQueueSet(ctx context.Context, c client.Client, qSet QueuesSet, cohortName string, queueSetIdx int) error { + log := ctrl.LoggerFrom(ctx).WithName("generate queue set").WithValues("count", qSet.Count, "prefix", qSet.ClassName) + log.Info("Start generation") + defer log.Info("End generation") + return concurrent(qSet, func(qs QueuesSet) int { return qs.Count }, func(idx int) error { + return generateQueue(ctx, c, qSet, cohortName, queueSetIdx, idx) + }) +} + +func generateCohort(ctx context.Context, c client.Client, cSet CohortSet, cohortIdx int) error { + log := ctrl.LoggerFrom(ctx).WithName("generate cohort").WithValues("idx", cohortIdx, "prefix", cSet.ClassName) + log.Info("Start generation") + defer log.Info("End generation") + cohortName := fmt.Sprintf("%s-%d", cSet.ClassName, cohortIdx) + return concurrent(cSet, func(cs CohortSet) int { return len(cs.QueuesSets) }, func(idx int) error { + return generateQueueSet(ctx, c, cSet.QueuesSets[idx], cohortName, idx) + }) +} + +func generateCohortSet(ctx context.Context, c client.Client, cSet CohortSet) error { + log := ctrl.LoggerFrom(ctx).WithName("generate cohort set").WithValues("count", cSet.Count, "prefix", cSet.ClassName) + log.Info("Start generation") + defer log.Info("End generation") + return concurrent(cSet, func(cs CohortSet) int { return cs.Count }, func(idx int) error { + return generateCohort(ctx, c, cSet, idx) + }) +} + +func Generate(ctx context.Context, c client.Client, cSets []CohortSet) error { + log := ctrl.LoggerFrom(ctx).WithName("generate cohort sets").WithValues("numSets", len(cSets)) + log.Info("Start generation") + defer log.Info("End generation") + rf := utiltesting.MakeResourceFlavor(resourceFlavorName).Label(CleanupLabel, "true").Obj() + err := c.Create(ctx, rf) + if err != nil { + return err + } + return concurrent(cSets, func(s []CohortSet) int { return len(s) }, func(idx int) error { return generateCohortSet(ctx, c, cSets[idx]) }) +} + +func Cleanup(ctx context.Context, c client.Client) { + log := ctrl.LoggerFrom(ctx).WithName("cleanup") + log.Info("Start cleanup") + + nsList := corev1.NamespaceList{} + if err := c.List(ctx, &nsList, client.HasLabels{CleanupLabel}); err != nil { + log.Error(err, "Listing namespaces") + } else { + for _, ns := range nsList.Items { + if err := c.Delete(ctx, &ns); err != nil { + log.Error(err, "Deleting namespace") + } + } + } + + if err := c.DeleteAllOf(ctx, &kueue.ClusterQueue{}, client.HasLabels{CleanupLabel}); err != nil { + log.Error(err, "Deleting cluster queues") + } + + if err := 
c.DeleteAllOf(ctx, &kueue.ResourceFlavor{}, client.HasLabels{CleanupLabel}); err != nil { + log.Error(err, "Deleting resource flavor") + } +} diff --git a/test/scalability/runner/generator/generator_test.go b/test/scalability/runner/generator/generator_test.go new file mode 100644 index 0000000000..a5b900a5fd --- /dev/null +++ b/test/scalability/runner/generator/generator_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generator + +import ( + "os" + "path" + "testing" + + "github.com/google/go-cmp/cmp" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +func TestLoadConfig(t *testing.T) { + smallWl := WorkloadTemplate{ + ClassName: "small", + RuntimeMs: 10, + Priority: 50, + Request: "1", + } + + mediumWl := WorkloadTemplate{ + ClassName: "medium", + RuntimeMs: 50_000, + Priority: 100, + Request: "5", + } + + largeWl := WorkloadTemplate{ + ClassName: "large", + RuntimeMs: 100_000, + Priority: 200, + Request: "20", + } + + want := []CohortSet{ + { + ClassName: "cohort", + Count: 5, + QueuesSets: []QueuesSet{ + { + ClassName: "cq", + Count: 6, + NominalQuota: "20", + BorrowingLimit: "100", + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + WorkloadsSets: []WorkloadsSet{ + { + Count: 200, + CreationIntervalMs: 100, + Workloads: []WorkloadTemplate{ + smallWl, + smallWl, + smallWl, + smallWl, + smallWl, + smallWl, + smallWl, + mediumWl, + mediumWl, + largeWl, + }, + }, + }, + }, + }, + }, + } + + testContent := ` +- className: cohort + count: 5 + queuesSets: + - className: cq + count: 6 + nominalQuota: 20 + borrowingLimit: 100 + reclaimWithinCohort: Any + withinClusterQueue: LowerPriority + workloadsSets: + - count: 200 + creationIntervalMs: 100 + workloads: + - &small + className: small + runtimeMs: 10 + priority: 50 + request: 1 + - *small + - *small + - *small + - *small + - *small + - *small + - &medium + className: medium + runtimeMs: 50000 + priority: 100 + request: 5 + - *medium + - className: large + runtimeMs: 100000 + priority: 200 + request: 20 +` + tempDir := t.TempDir() + fPath := path.Join(tempDir, "config.yaml") + err := os.WriteFile(fPath, []byte(testContent), os.FileMode(0600)) + if err != nil { + t.Fatalf("unable to create the test file: %s", err) + } + + got, err := LoadConfig(fPath) + if err != nil { + t.Fatalf("unexpected load error: %s", err) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected config(want-/ got+):\n%s", diff) + } +} diff --git a/test/scalability/runner/main.go b/test/scalability/runner/main.go new file mode 100644 index 0000000000..a9fa6138f1 --- /dev/null +++ b/test/scalability/runner/main.go @@ -0,0 +1,388 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"os/exec"
+	"os/signal"
+	"path"
+	"sync"
+	"syscall"
+	"time"
+
+	zaplog "go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	crconfig "sigs.k8s.io/controller-runtime/pkg/config"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+	"sigs.k8s.io/yaml"
+
+	configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
+	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
+	"sigs.k8s.io/kueue/test/scalability/runner/controller"
+	"sigs.k8s.io/kueue/test/scalability/runner/generator"
+	"sigs.k8s.io/kueue/test/scalability/runner/recorder"
+	"sigs.k8s.io/kueue/test/scalability/runner/stats"
+)
+
+var (
+	outputDir       = flag.String("o", "", "output directory")
+	crdsPath        = flag.String("crds", "", "CRDs path")
+	generatorConfig = flag.String("generatorConfig", "", "generator config file")
+	timeout         = flag.Duration("timeout", 10*time.Minute, "maximum record time")
+	qps             = flag.Float64("qps", 0, "qps used by the runner clients, use default if 0")
+	burst           = flag.Int("burst", 0, "burst used by the runner clients, use default if 0")
+
+	// related to minimalkueue
+	minimalKueuePath = flag.String("minimalKueue", "", "path to minimalkueue, run in the host's default cluster if empty")
+	withCpuProfile   = flag.Bool("withCPUProfile", false, "generate a CPU profile for minimalkueue")
+	withLogs         = flag.Bool("withLogs", false, "capture minimalkueue logs")
+	logLevel         = flag.Int("withLogsLevel", 2, "set minimalkueue logs level")
+	logToFile        = flag.Bool("logToFile", false, "capture minimalkueue logs to files")
+)
+
+var (
+	scheme = runtime.NewScheme()
+)
+
+func init() {
+	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
+	utilruntime.Must(kueue.AddToScheme(scheme))
+	utilruntime.Must(kueuealpha.AddToScheme(scheme))
+	utilruntime.Must(configapi.AddToScheme(scheme))
+}
+
+func main() {
+	opts := zap.Options{
+		TimeEncoder: zapcore.RFC3339NanoTimeEncoder,
+		ZapOpts:     []zaplog.Option{zaplog.AddCaller()},
+		Development: true,
+		Level:       zaplog.NewAtomicLevelAt(-2),
+	}
+	opts.BindFlags(flag.CommandLine)
+	flag.Parse()
+	log := zap.New(zap.UseFlagOptions(&opts))
+
+	ctrl.SetLogger(log)
+
+	log.Info("Start runner", "outputDir", outputDir, "crdsPath", crdsPath)
+	errCh := make(chan error, 3)
+	wg := &sync.WaitGroup{}
+	ctx, ctxCancel := context.WithCancel(ctrl.LoggerInto(context.Background(), log))
+	var cfg *rest.Config
+
+	if *minimalKueuePath != "" {
+		testEnv := &envtest.Environment{
+			CRDDirectoryPaths:     []string{*crdsPath},
+			ErrorIfCRDPathMissing: true,
+		}
+
+		var err error
+		cfg, err = testEnv.Start()
+		if err != nil {
+			log.Error(err, "Starting test env")
+			os.Exit(1)
+		}
+		defer func() {
+			err := testEnv.Stop()
+			if 
err != nil { + log.Error(err, "Stopping test env") + } + }() + + // prepare the kubeconfig + kubeconfig, err := utiltesting.RestConfigToKubeConfig(cfg) + if err != nil { + log.Error(err, "Generate kubeconfig") + os.Exit(1) + } + + kubeconfigPath := path.Join(*outputDir, "kubeconfig") + err = os.WriteFile(kubeconfigPath, kubeconfig, 00660) + if err != nil { + log.Error(err, "Write kubeconfig") + os.Exit(1) + } + + // start the minimal kueue manager process + wg.Add(1) + err = runCommand(ctx, *outputDir, *minimalKueuePath, "kubeconfig", *withCpuProfile, *withLogs, *logToFile, *logLevel, errCh, wg) + if err != nil { + log.Error(err, "MinimalKueue start") + os.Exit(1) + } + } else { + var err error + cfg, err = ctrl.GetConfig() + if err != nil { + log.Error(err, "Get config") + os.Exit(1) + } + } + + if *qps > 0 { + cfg.QPS = float32(*qps) + } + + if *burst > 0 { + cfg.Burst = *burst + } + + generationDoneCh := make(chan struct{}) + wg.Add(1) + err := runGenerator(ctx, cfg, *generatorConfig, errCh, wg, generationDoneCh) + if err != nil { + log.Error(err, "Generator start") + os.Exit(1) + } + + wg.Add(1) + recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh, *timeout) + if err != nil { + log.Error(err, "Recorder start") + os.Exit(1) + } + + wg.Add(1) + err = runManager(ctx, cfg, errCh, wg, recorder) + if err != nil { + log.Error(err, "manager start") + os.Exit(1) + } + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + endWithError := false + select { + case <-done: + log.Info("Done") + case err := <-errCh: + if err != nil { + log.Error(err, "Error") + endWithError = true + } + } + ctxCancel() + wg.Wait() + + if endWithError { + os.Exit(1) + } + + if *minimalKueuePath == "" { + c, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "Create cleanup client") + } else { + generator.Cleanup(context.Background(), c) + } + } +} + +func runCommand(ctx context.Context, workDir, cmdPath, kubeconfig string, withCPUProf, withLogs, logToFile bool, logLevel int, errCh chan<- error, wg *sync.WaitGroup) error { + defer wg.Done() + log := ctrl.LoggerFrom(ctx).WithName("Run command") + + cmd := exec.CommandContext(ctx, cmdPath, "--kubeconfig", path.Join(workDir, kubeconfig)) + cmd.Cancel = func() error { + log.Info("Stop the command") + return cmd.Process.Signal(syscall.SIGINT) + } + + exe := path.Base(cmdPath) + + if withCPUProf { + cmd.Args = append(cmd.Args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe))) + } + + if withLogs { + cmd.Args = append(cmd.Args, fmt.Sprintf("--zap-log-level=%d", logLevel)) + outWriter := os.Stdout + errWriter := os.Stderr + if logToFile { + var err error + outWriter, err = os.Create(path.Join(workDir, fmt.Sprintf("%s.out.log", exe))) + if err != nil { + return err + } + defer outWriter.Close() + + errWriter, err = os.Create(path.Join(workDir, fmt.Sprintf("%s.err.log", exe))) + if err != nil { + return err + } + defer errWriter.Close() + } + cmd.Stdout = outWriter + cmd.Stderr = errWriter + } + + log.Info("Starting process", "path", cmd.Path, "args", cmd.Args) + err := cmd.Start() + if err != nil { + return err + } + startTime := time.Now() + + wg.Add(1) + go func() { + defer wg.Done() + err := cmd.Wait() + if err != nil { + select { + case <-ctx.Done(): + // nothing to do here + default: + errCh <- fmt.Errorf("command: %s", err) + } + } + + cs := stats.CmdStats{ + WallMs: time.Since(startTime).Milliseconds(), + UserMs: cmd.ProcessState.UserTime().Milliseconds(), + SysMs: 
cmd.ProcessState.SystemTime().Milliseconds(), + Maxrss: -1, + } + if rusage, ok := cmd.ProcessState.SysUsage().(*syscall.Rusage); ok { + cs.Maxrss = rusage.Maxrss + } + log.Info("Cmd ended", "stats", cs) + csBytes, err := yaml.Marshal(cs) + if err != nil { + log.Error(err, "Marshaling cmd stats") + return + } + + err = os.WriteFile(path.Join(workDir, fmt.Sprintf("%s.stats.yaml", exe)), csBytes, 0666) + if err != nil { + log.Error(err, "Writing cmd stats") + } + }() + return nil +} + +func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string, errCh chan<- error, wg *sync.WaitGroup, genDone chan<- struct{}) error { + defer wg.Done() + + log := ctrl.LoggerFrom(ctx).WithName("Run generator") + c, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "Create generator's client") + close(genDone) + return err + } + + cohorts, err := generator.LoadConfig(generatorConfig) + if err != nil { + log.Error(err, "Loading config") + close(genDone) + return err + } + + statTime := time.Now() + wg.Add(1) + go func() { + defer wg.Done() + defer close(genDone) + err := generator.Generate(ctx, c, cohorts) + if err != nil { + log.Error(err, "generating") + errCh <- err + return + } + log.Info("Generator done", "duration", time.Since(statTime)) + }() + + log.Info("Generator started", "qps", cfg.QPS, "burst", cfg.Burst) + return nil +} + +func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}, recordTimeout time.Duration) (*recorder.Recorder, error) { + defer wg.Done() + log := ctrl.LoggerFrom(ctx).WithName("Start recorder") + //TODO: make the timeout an arg + recorder := recorder.New(recordTimeout) + wg.Add(1) + go func() { + defer wg.Done() + err := recorder.Run(ctx, genDone) + if err != nil { + log.Error(err, "Recorder run") + } else { + log.Info("Recorder done") + } + errCh <- err + }() + + log.Info("Recorder started", "timeout", recordTimeout) + return recorder, nil +} + +func runManager(ctx context.Context, cfg *rest.Config, errCh chan<- error, wg *sync.WaitGroup, r *recorder.Recorder) error { + defer wg.Done() + log := ctrl.LoggerFrom(ctx).WithName("Run manager") + + options := ctrl.Options{ + Scheme: scheme, + Controller: crconfig.Controller{ + GroupKindConcurrency: map[string]int{ + kueue.GroupVersion.WithKind("Workload").GroupKind().String(): 5, + }, + }, + Metrics: metricsserver.Options{ + BindAddress: "0", + }, + } + mgr, err := ctrl.NewManager(cfg, options) + if err != nil { + log.Error(err, "Creating manager") + return err + } + + if err := controller.NewReconciler(mgr.GetClient(), r).SetupWithManager(mgr); err != nil { + log.Error(err, "Setup controller") + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + log.Info("Starting manager") + if err := mgr.Start(ctx); err != nil { + log.Error(err, "Could not run manager") + errCh <- err + } else { + log.Info("End manager") + } + }() + + log.Info("Manager started") + return nil +} diff --git a/test/scalability/runner/recorder/recorder.go b/test/scalability/runner/recorder/recorder.go new file mode 100644 index 0000000000..442f9454ca --- /dev/null +++ b/test/scalability/runner/recorder/recorder.go @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recorder + +import ( + "context" + "sync/atomic" + "time" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +type CQStatus struct { + Name string + PendingWorkloads int32 + ReservingWorkloads int32 + Active bool +} + +type Store map[string]CQStatus + +type Recorder struct { + maxRecording time.Duration + running atomic.Bool + evChan chan CQStatus + + Store Store +} + +func New(maxRecording time.Duration) *Recorder { + return &Recorder{ + maxRecording: maxRecording, + running: atomic.Bool{}, + evChan: make(chan CQStatus, 10), + Store: map[string]CQStatus{}, + } +} + +func (r *Recorder) record(ev CQStatus) { + r.Store[ev.Name] = ev +} + +func (r *Recorder) expectMoreEvents() bool { + for _, s := range r.Store { + if (s.PendingWorkloads > 0 || s.ReservingWorkloads > 0) && s.Active { + return true + } + } + return false +} + +func (r *Recorder) Run(ctx context.Context, genDone <-chan struct{}) error { + r.running.Store(true) + defer r.running.Store(false) + + generateDone := atomic.Bool{} + generateDone.Store(false) + go func() { + <-genDone + generateDone.Store(true) + }() + + ctx, cancel := context.WithTimeout(ctx, r.maxRecording) + defer cancel() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-r.evChan: + r.record(ev) + if generateDone.Load() && !r.expectMoreEvents() { + return nil + } + } + } +} + +func (r *Recorder) RecordCQStatus(cq *kueue.ClusterQueue) { + if !r.running.Load() { + return + } + + r.evChan <- CQStatus{ + Name: cq.Name, + PendingWorkloads: cq.Status.PendingWorkloads, + ReservingWorkloads: cq.Status.ReservingWorkloads, + Active: apimeta.IsStatusConditionTrue(cq.Status.Conditions, kueue.AdmissionCheckActive), + } +} diff --git a/test/scalability/runner/stats/stats.go b/test/scalability/runner/stats/stats.go new file mode 100644 index 0000000000..4d143f6210 --- /dev/null +++ b/test/scalability/runner/stats/stats.go @@ -0,0 +1,24 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stats + +type CmdStats struct { + WallMs int64 `json:"wallMs"` + UserMs int64 `json:"userMs"` + SysMs int64 `json:"sysMs"` + Maxrss int64 `json:"maxrss"` +} From 0b1d04d404228e9952406d58b92aa8b39d33a655 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 16 Apr 2024 16:41:30 +0300 Subject: [PATCH 3/5] Review remarks. 
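
The removed TODO was stale: the recording timeout is already configurable
through the runner's --timeout flag (10m by default), e.g. with illustrative
paths:

  bin/scalability-runner --o /tmp/scalability-out --timeout=15m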
---
 test/scalability/runner/main.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/test/scalability/runner/main.go b/test/scalability/runner/main.go
index a9fa6138f1..2ce9f1d30f 100644
--- a/test/scalability/runner/main.go
+++ b/test/scalability/runner/main.go
@@ -327,7 +327,6 @@ func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string,
 func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}, recordTimeout time.Duration) (*recorder.Recorder, error) {
 	defer wg.Done()
 	log := ctrl.LoggerFrom(ctx).WithName("Start recorder")
-	//TODO: make the timeout an arg
 	recorder := recorder.New(recordTimeout)
 	wg.Add(1)
 	go func() {

From b8767b0571a5ee1f7684e113f8178b6d7083dbad Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Wed, 17 Apr 2024 09:38:22 +0300
Subject: [PATCH 4/5] Review Remarks

---
 test/scalability/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/scalability/README.md b/test/scalability/README.md
index 267d8b8f39..3545cd278d 100644
--- a/test/scalability/README.md
+++ b/test/scalability/README.md
@@ -33,7 +33,7 @@ Checks the results of a scalability run against a set of expected values defined in [
 make run-scalability-in-cluster
 ```
 
-Will run a scalability scenario against an existing cluster (connectable by the hosts default kubeconfig), and store the resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability-in-cluster`.
+Will run a scalability scenario against an existing cluster (connectable by the host's default kubeconfig); the resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability-in-cluster`.
 
 The generator config to be used can be set in `SCALABILITY_GENERATOR_CONFIG`; by default `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml` is used.
 
 Check the [installation guide](https://kueue.sigs.k8s.io/docs/installation) for cluster setup and [observability](https://kueue.sigs.k8s.io/docs/installation/#add-metrics-scraping-for-prometheus-operator) for metrics scraping.

From f3a7d05025f463e8409cd1fc634f59aa28be8b79 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Wed, 17 Apr 2024 17:10:58 +0300
Subject: [PATCH 5/5] Review Remarks

---
 test/scalability/README.md                       | 2 +-
 test/scalability/checker/checker_test.go         | 8 ++++----
 test/scalability/default_rangespec.yaml          | 2 +-
 test/scalability/minimalkueue/main.go            | 2 +-
 test/scalability/runner/controller/controller.go | 3 +--
 5 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/test/scalability/README.md b/test/scalability/README.md
index 3545cd278d..492a7edc47 100644
--- a/test/scalability/README.md
+++ b/test/scalability/README.md
@@ -1,6 +1,6 @@
 # Scalability test
 
-Is a test meant to detect regressions int the Kueue's overall scheduling capabilities.
+This test is meant to detect regressions in Kueue's overall scheduling capabilities.
# Components In order to achieve this the following components are used: diff --git a/test/scalability/checker/checker_test.go b/test/scalability/checker/checker_test.go index e5db019986..57aa5eff70 100644 --- a/test/scalability/checker/checker_test.go +++ b/test/scalability/checker/checker_test.go @@ -65,16 +65,16 @@ func TestScalability(t *testing.T) { t.Run("CommandStats", func(t *testing.T) { if cmdStats.WallMs > rangeSpec.Cmd.MaxWallMs { - t.Errorf("Wall time %dms is grater than maximum expected %dms", cmdStats.WallMs, rangeSpec.Cmd.MaxWallMs) + t.Errorf("Wall time %dms is greater than maximum expected %dms", cmdStats.WallMs, rangeSpec.Cmd.MaxWallMs) } if cmdStats.UserMs > rangeSpec.Cmd.MaxUserMs { - t.Errorf("User time %dms is grater than maximum expected %dms", cmdStats.UserMs, rangeSpec.Cmd.MaxUserMs) + t.Errorf("User time %dms is greater than maximum expected %dms", cmdStats.UserMs, rangeSpec.Cmd.MaxUserMs) } if cmdStats.SysMs > rangeSpec.Cmd.MaxSysMs { - t.Errorf("Sys time %dms is grater than maximum expected %dms", cmdStats.SysMs, rangeSpec.Cmd.MaxSysMs) + t.Errorf("Sys time %dms is greater than maximum expected %dms", cmdStats.SysMs, rangeSpec.Cmd.MaxSysMs) } if cmdStats.Maxrss > int64(rangeSpec.Cmd.Maxrss) { - t.Errorf("Maxrss %dKib is grater than maximum expected %dKib", cmdStats.Maxrss, rangeSpec.Cmd.Maxrss) + t.Errorf("Maxrss %dKib is greater than maximum expected %dKib", cmdStats.Maxrss, rangeSpec.Cmd.Maxrss) } }) } diff --git a/test/scalability/default_rangespec.yaml b/test/scalability/default_rangespec.yaml index 27dbb9afd9..e57e111793 100644 --- a/test/scalability/default_rangespec.yaml +++ b/test/scalability/default_rangespec.yaml @@ -1,7 +1,7 @@ # Until we have a clear picture on how the setup # performs in CI keep the values "very relaxed" cmd: - maxWallMs: 3600_000 #ih + maxWallMs: 3600_000 #1h maxUserMs: 3600_000 maxSysMs: 3600_000 maxrss: 1024_000 #1000MiB diff --git a/test/scalability/minimalkueue/main.go b/test/scalability/minimalkueue/main.go index 1ba148560a..6beaebcc85 100644 --- a/test/scalability/minimalkueue/main.go +++ b/test/scalability/minimalkueue/main.go @@ -121,7 +121,7 @@ func mainWithExitCode() int { } mgr, err := ctrl.NewManager(kubeConfig, options) if err != nil { - log.Error(err, "Unable to created manager") + log.Error(err, "Unable to create manager") return 1 } diff --git a/test/scalability/runner/controller/controller.go b/test/scalability/runner/controller/controller.go index b0ae36f4fd..9c37e121ef 100644 --- a/test/scalability/runner/controller/controller.go +++ b/test/scalability/runner/controller/controller.go @@ -43,8 +43,7 @@ import ( ) type reconciler struct { - client client.Client - // todo add the recorder here + client client.Client atLock sync.RWMutex admissionTime map[types.UID]time.Time recorder *recorder.Recorder