Skip to content

Commit

Permalink
Merge pull request kubernetes#116584 from justinsb/parallel_discovery
Browse files Browse the repository at this point in the history
kubectl prunev2: issue discovery requests in parallel
  • Loading branch information
k8s-ci-robot committed Mar 15, 2023
2 parents 815b1bf + 82eee59 commit 2287225
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions staging/src/k8s.io/kubectl/pkg/cmd/apply/applyset_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package apply
import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,8 +66,17 @@ func (p *PruneObject) String() string {
// FindAllObjectsToPrune returns the list of objects that will be pruned.
// Calling this instead of Prune can be useful for dry-run / diff behaviour.
func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID]) ([]PruneObject, error) {
var allObjects []PruneObject
// TODO: Run discovery in parallel (and maybe in consistent order?)
type task struct {
namespace string
restMapping *meta.RESTMapping

err error
results []PruneObject
}
var tasks []*task

// We run discovery in parallel, in as many goroutines as priority and fairness will allow
// (We don't expect many requests in real-world scenarios - maybe tens, unlikely to be hundreds)
for _, restMapping := range a.AllPrunableResources() {
switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace:
Expand All @@ -75,25 +85,48 @@ func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dyna
// Just double-check because otherwise we get cryptic error messages
return nil, fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind)
}
pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, namespace, restMapping)
if err != nil {
return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
}
allObjects = append(allObjects, pruneObjects...)
tasks = append(tasks, &task{
namespace: namespace,
restMapping: restMapping,
})
}

case meta.RESTScopeNameRoot:
pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, metav1.NamespaceNone, restMapping)
if err != nil {
return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
}
allObjects = append(allObjects, pruneObjects...)
tasks = append(tasks, &task{
restMapping: restMapping,
})

default:
return nil, fmt.Errorf("unhandled scope %q", restMapping.Scope.Name())
}
}

var wg sync.WaitGroup

for i := range tasks {
task := tasks[i]
wg.Add(1)
go func() {
defer wg.Done()

results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, task.namespace, task.restMapping)
if err != nil {
task.err = fmt.Errorf("listing %v objects for pruning: %w", task.restMapping.GroupVersionKind.String(), err)
} else {
task.results = results
}
}()
}
// Wait for all the goroutines to finish
wg.Wait()

var allObjects []PruneObject
for _, task := range tasks {
if task.err != nil {
return nil, task.err
}
allObjects = append(allObjects, task.results...)
}
return allObjects, nil
}

Expand Down

0 comments on commit 2287225

Please sign in to comment.