Skip to content

Commit

Permalink
Allow backup list requests to be paged
Browse files Browse the repository at this point in the history
Signed-off-by: Dharma Bellamkonda <bellamko@adobe.com>
  • Loading branch information
dharmab committed May 25, 2021
1 parent d0f94a2 commit 1452c45
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/3823-dharmab
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add --client-page-size flag to server to allow chunking Kubernetes API LIST calls across multiple requests on large clusters
6 changes: 5 additions & 1 deletion pkg/backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright 2021 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,6 +73,7 @@ type kubernetesBackupper struct {
resticBackupperFactory restic.BackupperFactory
resticTimeout time.Duration
defaultVolumesToRestic bool
clientPageSize int
}

type resolvedAction struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewKubernetesBackupper(
resticBackupperFactory restic.BackupperFactory,
resticTimeout time.Duration,
defaultVolumesToRestic bool,
clientPageSize int,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
Expand All @@ -115,6 +117,7 @@ func NewKubernetesBackupper(
resticBackupperFactory: resticBackupperFactory,
resticTimeout: resticTimeout,
defaultVolumesToRestic: defaultVolumesToRestic,
clientPageSize: clientPageSize,
}, nil
}

Expand Down Expand Up @@ -272,6 +275,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
dynamicFactory: kb.dynamicFactory,
cohabitatingResources: cohabitatingResources(),
dir: tempDir,
pageSize: kb.clientPageSize,
}

items := collector.getAllItems()
Expand Down
55 changes: 46 additions & 9 deletions pkg/backup/item_collector.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright 2021 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@ limitations under the License.
package backup

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -28,7 +29,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/pager"

"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
Expand All @@ -45,6 +48,7 @@ type itemCollector struct {
dynamicFactory client.DynamicFactory
cohabitatingResources map[string]*cohabitatingResource
dir string
pageSize int
}

type kubernetesResource struct {
Expand Down Expand Up @@ -275,6 +279,7 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
var items []*kubernetesResource

for _, namespace := range namespacesToList {
// List items from Kubernetes API
log = log.WithField("namespace", namespace)

resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
Expand All @@ -287,18 +292,50 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
listOptions := metav1.ListOptions{LabelSelector: labelSelector}

log.Info("Listing items")
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
unstructuredItems := make([]unstructured.Unstructured, 0)

if r.pageSize == 0 {
// If limit is the special value 0, do not use paging. Instead, request all items at once
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
} else {
// If limit is positive, use a pager to split list over multiple requests
// Use Velero's dynamic list function instead of the default
listFunc := pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
list, err := resourceClient.List(listOptions)
if err != nil {
return nil, err
}
return list, nil
})
listPager := pager.New(listFunc)
// Use the page size defined in the server config
// TODO allow configuration of page buffer size and handling of ResourceExpired errors
listPager.PageSize = int64(r.pageSize)
// Add each item to unstructuredItems
err := listPager.EachListItem(context.Background(), listOptions, func(object runtime.Object) error {
item := object.(*unstructured.Unstructured)
unstructuredItems = append(unstructuredItems, *item)
return nil
})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error paging item list")
continue
}
}
log.Infof("Retrieved %d items", len(unstructuredList.Items))

// collect the items
for i := range unstructuredList.Items {
item := &unstructuredList.Items[i]
log.Infof("Retrieved %d items", len(unstructuredItems))

// Collect items in included Namespaces
for i := range unstructuredItems {
item := &unstructuredItems[i]

if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(item.GetName()) {
log.WithField("name", item.GetName()).Info("Skipping namespace because it's excluded")
Expand Down
13 changes: 11 additions & 2 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ const (
defaultResourceTerminatingTimeout = 10 * time.Minute

// server's client default qps and burst
defaultClientQPS float32 = 20.0
defaultClientBurst int = 30
defaultClientQPS float32 = 20.0
defaultClientBurst int = 30
defaultClientPageSize int = 0

defaultProfilerAddress = "localhost:6060"

Expand All @@ -115,6 +116,7 @@ type serverConfig struct {
disabledControllers []string
clientQPS float32
clientBurst int
clientPageSize int
profilerAddress string
formatFlag *logging.FormatFlag
defaultResticMaintenanceFrequency time.Duration
Expand Down Expand Up @@ -142,6 +144,7 @@ func NewCommand(f client.Factory) *cobra.Command {
restoreResourcePriorities: defaultRestorePriorities,
clientQPS: defaultClientQPS,
clientBurst: defaultClientBurst,
clientPageSize: defaultClientPageSize,
profilerAddress: defaultProfilerAddress,
resourceTerminatingTimeout: defaultResourceTerminatingTimeout,
formatFlag: logging.NewFormatFlag(),
Expand Down Expand Up @@ -205,6 +208,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().Var(&volumeSnapshotLocations, "default-volume-snapshot-locations", "List of unique volume providers and default volume snapshot location (provider1:location-01,provider2:location-02,...)")
command.Flags().Float32Var(&config.clientQPS, "client-qps", config.clientQPS, "Maximum number of requests per second by the server to the Kubernetes API once the burst limit has been reached.")
command.Flags().IntVar(&config.clientBurst, "client-burst", config.clientBurst, "Maximum number of requests by the server to the Kubernetes API in a short period of time.")
command.Flags().IntVar(&config.clientPageSize, "client-page-size", config.clientPageSize, "Page size of requests by the server to the Kubernetes API when listing objects during a backup. Set to 0 to disable paging.")
command.Flags().StringVar(&config.profilerAddress, "profiler-address", config.profilerAddress, "The address to expose the pprof profiler.")
command.Flags().DurationVar(&config.resourceTerminatingTimeout, "terminating-resource-timeout", config.resourceTerminatingTimeout, "How long to wait on persistent volumes and namespaces to terminate during a restore before timing out.")
command.Flags().DurationVar(&config.defaultBackupTTL, "default-backup-ttl", config.defaultBackupTTL, "How long to wait by default before backups can be garbage collected.")
Expand Down Expand Up @@ -249,6 +253,10 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
}
f.SetClientBurst(config.clientBurst)

if config.clientPageSize < 0 {
return nil, errors.New("client-page-size must not be negative")
}

kubeClient, err := f.KubeClient()
if err != nil {
return nil, err
Expand Down Expand Up @@ -610,6 +618,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.resticManager,
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,
)
cmd.CheckError(err)

Expand Down
8 changes: 8 additions & 0 deletions site/content/docs/main/backup-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ velero backup create --from-schedule example-schedule
```

This command will immediately trigger a new backup based on your template for `example-schedule`. This will not affect the backup schedule, and another backup will trigger at the scheduled time.

## Enable Kubernetes API Pagination

By default, Velero uses a single LIST API call for each resource type from the Kubernetes API when collecting items into a backup. This provides a fully consistent list. However, on clusters with a very large number of objects, this API call can become slow and may eventually be timed out by the Kubernetes apiserver.

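The `--client-page-size` flag for the Velero server configures Velero to attempt to split the list up over multiple requests. Each response will contain no more objects than the given page size. The default value is `0`, which disables paging. If a paged list cannot be completed because the apiserver's continue token expires between requests (for example, because the object list was modified and the original snapshot is no longer available), Velero will fall back to attempting a full list in a single API call.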

If pagination is needed on a large cluster, a good starting page size is 500. This is the default value of `kubectl`'s similar `--chunk-size` flag. You can then experiment with higher values, noting their impact on the relevant `apiserver_request_duration_seconds_*` metrics from the Kubernetes apiserver.

0 comments on commit 1452c45

Please sign in to comment.