Allow backup list requests to be chunked #3823

Merged 1 commit on Aug 18, 2021
1 change: 1 addition & 0 deletions changelogs/unreleased/3823-dharmab
@@ -0,0 +1 @@
Add --client-page-size flag to server to allow chunking Kubernetes API LIST calls across multiple requests on large clusters
6 changes: 5 additions & 1 deletion pkg/backup/backup.go
@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -73,6 +73,7 @@ type kubernetesBackupper struct {
resticBackupperFactory restic.BackupperFactory
resticTimeout time.Duration
defaultVolumesToRestic bool
clientPageSize int
}

type resolvedAction struct {
@@ -106,6 +107,7 @@ func NewKubernetesBackupper(
resticBackupperFactory restic.BackupperFactory,
resticTimeout time.Duration,
defaultVolumesToRestic bool,
clientPageSize int,
) (Backupper, error) {
return &kubernetesBackupper{
backupClient: backupClient,
@@ -115,6 +117,7 @@
resticBackupperFactory: resticBackupperFactory,
resticTimeout: resticTimeout,
defaultVolumesToRestic: defaultVolumesToRestic,
clientPageSize: clientPageSize,
}, nil
}

@@ -272,6 +275,7 @@ func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Req
dynamicFactory: kb.dynamicFactory,
cohabitatingResources: cohabitatingResources(),
dir: tempDir,
pageSize: kb.clientPageSize,
}

items := collector.getAllItems()
71 changes: 62 additions & 9 deletions pkg/backup/item_collector.go
@@ -1,5 +1,5 @@
/*
Copyright 2020 the Velero contributors.
Copyright the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@ limitations under the License.
package backup

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -25,10 +26,13 @@

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/pager"

"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/discovery"
@@ -45,6 +49,7 @@ type itemCollector struct {
dynamicFactory client.DynamicFactory
cohabitatingResources map[string]*cohabitatingResource
dir string
pageSize int
}

type kubernetesResource struct {
@@ -275,6 +280,7 @@ func (r *itemCollector) getResourceItems(log logrus.FieldLogger, gv schema.Group
var items []*kubernetesResource

for _, namespace := range namespacesToList {
// List items from Kubernetes API
log = log.WithField("namespace", namespace)

resourceClient, err := r.dynamicFactory.ClientForGroupVersionResource(gv, resource, namespace)
@@ -287,18 +293,65 @@
if selector := r.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
listOptions := metav1.ListOptions{LabelSelector: labelSelector}

log.Info("Listing items")
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
unstructuredItems := make([]unstructured.Unstructured, 0)

if r.pageSize > 0 {
// If limit is positive, use a pager to split list over multiple requests
// Use Velero's dynamic list function instead of the default
listFunc := pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
list, err := resourceClient.List(listOptions)
Review comment from a Contributor:

If you are not planning on checking the type of error, simply returning `resourceClient.List(listOptions)` might work as well.

Also, can you move the TODO on line 320 here? I think the ResourceExpired errors will be handled here in the future.

Reply from the author (Contributor Author):

Page buffer size would be handled by setting listPager.PageBufferSize.

I double checked how ResourceExpired errors need to be handled and it looks like ListPager only handles them for us if we use List() instead of EachListItem() :( So this needs to be changed to handle the error after all.

if err != nil {
return nil, err
}
return list, nil
})
listPager := pager.New(listFunc)
// Use the page size defined in the server config
// TODO allow configuration of page buffer size
listPager.PageSize = int64(r.pageSize)
// Add each item to temporary slice
var items []unstructured.Unstructured
err := listPager.EachListItem(context.Background(), listOptions, func(object runtime.Object) error {
item, isUnstructured := object.(*unstructured.Unstructured)
if !isUnstructured {
// We should never hit this
log.Error("Got type other than Unstructured from pager func")
return nil
}
items = append(items, *item)
return nil
})
if statusError, isStatusError := err.(*apierrors.StatusError); isStatusError && statusError.Status().Reason == metav1.StatusReasonExpired {
log.WithError(errors.WithStack(err)).Error("Error paging item list. Falling back on unpaginated list")
unstructuredList, err := resourceClient.List(listOptions)
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
items = unstructuredList.Items
} else if err != nil {
log.WithError(errors.WithStack(err)).Error("Error paging item list")
continue
}
unstructuredItems = append(unstructuredItems, items...)
} else {
// If limit is not positive, do not use paging. Instead, request all items at once
unstructuredList, err := resourceClient.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
log.WithError(errors.WithStack(err)).Error("Error listing items")
continue
}
unstructuredItems = append(unstructuredItems, unstructuredList.Items...)
}
log.Infof("Retrieved %d items", len(unstructuredList.Items))

// collect the items
for i := range unstructuredList.Items {
item := &unstructuredList.Items[i]
log.Infof("Retrieved %d items", len(unstructuredItems))

// Collect items in included Namespaces
for i := range unstructuredItems {
item := &unstructuredItems[i]

if gr == kuberesource.Namespaces && !r.backupRequest.NamespaceIncludesExcludes.ShouldInclude(item.GetName()) {
log.WithField("name", item.GetName()).Info("Skipping namespace because it's excluded")
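For readers who want to try the pattern outside Velero, here is a minimal, self-contained sketch of the paging logic this file introduces. It is written against the standard client-go dynamic client rather than Velero's client.Dynamic wrapper, so it is not the PR's code verbatim; the listPaged helper name, the pods GroupVersionResource, and the in-cluster config are illustrative assumptions. As the review thread above notes, ListPager.EachListItem does not recover from an expired continue token on its own, so the sketch falls back to a single unpaginated LIST when the API server reports ResourceExpired.

```go
package main

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/pager"
)

// listPaged retrieves all items for one resource/namespace in pageSize-sized
// chunks. On a ResourceExpired error (a stale continue token) it falls back
// to one unpaginated LIST, mirroring the fallback added in this PR.
func listPaged(ctx context.Context, ri dynamic.ResourceInterface, pageSize int64, opts metav1.ListOptions) ([]unstructured.Unstructured, error) {
	// The page function receives options with Limit and Continue filled in
	// by the pager; pass them straight through to the API call.
	listFunc := pager.SimplePageFunc(func(o metav1.ListOptions) (runtime.Object, error) {
		return ri.List(ctx, o)
	})
	listPager := pager.New(listFunc)
	listPager.PageSize = pageSize

	var items []unstructured.Unstructured
	err := listPager.EachListItem(ctx, opts, func(obj runtime.Object) error {
		item, ok := obj.(*unstructured.Unstructured)
		if !ok {
			return fmt.Errorf("unexpected type %T from pager", obj)
		}
		items = append(items, *item)
		return nil
	})
	if apierrors.IsResourceExpired(err) {
		// EachListItem does not retry expired continue tokens for us, so
		// re-list everything in a single request.
		list, listErr := ri.List(ctx, opts)
		if listErr != nil {
			return nil, listErr
		}
		return list.Items, nil
	}
	return items, err
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes the process runs inside a cluster
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// Illustrative resource: core/v1 Pods in the "default" namespace.
	pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	items, err := listPaged(context.Background(), dyn.Resource(pods).Namespace("default"), 500, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("retrieved %d items\n", len(items))
}
```

Note that the page function forwards the pager-supplied options, which is what lets the pager drive Limit and Continue across requests.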
13 changes: 11 additions & 2 deletions pkg/cmd/server/server.go
@@ -90,8 +90,9 @@ const (
defaultResourceTerminatingTimeout = 10 * time.Minute

// server's client default qps and burst
defaultClientQPS float32 = 20.0
defaultClientBurst int = 30
defaultClientQPS float32 = 20.0
defaultClientBurst int = 30
defaultClientPageSize int = 500

defaultProfilerAddress = "localhost:6060"

@@ -115,6 +116,7 @@ type serverConfig struct {
disabledControllers []string
clientQPS float32
clientBurst int
clientPageSize int
profilerAddress string
formatFlag *logging.FormatFlag
defaultResticMaintenanceFrequency time.Duration
@@ -142,6 +144,7 @@ func NewCommand(f client.Factory) *cobra.Command {
restoreResourcePriorities: defaultRestorePriorities,
clientQPS: defaultClientQPS,
clientBurst: defaultClientBurst,
clientPageSize: defaultClientPageSize,
profilerAddress: defaultProfilerAddress,
resourceTerminatingTimeout: defaultResourceTerminatingTimeout,
formatFlag: logging.NewFormatFlag(),
@@ -205,6 +208,7 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().Var(&volumeSnapshotLocations, "default-volume-snapshot-locations", "List of unique volume providers and default volume snapshot location (provider1:location-01,provider2:location-02,...)")
command.Flags().Float32Var(&config.clientQPS, "client-qps", config.clientQPS, "Maximum number of requests per second by the server to the Kubernetes API once the burst limit has been reached.")
command.Flags().IntVar(&config.clientBurst, "client-burst", config.clientBurst, "Maximum number of requests by the server to the Kubernetes API in a short period of time.")
command.Flags().IntVar(&config.clientPageSize, "client-page-size", config.clientPageSize, "Page size of requests by the server to the Kubernetes API when listing objects during a backup. Set to 0 to disable paging.")
command.Flags().StringVar(&config.profilerAddress, "profiler-address", config.profilerAddress, "The address to expose the pprof profiler.")
command.Flags().DurationVar(&config.resourceTerminatingTimeout, "terminating-resource-timeout", config.resourceTerminatingTimeout, "How long to wait on persistent volumes and namespaces to terminate during a restore before timing out.")
command.Flags().DurationVar(&config.defaultBackupTTL, "default-backup-ttl", config.defaultBackupTTL, "How long to wait by default before backups can be garbage collected.")
@@ -249,6 +253,10 @@ func newServer(f client.Factory, config serverConfig, logger *logrus.Logger) (*s
}
f.SetClientBurst(config.clientBurst)

if config.clientPageSize < 0 {
return nil, errors.New("client-page-size must not be negative")
}

kubeClient, err := f.KubeClient()
if err != nil {
return nil, err
@@ -610,6 +618,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.resticManager,
s.config.podVolumeOperationTimeout,
s.config.defaultVolumesToRestic,
s.config.clientPageSize,
)
cmd.CheckError(err)

8 changes: 8 additions & 0 deletions site/content/docs/main/backup-reference.md
@@ -60,3 +60,11 @@ velero backup create --from-schedule example-schedule
```

This command will immediately trigger a new backup based on your template for `example-schedule`. This will not affect the backup schedule, and another backup will trigger at the scheduled time.

## Kubernetes API Pagination

By default, Velero will paginate the LIST API call for each resource type in the Kubernetes API when collecting items into a backup. The `--client-page-size` flag for the Velero server configures the size of each page.

Depending on the cluster's scale, tuning the page size can improve backup performance. You can experiment with higher values, noting their impact on the relevant `apiserver_request_duration_seconds_*` metrics from the Kubernetes apiserver.

Pagination can be entirely disabled by setting `--client-page-size` to `0`. This will request all items in a single unpaginated LIST call.
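
As a usage note (not part of the PR's diff): the flag is passed to the Velero server command, which typically means adding it to the server Deployment's container arguments, for example `velero server --client-page-size=200` to use a smaller page size than the default of 500, or `--client-page-size=0` to turn paging off. The specific values here are illustrative; the right setting depends on how the cluster's API server handles large LIST requests.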