Commit: Add README and cleanup.

trasc committed Apr 5, 2024
1 parent 9ecbb13 commit effbb88
Showing 6 changed files with 173 additions and 61 deletions.
25 changes: 16 additions & 9 deletions Makefile
@@ -207,14 +207,9 @@ SCALABILITY_RUNNER := $(ARTIFACTS)/scalability-runner
scalability-runner:
$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(SCALABILITY_RUNNER) test/scalability/runner/main.go

SCALABILITY_RUN_DIR := $(ARTIFACTS)/run-scalability
.PHONY: sclability-run-dir
sclability-run-dir:
mkdir -p $(SCALABILITY_RUN_DIR)

.PHONY: minimalkueue
minimalkueue: sclability-run-dir
$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(SCALABILITY_RUN_DIR)/minimalkueue test/scalability/minimalkueue/main.go
minimalkueue:
$(GO_BUILD_ENV) $(GO_CMD) build -ldflags="$(LD_FLAGS)" -o $(ARTIFACTS)/minimalkueue test/scalability/minimalkueue/main.go

ifdef SCALABILITY_CPU_PROFILE
SCALABILITY_EXTRA_ARGS += --withCPUProfile=true
@@ -228,14 +223,18 @@ ifdef SCALABILITY_KUEUE_LOGS
SCALABILITY_EXTRA_ARGS += --withLogs=true --logToFile=true
endif

SCALABILITY_GENERATOR_CONFIG ?= $(PROJECT_DIR)/test/scalability/default_generator_config.yaml

SCALABILITY_RUN_DIR := $(ARTIFACTS)/run-scalability
.PHONY: run-scalability
run-scalability: envtest scalability-runner minimalkueue
mkdir -p $(SCALABILITY_RUN_DIR)
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
$(SCALABILITY_RUNNER) \
--o $(SCALABILITY_RUN_DIR) \
--crds=$(PROJECT_DIR)/config/components/crd/bases \
--generatorConfig=$(PROJECT_DIR)/test/scalability/default_generator_config.yaml \
$(SCALABILITY_EXTRA_ARGS)
--generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \
--minimalKueue=$(ARTIFACTS)/minimalkueue $(SCALABILITY_EXTRA_ARGS)

.PHONY: test-scalability
test-scalability: gotestsum run-scalability
@@ -244,6 +243,14 @@ test-scalability: gotestsum run-scalability
--cmdStats=$(SCALABILITY_RUN_DIR)/minimalkueue.stats.yaml \
--range=$(PROJECT_DIR)/test/scalability/default_rangespec.yaml

.PHONY: run-scalability-in-cluster
run-scalability-in-cluster: envtest scalability-runner
mkdir -p $(ARTIFACTS)/run-scalability-in-cluster
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
$(SCALABILITY_RUNNER) \
--o $(ARTIFACTS)/run-scalability-in-cluster \
--generatorConfig=$(SCALABILITY_GENERATOR_CONFIG) \
--qps=1000 --burst=2000 --timeout=15m

.PHONY: ci-lint
ci-lint: golangci-lint
71 changes: 71 additions & 0 deletions test/scalability/README.md
@@ -0,0 +1,71 @@
# Scalability test

A test meant to detect regressions in Kueue's overall scheduling capabilities.

# Components
To achieve this, the following components are used:

## Runner

An application able to:
- generate a set of Kueue-specific objects based on a config following the schema of [default_generator_config](./default_generator_config.yaml)
- mimic the execution of the workloads
- monitor the created objects and generate execution statistics based on the received events

Optionally, it can run an instance of [minimalkueue](#minimalkueue) in a dedicated [envtest](https://book.kubebuilder.io/reference/envtest.html) environment.
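
For reference, the runner can also be invoked directly. A minimal sketch, assuming the binaries were built by the Makefile targets in this commit (flag names are taken from `runner/main.go`; all paths are placeholders):

```bash
# Sketch: direct invocation of the runner, normally driven by make.
# KUBEBUILDER_ASSETS must point at the envtest binaries, as the
# run-scalability target arranges; adjust the paths for your checkout.
KUBEBUILDER_ASSETS="/path/to/envtest/bins" \
bin/scalability-runner \
  --o /tmp/scalability-out \
  --crds=config/components/crd/bases \
  --generatorConfig=test/scalability/default_generator_config.yaml \
  --minimalKueue=bin/minimalkueue \
  --timeout=15m
```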

## MinimalKueue

A lightweight version of Kueue's controller manager, consisting only of the core controllers and the scheduler.

It is designed to offer Kueue's scheduling capabilities without any additional components that may pollute the optional CPU profiles taken during its execution.


## Checker

Checks the results of a scalability run against a set of expected values defined in [default_rangespec](./default_rangespec.yaml).

# Usage

## Run in an existing cluster

```bash
make run-scalability-in-cluster
```

Runs a scalability scenario against an existing cluster (reachable via the host's default kubeconfig) and stores the resulting artifacts in `$(PROJECT_DIR)/bin/run-scalability-in-cluster`.

The generator config to use can be set via `SCALABILITY_GENERATOR_CONFIG`; it defaults to `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml`.
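
For example, to run against the current kubeconfig context with a custom config (the config path below is a placeholder):

```bash
# Sketch: override the generator config for an in-cluster run; the file
# is assumed to follow the schema of default_generator_config.yaml.
make run-scalability-in-cluster \
  SCALABILITY_GENERATOR_CONFIG=/tmp/my_generator_config.yaml
```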

See the [installation guide](https://kueue.sigs.k8s.io/docs/installation) for setting up the cluster, and [observability](https://kueue.sigs.k8s.io/docs/installation/#add-metrics-scraping-for-prometheus-operator) for metrics scraping.

## Run with minimalkueue

```bash
make run-scalability
```

Runs a scalability scenario against an [envtest](https://book.kubebuilder.io/reference/envtest.html) environment and an instance of minimalkueue.
The resulting artifacts are stored in `$(PROJECT_DIR)/bin/run-scalability`.

The generator config to use can be set via `SCALABILITY_GENERATOR_CONFIG`; it defaults to `$(PROJECT_DIR)/test/scalability/default_generator_config.yaml`.

Setting `SCALABILITY_CPU_PROFILE=1` will generate a CPU profile of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.cpu.prof`.

Setting `SCALABILITY_KUEUE_METRICS=1` will dump a set of Kueue metrics in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.metrics.yaml`.

Setting `SCALABILITY_KUEUE_LOGS=1` will save the logs of minimalkueue in `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.out.log` and `$(PROJECT_DIR)/bin/run-scalability/minimalkueue.err.log`.
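
For example, a profiling run with all three toggles enabled (variables as defined in the Makefile):

```bash
# Sketch: capture a CPU profile, a metrics dump, and logs in one run;
# artifacts land under $(PROJECT_DIR)/bin/run-scalability.
make run-scalability \
  SCALABILITY_CPU_PROFILE=1 \
  SCALABILITY_KUEUE_METRICS=1 \
  SCALABILITY_KUEUE_LOGS=1
```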

## Run scalability test

```bash
make test-scalability
```

Runs the scalability scenario with minimalkueue and checks the results against `$(PROJECT_DIR)/test/scalability/default_rangespec.yaml`.
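
The checker consumes the stats recorded by the run; to inspect what a run produced (paths assumed from the Makefile targets in this commit):

```bash
# Sketch: list the artifacts of the last run; expect files such as
# minimalkueue.stats.yaml, which the checker reads via --cmdStats.
ls bin/run-scalability
```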
6 changes: 3 additions & 3 deletions test/scalability/default_generator_config.yaml
@@ -8,21 +8,21 @@
reclaimWithinCohort: Any
withinClusterQueue: LowerPriority
workloadsSets:
- count: 231
- count: 350
msBetweenWlCreation: 100
workloads:
- className: small
msRunning: 200 # will eventually queue
priority: 50
request: 1
- count: 66
- count: 100
msBetweenWlCreation: 500
workloads:
- className: medium
msRunning: 500 # will eventually be evicted
priority: 100
request: 5
- count: 33
- count: 50
msBetweenWlCreation: 1200
workloads:
- className: large
18 changes: 8 additions & 10 deletions test/scalability/minimalkueue/main.go
@@ -176,17 +176,15 @@ func main() {
}

func mainWithExitCode() int {
flag.Parse()
opts := func(o *zap.Options) {
o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder
o.ZapOpts = []zaplog.Option{zaplog.AddCaller()}
opts := zap.Options{
TimeEncoder: zapcore.RFC3339NanoTimeEncoder,
ZapOpts: []zaplog.Option{zaplog.AddCaller()},
Development: true,
Level: zaplog.NewAtomicLevelAt(zapcore.ErrorLevel),
}

log := zap.New(
zap.WriteTo(os.Stdout),
zap.UseDevMode(true),
zap.Level(zapcore.Level(-1)),
opts)
opts.BindFlags(flag.CommandLine)
flag.Parse()
log := zap.New(zap.UseFlagOptions(&opts))

ctrl.SetLogger(log)
log.Info("Start")
29 changes: 28 additions & 1 deletion test/scalability/runner/generator/generator.go
@@ -37,6 +37,7 @@ const (
resourceFlavorName = "rf"
RunningTimeLabel = "kueue.x-k8s.io/scalability-running-ms"
ClassLabel = "kueue.x-k8s.io/scalability-class"
CleanupLabel = "kueue.x-k8s.io/scalability-cleanup"
)

type WorkloadTemplate struct {
@@ -143,6 +144,7 @@ func generateQueue(ctx context.Context, c client.Client, qSet QueuesSet, cohortN
WithinClusterQueue: qSet.WithinClusterQueue,
}).
Label(ClassLabel, qSet.ClassName).
Label(CleanupLabel, "true").
Obj()
err := c.Create(ctx, cq)
if err != nil {
@@ -151,6 +153,7 @@

ns := &corev1.Namespace{}
ns.Name = cq.Name
ns.Labels = map[string]string{CleanupLabel: "true"}
err = c.Create(ctx, ns)
if err != nil {
return err
@@ -199,10 +202,34 @@ func Generate(ctx context.Context, c client.Client, cSets []CohortSet) error {
log := ctrl.LoggerFrom(ctx).WithName("generate cohort sets").WithValues("numSets", len(cSets))
log.Info("Start generation")
defer log.Info("End generation")
rf := utiltesting.MakeResourceFlavor(resourceFlavorName).Obj()
rf := utiltesting.MakeResourceFlavor(resourceFlavorName).Label(CleanupLabel, "true").Obj()
err := c.Create(ctx, rf)
if err != nil {
return err
}
return concurrent(cSets, func(s []CohortSet) int { return len(s) }, func(idx int) error { return generateCohortSet(ctx, c, cSets[idx]) })
}

func Cleanup(ctx context.Context, c client.Client) {
log := ctrl.LoggerFrom(ctx).WithName("cleanup")
log.Info("Start cleanup")

nsList := corev1.NamespaceList{}
if err := c.List(ctx, &nsList, client.HasLabels{CleanupLabel}); err != nil {
log.Error(err, "Listing namespaces")
} else {
for _, ns := range nsList.Items {
if err := c.Delete(ctx, &ns); err != nil {
log.Error(err, "Deleting namespace")
}
}
}

if err := c.DeleteAllOf(ctx, &kueue.ClusterQueue{}, client.HasLabels{CleanupLabel}); err != nil {
log.Error(err, "Deleting cluster queues")
}

if err := c.DeleteAllOf(ctx, &kueue.ResourceFlavor{}, client.HasLabels{CleanupLabel}); err != nil {
log.Error(err, "Deleting resource flavor")
}
}
85 changes: 47 additions & 38 deletions test/scalability/runner/main.go
@@ -53,16 +53,20 @@ import (
)

var (
outputDir = flag.String("o", "", "output directory")
crdsPath = flag.String("crds", "", "crds path")
withCpuProfile = flag.Bool("withCPUProfile", false, "generate a CPU profile for minimalkueue")
withKueueMetrics = flag.Bool("withKueueMetrics", false, "save minimalkueue metrics")
withLogs = flag.Bool("withLogs", false, "capture minimalkueue logs")
logToFile = flag.Bool("logToFile", false, "capture minimalkueue logs to files")
generatorConfig = flag.String("generatorConfig", "", "generator config file")
useExistingCluster = flag.Bool("useexistingcluster", false, "use an existing cluster")
qps = flag.Float64("qps", 0, "qps used by the runner clients, use default if 0")
burst = flag.Int("burst", 0, "qps used by the runner clients, use default if 0")
outputDir = flag.String("o", "", "output directory")
crdsPath = flag.String("crds", "", "crds path")
generatorConfig = flag.String("generatorConfig", "", "generator config file")
timeout = flag.Duration("timeout", 10*time.Minute, "maximum record time")
qps = flag.Float64("qps", 0, "qps used by the runner clients, use default if 0")
burst = flag.Int("burst", 0, "burst used by the runner clients, use default if 0")

// related to minimalkueue
minimalKueuePath = flag.String("minimalKueue", "", "path to minimalkueue, run in the hosts default cluster if empty")
withCpuProfile = flag.Bool("withCPUProfile", false, "generate a CPU profile for minimalkueue")
withKueueMetrics = flag.Bool("withKueueMetrics", false, "save minimalkueue metrics")
withLogs = flag.Bool("withLogs", false, "capture minimalkueue logs")
logLevel = flag.Int("withLogsLevel", 2, "set minimalkueue logs level")
logToFile = flag.Bool("logToFile", false, "capture minimalkueue logs to files")
)

var (
@@ -77,16 +81,15 @@ func init() {
}

func main() {
flag.Parse()
opts := func(o *zap.Options) {
o.TimeEncoder = zapcore.RFC3339NanoTimeEncoder
o.ZapOpts = []zaplog.Option{zaplog.AddCaller()}
opts := zap.Options{
TimeEncoder: zapcore.RFC3339NanoTimeEncoder,
ZapOpts: []zaplog.Option{zaplog.AddCaller()},
Development: true,
Level: zaplog.NewAtomicLevelAt(-2),
}
log := zap.New(
zap.WriteTo(os.Stdout),
zap.UseDevMode(true),
zap.Level(zapcore.Level(-3)),
opts)
opts.BindFlags(flag.CommandLine)
flag.Parse()
log := zap.New(zap.UseFlagOptions(&opts))

ctrl.SetLogger(log)

@@ -96,7 +99,7 @@
ctx, ctxCancel := context.WithCancel(ctrl.LoggerInto(context.Background(), log))
var cfg *rest.Config

if !*useExistingCluster {
if *minimalKueuePath != "" {
testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{*crdsPath},
ErrorIfCRDPathMissing: true,
@@ -131,7 +134,7 @@

// start the minimal kueue manager process
wg.Add(1)
err = runCommand(ctx, *outputDir, "minimalkueue", "kubeconfig", *withCpuProfile, *withLogs, *logToFile, *withKueueMetrics, errCh, wg)
err = runCommand(ctx, *outputDir, *minimalKueuePath, "kubeconfig", *withCpuProfile, *withLogs, *logToFile, *logLevel, *withKueueMetrics, errCh, wg)
if err != nil {
log.Error(err, "MinimalKueue start")
os.Exit(1)
@@ -162,7 +165,7 @@
}

wg.Add(1)
recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh)
recorder, err := startRecorder(ctx, errCh, wg, generationDoneCh, *timeout)
if err != nil {
log.Error(err, "Recorder start")
os.Exit(1)
@@ -209,33 +212,39 @@ func main() {
log.Error(err, "Writing wl csv")
os.Exit(1)
}

if *minimalKueuePath == "" {
c, err := client.New(cfg, client.Options{Scheme: scheme})
if err != nil {
log.Error(err, "Create cleanup client")
} else {
generator.Cleanup(context.Background(), c)
}
}
}

func runCommand(ctx context.Context, workDir, exe, kubeconfig string, withCPUProf, withLogs, logToFile, withMetrics bool, errCh chan<- error, wg *sync.WaitGroup) error {
func runCommand(ctx context.Context, workDir, cmdPath, kubeconfig string, withCPUProf, withLogs, logToFile bool, logLevel int, withMetrics bool, errCh chan<- error, wg *sync.WaitGroup) error {
defer wg.Done()
log := ctrl.LoggerFrom(ctx).WithName("Run command")

cmdPath := path.Join(workDir, exe)

args := []string{
"--kubeconfig", path.Join(workDir, kubeconfig),
cmd := exec.CommandContext(ctx, cmdPath, "--kubeconfig", path.Join(workDir, kubeconfig))
cmd.Cancel = func() error {
log.Info("Stop the command")
return cmd.Process.Signal(syscall.SIGINT)
}

exe := path.Base(cmdPath)

if withCPUProf {
args = append(args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe)))
cmd.Args = append(cmd.Args, "--cpuprofile", path.Join(workDir, fmt.Sprintf("%s.cpu.prof", exe)))
}

if withMetrics {
args = append(args, "--metricsfile", path.Join(workDir, fmt.Sprintf("%s.metrics.yaml", exe)))
}

cmd := exec.CommandContext(ctx, cmdPath, args...)
cmd.Cancel = func() error {
log.Info("Stop the command")
return cmd.Process.Signal(syscall.SIGINT)
cmd.Args = append(cmd.Args, "--metricsfile", path.Join(workDir, fmt.Sprintf("%s.metrics.yaml", exe)))
}

if withLogs {
cmd.Args = append(cmd.Args, fmt.Sprintf("--zap-log-level=%d", logLevel))
outWriter := os.Stdout
errWriter := os.Stderr
if logToFile {
@@ -336,11 +345,11 @@ func runGenerator(ctx context.Context, cfg *rest.Config, generatorConfig string,
return nil
}

func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}) (*recorder.Recorder, error) {
func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup, genDone <-chan struct{}, recordTimeout time.Duration) (*recorder.Recorder, error) {
defer wg.Done()
log := ctrl.LoggerFrom(ctx).WithName("Start recorder")
//TODO: make the timeout an arg
recorder := recorder.New(30 * time.Minute)
recorder := recorder.New(recordTimeout)
wg.Add(1)
go func() {
defer wg.Done()
@@ -353,7 +362,7 @@ func startRecorder(ctx context.Context, errCh chan<- error, wg *sync.WaitGroup,
errCh <- err
}()

log.Info("Recorder started ")
log.Info("Recorder started", "timeout", recordTimeout)
return recorder, nil
}

