Skip to content

Commit

Permalink
Improve and simplify maintenance of APF bootstrap objects
Browse files Browse the repository at this point in the history
Prepare to make deletion of unwanted object conditional on ResourceVersion.

Remove unnecessary split between finding unwanted objects and removing
them.

Remove unnecessary layers of indirection to reach constant logic.

Use interfaces to remove need for type assertions.

Threaded context into APF object maintenance

Note and respect immutability of desired bootstrap objects
  • Loading branch information
MikeSpreitzer committed May 5, 2023
1 parent 7e25f12 commit 008576d
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 681 deletions.
226 changes: 92 additions & 134 deletions pkg/registry/flowcontrol/ensurer/flowschema.go
Expand Up @@ -18,194 +18,152 @@ package ensurer

import (
"context"
"errors"
"fmt"

flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
)

var (
errObjectNotFlowSchema = errors.New("object is not a FlowSchema type")
)
// WrapBootstrapFlowSchemas creates a generic representation of the given bootstrap objects bound with their operations
// Every object in `boots` is immutable.
func WrapBootstrapFlowSchemas(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister, boots []*flowcontrolv1beta3.FlowSchema) BootstrapObjects {
return &bootstrapFlowSchemas{
flowSchemaClient: flowSchemaClient{
client: client,
lister: lister},
boots: boots,
}
}

// FlowSchemaEnsurer ensures the specified bootstrap configuration objects
type FlowSchemaEnsurer interface {
Ensure([]*flowcontrolv1beta3.FlowSchema) error
type flowSchemaClient struct {
client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister
}

// FlowSchemaRemover is the interface that wraps the
// RemoveAutoUpdateEnabledObjects method.
//
// RemoveAutoUpdateEnabledObjects removes a set of bootstrap FlowSchema
// objects specified via their names. The function removes an object
// only if automatic update of the spec is enabled for it.
type FlowSchemaRemover interface {
RemoveAutoUpdateEnabledObjects([]string) error
type bootstrapFlowSchemas struct {
flowSchemaClient

// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.FlowSchema
}

// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects.
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper,
}
func (*flowSchemaClient) typeName() string {
return "FlowSchema"
}

// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of mandatory FlowSchema configuration objects.
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
func (boots *bootstrapFlowSchemas) len() int {
return len(boots.boots)
}

// NewFlowSchemaRemover returns a FlowSchemaRemover instance that
// can be used to remove a set of FlowSchema configuration objects.
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover {
return &fsEnsurer{
wrapper: &flowSchemaWrapper{
client: client,
lister: lister,
},
func (boots *bootstrapFlowSchemas) get(i int) bootstrapObject {
return &bootstrapFlowSchema{
flowSchemaClient: &boots.flowSchemaClient,
bootstrap: boots.boots[i],
}
}

// GetFlowSchemaRemoveCandidates returns a list of FlowSchema object
// names that are candidates for deletion from the cluster.
// bootstrap: a set of hard coded FlowSchema configuration objects
// kube-apiserver maintains in-memory.
func GetFlowSchemaRemoveCandidates(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta3.FlowSchema) ([]string, error) {
fsList, err := lister.List(labels.Everything())
func (boots *bootstrapFlowSchemas) getExistingObjects() ([]deletable, error) {
objs, err := boots.lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema - %w", err)
return nil, fmt.Errorf("failed to list FlowSchema objects - %w", err)
}

bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletableFlowSchema{
FlowSchema: obj,
client: boots.client,
}
}
return dels, nil
}

currentObjects := make([]metav1.Object, len(fsList))
for i := range fsList {
currentObjects[i] = fsList[i]
}
type bootstrapFlowSchema struct {
*flowSchemaClient

return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil
// points to immutable contnet
bootstrap *flowcontrolv1beta3.FlowSchema
}

type fsEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
func (boot *bootstrapFlowSchema) getName() string {
return boot.bootstrap.Name
}

func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta3.FlowSchema) error {
for _, flowSchema := range flowSchemas {
// This code gets called by different goroutines. To avoid race conditions when
// https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243
// temporarily modifies the TypeMeta, we have to make a copy here.
if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema.DeepCopy()); err != nil {
return err
}
}

return nil
func (boot *bootstrapFlowSchema) create(ctx context.Context) error {
// Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
}

func (e *fsEnsurer) RemoveAutoUpdateEnabledObjects(flowSchemas []string) error {
for _, flowSchema := range flowSchemas {
if err := removeAutoUpdateEnabledConfiguration(e.wrapper, flowSchema); err != nil {
return err
}
func (boot *bootstrapFlowSchema) getCurrent() (wantAndHave, error) {
current, err := boot.lister.Get(boot.bootstrap.Name)
if err != nil {
return nil, err
}

return nil
return &wantAndHaveFlowSchema{
client: boot.client,
want: boot.bootstrap,
have: current,
}, nil
}

// flowSchemaWrapper abstracts all FlowSchema specific logic, with this
// we can manage all boiler plate code in one place.
type flowSchemaWrapper struct {
type wantAndHaveFlowSchema struct {
client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister
want *flowcontrolv1beta3.FlowSchema
have *flowcontrolv1beta3.FlowSchema
}

func (fs *flowSchemaWrapper) TypeName() string {
return "FlowSchema"
func (wah *wantAndHaveFlowSchema) getWant() configurationObject {
return wah.want
}

func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return nil, errObjectNotFlowSchema
}

return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager})
func (wah *wantAndHaveFlowSchema) getHave() configurationObject {
return wah.have
}

func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return nil, errObjectNotFlowSchema
func (wah *wantAndHaveFlowSchema) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatableFlowSchema{
FlowSchema: copy,
client: wah.client,
}

return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager})
}

func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) {
return fs.lister.Get(name)
func (wah *wantAndHaveFlowSchema) specsDiffer() bool {
return flowSchemaSpecChanged(wah.want, wah.have)
}

func (fs *flowSchemaWrapper) Delete(name string) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{})
func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
copiedExpectedFlowSchema := expected.DeepCopy()
flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
}

func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return errObjectNotFlowSchema
}

specCopy := bootstrapFS.Spec.DeepCopy()
currentFS.Spec = *specCopy
return nil
type updatableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}

func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
}
func (u *updatableFlowSchema) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.FlowSchema, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}

return flowSchemaSpecChanged(bootstrapFS, currentFS), nil
type deletableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}

func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
copiedExpectedFlowSchema := expected.DeepCopy()
flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
func (dbl *deletableFlowSchema) delete(ctx context.Context /* TODO: resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
}

0 comments on commit 008576d

Please sign in to comment.