From 6a64ee638780de4dcfa2ab2b4fc739dd23127c49 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Thu, 28 Mar 2019 15:39:14 -0700 Subject: [PATCH] Add pager.EachPageItem utility function to incrementally process lists --- .../src/k8s.io/client-go/tools/pager/BUILD | 1 + .../src/k8s.io/client-go/tools/pager/pager.go | 114 +++++++++ .../client-go/tools/pager/pager_test.go | 242 +++++++++++++++++- 3 files changed, 355 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/pager/BUILD b/staging/src/k8s.io/client-go/tools/pager/BUILD index 304d5b650694..9cf8111a631f 100644 --- a/staging/src/k8s.io/client-go/tools/pager/BUILD +++ b/staging/src/k8s.io/client-go/tools/pager/BUILD @@ -17,6 +17,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index 74ea3586ab8b..d265db786834 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -25,9 +25,11 @@ import ( metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) const defaultPageSize = 500 +const defaultPageBufferSize = 10 // ListPageFunc returns a list object for the given list options. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) @@ -48,6 +50,9 @@ type ListPager struct { PageFn ListPageFunc FullListIfExpired bool + + // Number of pages to buffer + PageBufferSize int32 } // New creates a new pager from the provided pager function using the default @@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager { PageSize: defaultPageSize, PageFn: fn, FullListIfExpired: true, + PageBufferSize: defaultPageBufferSize, } } @@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti } var list *metainternalversion.List for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + obj, err := p.PageFn(ctx, options) if err != nil { if !errors.IsResourceExpired(err) || !p.FullListIfExpired { @@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti options.Continue = m.GetContinue() } } + +// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If +// fn returns an error, processing stops and that error is returned. If fn does not return an error, +// any error encountered while retrieving the list from the server is returned. If the context +// cancels or times out, the context error is returned. Since the list is retrieved in paginated +// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list +// requests exceed the expiration limit of the apiserver being called. +// +// Items are retrieved in chunks from the server to reduce the impact on the server with up to +// ListPager.PageBufferSize chunks buffered concurrently in the background. +func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error { + return meta.EachListItem(obj, fn) + }) +} + +// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on +// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does +// not return an error, any error encountered while retrieving the list from the server is +// returned. If the context cancels or times out, the context error is returned. Since the list is +// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if +// the pagination list requests exceed the expiration limit of the apiserver being called. +// +// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background. +func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + if p.PageBufferSize < 0 { + return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize) + } + + // Ensure background goroutine is stopped if this call exits before all list items are + // processed. Cancelation error from this deferred cancel call is never returned to caller; + // either the list result has already been sent to bgResultC or the fn error is returned and + // the cancelation error is discarded. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + chunkC := make(chan runtime.Object, p.PageBufferSize) + bgResultC := make(chan error, 1) + go func() { + defer utilruntime.HandleCrash() + + var err error + defer func() { + close(chunkC) + bgResultC <- err + }() + err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error { + select { + case chunkC <- chunk: // buffer the chunk, this can block + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + }() + + for o := range chunkC { + err := fn(o) + if err != nil { + return err // any fn error should be returned immediately + } + } + // promote the results of our background goroutine to the foreground + return <-bgResultC +} + +// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list +// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return +// an error, any error encountered while retrieving the list from the server is returned. If the +// context cancels or times out, the context error is returned. Since the list is retrieved in +// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the +// pagination list requests exceed the expiration limit of the apiserver being called. +func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + if options.Limit == 0 { + options.Limit = p.PageSize + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + obj, err := p.PageFn(ctx, options) + if err != nil { + return err + } + m, err := meta.ListAccessor(obj) + if err != nil { + return fmt.Errorf("returned object must be a list: %v", err) + } + if err := fn(obj); err != nil { + return err + } + // if we have no more items, return. + if len(m.GetContinue()) == 0 { + return nil + } + // set the next loop up + options.Continue = m.GetContinue() + } +} diff --git a/staging/src/k8s.io/client-go/tools/pager/pager_test.go b/staging/src/k8s.io/client-go/tools/pager/pager_test.go index ae517cab207e..2332b53d78f6 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager_test.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager_test.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" @@ -115,7 +116,6 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options } return p.PagedList(ctx, options) } - func TestListPager_List(t *testing.T) { type fields struct { PageSize int64 @@ -189,7 +189,11 @@ func TestListPager_List(t *testing.T) { PageFn: tt.fields.PageFn, FullListIfExpired: tt.fields.FullListIfExpired, } - got, err := p.List(tt.args.ctx, tt.args.options) + ctx := tt.args.ctx + if ctx == nil { + ctx = context.Background() + } + got, err := p.List(ctx, tt.args.options) if (err != nil) != tt.wantErr { t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) return @@ -204,3 +208,237 @@ func TestListPager_List(t *testing.T) { }) } } + +func TestListPager_EachListItem(t *testing.T) { + type fields struct { + PageSize int64 + PageFn ListPageFunc + } + tests := []struct { + name string + fields fields + want runtime.Object + wantErr bool + wantPanic bool + isExpired bool + processorErrorOnItem int + processorPanicOnItem int + cancelContextOnItem int + }{ + { + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + want: list(0, "rv:20"), + }, + { + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + want: list(9, "rv:20"), + }, + { + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + want: list(10, "rv:20"), + }, + { + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + want: list(11, "rv:20"), + }, + { + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + want: list(21, "rv:20"), + }, + { + name: "expires on second page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, + want: list(10, "rv:20"), // all items on the first page should have been visited + wantErr: true, + isExpired: true, + }, + { + name: "error processing item", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantPanic: true, + processorPanicOnItem: 3, + }, + { + name: "cancel context while processing", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantErr: true, + cancelContextOnItem: 3, + }, + { + name: "panic processing item", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantPanic: true, + }, + } + + processorErr := fmt.Errorf("processor error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + p := &ListPager{ + PageSize: tt.fields.PageSize, + PageFn: tt.fields.PageFn, + } + var items []runtime.Object + + fn := func(obj runtime.Object) error { + items = append(items, obj) + if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem { + return processorErr + } + if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem { + panic(processorErr) + } + if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem { + cancel() + } + return nil + } + var err error + var panic interface{} + func() { + defer func() { + panic = recover() + }() + err = p.EachListItem(ctx, metav1.ListOptions{}, fn) + }() + if (panic != nil) && !tt.wantPanic { + t.Fatalf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic) + } else { + return + } + if (err != nil) != tt.wantErr { + t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.isExpired != errors.IsResourceExpired(err) { + t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired) + return + } + if tt.processorErrorOnItem > 0 && err != processorErr { + t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem) + return + } + l := tt.want.(*metainternalversion.List) + if !reflect.DeepEqual(items, l.Items) { + t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items) + } + }) + } +} + +func TestListPager_eachListPageBuffered(t *testing.T) { + tests := []struct { + name string + totalPages int + pagesProcessed int + wantPageLists int + pageBufferSize int32 + pageSize int + }{ + { + name: "no buffer, one total page", + totalPages: 1, + pagesProcessed: 1, + wantPageLists: 1, + pageBufferSize: 0, + }, { + name: "no buffer, 1/5 pages processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 2, // 1 received for processing, 1 listed + pageBufferSize: 0, + }, + { + name: "no buffer, 2/5 pages processed", + totalPages: 5, + pagesProcessed: 2, + wantPageLists: 3, + pageBufferSize: 0, + }, + { + name: "no buffer, 5/5 pages processed", + totalPages: 5, + pagesProcessed: 5, + wantPageLists: 5, + pageBufferSize: 0, + }, + { + name: "size 1 buffer, 1/5 pages processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 3, + pageBufferSize: 1, + }, + { + name: "size 1 buffer, 5/5 pages processed", + totalPages: 5, + pagesProcessed: 5, + wantPageLists: 5, + pageBufferSize: 1, + }, + { + name: "size 10 buffer, 1/5 page processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 5, + pageBufferSize: 10, // buffer is larger than list + }, + } + processorErr := fmt.Errorf("processor error") + pageSize := 10 + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"} + pageLists := 0 + wantedPageListsDone := make(chan struct{}) + listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + pageLists++ + if pageLists == tt.wantPageLists { + close(wantedPageListsDone) + } + return pgr.PagedList(ctx, options) + } + p := &ListPager{ + PageSize: int64(pageSize), + PageBufferSize: tt.pageBufferSize, + PageFn: listFn, + } + + pagesProcessed := 0 + fn := func(obj runtime.Object) error { + pagesProcessed++ + if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 { + // wait for buffering to catch up + select { + case <-time.After(time.Second): + return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists) + case <-wantedPageListsDone: + } + return processorErr + } + return nil + } + err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn) + if tt.pagesProcessed > 0 && err == processorErr { + // expected + } else if err != nil { + t.Fatal(err) + } + if tt.wantPageLists > 0 && pageLists != tt.wantPageLists { + t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists) + } + if pagesProcessed != tt.pagesProcessed { + t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed) + } + }) + } +}