Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ListPager.EachListItem util #75849

Merged
merged 1 commit into from Apr 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/tools/pager/BUILD
Expand Up @@ -17,6 +17,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)

Expand Down
114 changes: 114 additions & 0 deletions staging/src/k8s.io/client-go/tools/pager/pager.go
Expand Up @@ -25,9 +25,11 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

const defaultPageSize = 500
const defaultPageBufferSize = 10

// ListPageFunc returns a list object for the given list options.
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
Expand All @@ -48,6 +50,9 @@ type ListPager struct {
PageFn ListPageFunc

FullListIfExpired bool

// Number of pages to buffer
PageBufferSize int32
Copy link
Contributor

@tedyu tedyu Apr 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the buffer should be defined by the memory consumption instead of number of pages.

We can evaluate the size of a few chunks to dynamically determine the number of chunks to buffer, based on desired buffer size (in terms of memory).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Maybe wait and see how this approach works out and then optimize as needed from there? We previously had quite a bit of code doing full lists, and so as we transition to paginated lists and this sort of incremental processing my expectation is we'll reduce memory usage, particularly for object kinds that have large counts. If we still hit scalability/performance limits once this is in use, that would seem like a good time to look into optimizing this further .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

}

// New creates a new pager from the provided pager function using the default
Expand All @@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager {
PageSize: defaultPageSize,
PageFn: fn,
FullListIfExpired: true,
PageBufferSize: defaultPageBufferSize,
}
}

Expand All @@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
}
var list *metainternalversion.List
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

obj, err := p.PageFn(ctx, options)
if err != nil {
if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
Expand Down Expand Up @@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Continue = m.GetContinue()
}
}

// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
// fn returns an error, processing stops and that error is returned. If fn does not return an error,
// any error encountered while retrieving the list from the server is returned. If the context
// cancels or times out, the context error is returned. Since the list is retrieved in paginated
// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
// requests exceed the expiration limit of the apiserver being called.
//
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
// ListPager.PageBufferSize chunks buffered concurrently in the background.
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItem(obj, fn)
})
}

// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is
// returned. If the context cancels or times out, the context error is returned. Since the list is
// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
// the pagination list requests exceed the expiration limit of the apiserver being called.
//
// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if p.PageBufferSize < 0 {
return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
}

// Ensure background goroutine is stopped if this call exits before all list items are
// processed. Cancelation error from this deferred cancel call is never returned to caller;
// either the list result has already been sent to bgResultC or the fn error is returned and
// the cancelation error is discarded.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

chunkC := make(chan runtime.Object, p.PageBufferSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error if p.PageBufferSize <= 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, set it to < since 0 size buffer is supported (tests demo this).

bgResultC := make(chan error, 1)
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer runtime.HandleCrash() at the top of the goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I should do something with panics. I've gone with runtime.RecoverFromPanic here. Hopefully it's appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runtime.RecoverFromPanic didn't work as I had expected (and from a quick grep, doesn't look like we use it in the k8s codebase), so I did a more direct recover.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you can't use runtime.HandleCrash() - it's pretty widely used in the codebase and allows to use additional custom handler too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had initially misunderstood it to catch the panic and swallow it. But it does still crash. Updating the PR to use it now.

defer utilruntime.HandleCrash()

var err error
defer func() {
close(chunkC)
bgResultC <- err
}()
err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
select {
case chunkC <- chunk: // buffer the chunk, this can block
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}()

for o := range chunkC {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If listing chunk in the background fails (such as due to resource expired), we will still finish processing the existing chunks in the buffer before return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think from an API contract perspective, when a error occurs listing chunks, i think it's valid to either (1) stop calling fn as soon as possible, or (2) call fn on as many items as have been successfully retrieved. I went with #1 since it was trivial to implement. I imagine in different situations a client might benefit more from one of these and in other situations the other. Thoughts?

err := fn(o)
if err != nil {
return err // any fn error should be returned immediately
}
}
// promote the results of our background goroutine to the foreground
return <-bgResultC
}

// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
// an error, any error encountered while retrieving the list from the server is returned. If the
// context cancels or times out, the context error is returned. Since the list is retrieved in
// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
// pagination list requests exceed the expiration limit of the apiserver being called.
func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if options.Limit == 0 {
options.Limit = p.PageSize
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

obj, err := p.PageFn(ctx, options)
if err != nil {
return err
}
m, err := meta.ListAccessor(obj)
if err != nil {
return fmt.Errorf("returned object must be a list: %v", err)
}
if err := fn(obj); err != nil {
return err
}
// if we have no more items, return.
if len(m.GetContinue()) == 0 {
return nil
}
// set the next loop up
options.Continue = m.GetContinue()
}
}