Skip to content

Commit

Permalink
Add pager.EachPageItem utility function to incrementally process lists
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Apr 10, 2019
1 parent d99f49d commit 6a64ee6
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 2 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/tools/pager/BUILD
Expand Up @@ -17,6 +17,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)

Expand Down
114 changes: 114 additions & 0 deletions staging/src/k8s.io/client-go/tools/pager/pager.go
Expand Up @@ -25,9 +25,11 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

const defaultPageSize = 500
const defaultPageBufferSize = 10

// ListPageFunc returns a list object for the given list options.
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
Expand All @@ -48,6 +50,9 @@ type ListPager struct {
PageFn ListPageFunc

FullListIfExpired bool

// Number of pages to buffer
PageBufferSize int32
}

// New creates a new pager from the provided pager function using the default
Expand All @@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager {
PageSize: defaultPageSize,
PageFn: fn,
FullListIfExpired: true,
PageBufferSize: defaultPageBufferSize,
}
}

Expand All @@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
}
var list *metainternalversion.List
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

obj, err := p.PageFn(ctx, options)
if err != nil {
if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
Expand Down Expand Up @@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Continue = m.GetContinue()
}
}

// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
// fn returns an error, processing stops and that error is returned. If fn does not return an error,
// any error encountered while retrieving the list from the server is returned. If the context
// cancels or times out, the context error is returned. Since the list is retrieved in paginated
// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
// requests exceed the expiration limit of the apiserver being called.
//
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
// ListPager.PageBufferSize chunks buffered concurrently in the background.
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItem(obj, fn)
})
}

// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is
// returned. If the context cancels or times out, the context error is returned. Since the list is
// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
// the pagination list requests exceed the expiration limit of the apiserver being called.
//
// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if p.PageBufferSize < 0 {
return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
}

// Ensure background goroutine is stopped if this call exits before all list items are
// processed. Cancelation error from this deferred cancel call is never returned to caller;
// either the list result has already been sent to bgResultC or the fn error is returned and
// the cancelation error is discarded.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

chunkC := make(chan runtime.Object, p.PageBufferSize)
bgResultC := make(chan error, 1)
go func() {
defer utilruntime.HandleCrash()

var err error
defer func() {
close(chunkC)
bgResultC <- err
}()
err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
select {
case chunkC <- chunk: // buffer the chunk, this can block
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}()

for o := range chunkC {
err := fn(o)
if err != nil {
return err // any fn error should be returned immediately
}
}
// promote the results of our background goroutine to the foreground
return <-bgResultC
}

// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
// an error, any error encountered while retrieving the list from the server is returned. If the
// context cancels or times out, the context error is returned. Since the list is retrieved in
// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
// pagination list requests exceed the expiration limit of the apiserver being called.
func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if options.Limit == 0 {
options.Limit = p.PageSize
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

obj, err := p.PageFn(ctx, options)
if err != nil {
return err
}
m, err := meta.ListAccessor(obj)
if err != nil {
return fmt.Errorf("returned object must be a list: %v", err)
}
if err := fn(obj); err != nil {
return err
}
// if we have no more items, return.
if len(m.GetContinue()) == 0 {
return nil
}
// set the next loop up
options.Continue = m.GetContinue()
}
}

0 comments on commit 6a64ee6

Please sign in to comment.