Skip to content

Commit

Permalink
feat: add an option to bootstrap WatchKind with initial list of resou…
Browse files Browse the repository at this point in the history
…rces

This allows to have a consistent view of a list of resources, it is
bootstrapped with the initial list and then incremental updates.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Feb 16, 2021
1 parent 734f1e1 commit 28dd9aa
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 6 deletions.
41 changes: 41 additions & 0 deletions pkg/state/conformance/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,47 @@ func (suite *StateSuite) TestWatchKind() {
case <-time.After(time.Second):
suite.FailNow("timed out waiting for event")
}

chWithBootstrap := make(chan state.Event)

suite.Require().NoError(suite.State.WatchKind(ctx, path1.Metadata(), chWithBootstrap, state.WithBootstrapContents(true)))

resources, err := suite.State.List(ctx, path1.Metadata())
suite.Require().NoError(err)

for _, res := range resources.Items {
select {
case event := <-chWithBootstrap:
suite.Assert().Equal(state.Created, event.Type)
suite.Assert().Equal(res.String(), event.Resource.String())
suite.Assert().Equal(res.Metadata().Version(), event.Resource.Metadata().Version())
case <-time.After(time.Second):
suite.FailNow("timed out waiting for event")
}
}

oldVersion = path2.Metadata().Version()
path2.Metadata().BumpVersion()

suite.Require().NoError(suite.State.Update(ctx, oldVersion, path2))

select {
case event := <-ch:
suite.Assert().Equal(state.Updated, event.Type)
suite.Assert().Equal(path2.String(), event.Resource.String())
suite.Assert().Equal(path2.Metadata().Version(), event.Resource.Metadata().Version())
case <-time.After(time.Second):
suite.FailNow("timed out waiting for event")
}

select {
case event := <-chWithBootstrap:
suite.Assert().Equal(state.Updated, event.Type)
suite.Assert().Equal(path2.String(), event.Resource.String())
suite.Assert().Equal(path2.Metadata().Version(), event.Resource.Metadata().Version())
case <-time.After(time.Second):
suite.FailNow("timed out waiting for event")
}
}

// TestConcurrentFinalizers perform concurrent finalizer updates.
Expand Down
36 changes: 35 additions & 1 deletion pkg/state/impl/inmem/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,47 @@ func (collection *ResourceCollection) Watch(ctx context.Context, id resource.ID,
}

// WatchAll for any resource change stored in this collection.
func (collection *ResourceCollection) WatchAll(ctx context.Context, ch chan<- state.Event) error {
func (collection *ResourceCollection) WatchAll(ctx context.Context, ch chan<- state.Event, opts ...state.WatchKindOption) error {
var options state.WatchKindOptions

for _, opt := range opts {
opt(&options)
}

collection.mu.Lock()
defer collection.mu.Unlock()

pos := collection.writePos

var bootstrapList []resource.Resource

if options.BootstrapContents {
bootstrapList = make([]resource.Resource, 0, len(collection.storage))

for _, res := range collection.storage {
bootstrapList = append(bootstrapList, res.DeepCopy())
}

sort.Slice(bootstrapList, func(i, j int) bool {
return bootstrapList[i].Metadata().ID() < bootstrapList[j].Metadata().ID()
})
}

go func() {
// send initial contents if they were captured
for _, res := range bootstrapList {
select {
case ch <- state.Event{
Type: state.Created,
Resource: res,
}:
case <-ctx.Done():
return
}
}

bootstrapList = nil

for {
collection.mu.Lock()
// while there's no data to consume (pos == e.writePos), wait for Condition variable signal,
Expand Down
4 changes: 2 additions & 2 deletions pkg/state/impl/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ func (state *State) Watch(ctx context.Context, resourcePointer resource.Pointer,
}

// WatchKind all resources by type.
func (state *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event) error {
return state.getCollection(resourceKind.Type()).WatchAll(ctx, ch)
func (state *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event, opts ...state.WatchKindOption) error {
return state.getCollection(resourceKind.Type()).WatchAll(ctx, ch, opts...)
}
4 changes: 2 additions & 2 deletions pkg/state/impl/namespaced/namespaced.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ func (st *State) Watch(ctx context.Context, ptr resource.Pointer, ch chan<- stat
}

// WatchKind watches resources of specific kind (namespace and type).
func (st *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- state.Event) error {
return st.getNamespace(kind.Namespace()).WatchKind(ctx, kind, ch)
func (st *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- state.Event, opts ...state.WatchKindOption) error {
return st.getNamespace(kind.Namespace()).WatchKind(ctx, kind, ch, opts...)
}
15 changes: 15 additions & 0 deletions pkg/state/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,18 @@ type WatchOptions struct{}

// WatchOption builds WatchOptions.
type WatchOption func(*WatchOptions)

// WatchKindOptions for the CoreState.WatchKind function.
type WatchKindOptions struct {
BootstrapContents bool
}

// WatchKindOption builds WatchOptions.
type WatchKindOption func(*WatchKindOptions)

// WithBootstrapContents enables loading initial list of resources as 'created' events for WatchKind API.
func WithBootstrapContents(enable bool) WatchKindOption {
return func(opts *WatchKindOptions) {
opts.BootstrapContents = enable
}
}
2 changes: 1 addition & 1 deletion pkg/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type CoreState interface {
Watch(context.Context, resource.Pointer, chan<- Event, ...WatchOption) error

// WatchKind watches resources of specific kind (namespace and type).
WatchKind(context.Context, resource.Kind, chan<- Event) error
WatchKind(context.Context, resource.Kind, chan<- Event, ...WatchKindOption) error
}

// UpdaterFunc is called on resource to update it to the desired state.
Expand Down

0 comments on commit 28dd9aa

Please sign in to comment.