Skip to content

Commit

Permalink
[release-4.5] Bug 1870782: Retry conflicting catalog source pod updat…
Browse files Browse the repository at this point in the history
…e after fetching latest pod spec
  • Loading branch information
Ankita Thomas authored and Ankita Thomas committed Aug 21, 2020
1 parent 63b96f4 commit d1ecc79
Show file tree
Hide file tree
Showing 13 changed files with 965 additions and 18 deletions.
3 changes: 2 additions & 1 deletion cmd/catalog/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
utilclock "k8s.io/apimachinery/pkg/util/clock"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
Expand Down Expand Up @@ -172,7 +173,7 @@ func main() {
}

// Create a new instance of the operator.
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace)
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme)
if err != nil {
log.Panicf("error configuring operator: %s", err.Error())
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/operators/catalog/operator.go
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"

errorwrap "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -47,6 +49,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
Expand All @@ -71,6 +74,7 @@ const (
generatedByKey = "olm.generated-by"
maxInstallPlanCount = 5
maxDeletesPerSweep = 5
RegistryFieldManager = "olm.registry"
)

// Operator represents a Kubernetes operator that executes InstallPlans by
Expand Down Expand Up @@ -103,7 +107,7 @@ type Operator struct {
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)

// NewOperator creates a new Catalog Operator.
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme) (*Operator, error) {
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
Expand Down Expand Up @@ -132,6 +136,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Create an OperatorLister
lister := operatorlister.NewLister()

ssaClient, err := controllerclient.NewForConfig(config, scheme, RegistryFieldManager)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
Expand All @@ -152,7 +161,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)

// Wire OLM CR sharedIndexInformers
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod())
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/operators/catalog/operator_test.go
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
"github.com/operator-framework/operator-lifecycle-manager/pkg/fakes"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clientfake"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -1247,7 +1248,14 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
}
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
if op.reconciler == nil {
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
s := runtime.NewScheme()
err := k8sfake.AddToScheme(s)
if err != nil {
return nil, err
}
applier := controllerclient.NewFakeApplier(s, "testowner")

op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now, applier)
}

op.RunInformers(ctx)
Expand Down
22 changes: 12 additions & 10 deletions pkg/controller/registry/reconciler/grpc.go
Expand Up @@ -3,8 +3,8 @@ package reconciler
import (
"context"
"fmt"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -70,9 +70,10 @@ func (s *grpcCatalogSourceDecorator) Pod() *v1.Pod {
}

type GrpcRegistryReconciler struct {
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
SSAClient *controllerclient.ServerSideApplier
}

var _ RegistryReconciler = &GrpcRegistryReconciler{}
Expand Down Expand Up @@ -194,16 +195,17 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
logrus.WithField("CatalogSource", source.GetName()).Info("detect image update for catalogsource pod")

updateFlag = true
updatePod.Labels[CatalogSourceLabelKey] = source.GetName()
updatePod.Labels[CatalogSourceUpdateKey] = ""

// Update the update pod to promote it to serving pod
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Update(context.TODO(), updatePod, metav1.UpdateOptions{})
err := c.SSAClient.Apply(context.TODO(), updatePod, func(p *v1.Pod) error {
p.Labels[CatalogSourceLabelKey] = source.GetName()
p.Labels[CatalogSourceUpdateKey] = ""
return nil
})()

if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error updating catalog source pod: %s", source.Pod().GetName())
}

break
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/registry/reconciler/reconciler.go
Expand Up @@ -3,6 +3,7 @@ package reconciler

import (
"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -46,6 +47,7 @@ type registryReconcilerFactory struct {
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
ConfigMapServerImage string
SSAClient *controllerclient.ServerSideApplier
}

// ReconcilerForSource returns a RegistryReconciler based on the configuration of the given CatalogSource.
Expand All @@ -62,9 +64,10 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
case v1alpha1.SourceTypeGrpc:
if source.Spec.Image != "" {
return &GrpcRegistryReconciler{
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
SSAClient: r.SSAClient,
}
} else if source.Spec.Address != "" {
return &GrpcAddressRegistryReconciler{
Expand All @@ -76,12 +79,13 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
}

// NewRegistryReconcilerFactory returns an initialized RegistryReconcilerFactory.
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc) RegistryReconcilerFactory {
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc, ssaClient *controllerclient.ServerSideApplier) RegistryReconcilerFactory {
return &registryReconcilerFactory{
now: now,
Lister: lister,
OpClient: opClient,
ConfigMapServerImage: configMapServerImage,
SSAClient: ssaClient,
}
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/lib/controller-runtime/client/fake_ssa.go
@@ -0,0 +1,56 @@
package client

import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
fakecontrollerclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// FakeApplier provides a wrapper around the fake k8s controller client to convert the unsupported apply-type patches to merge patches.
func NewFakeApplier(scheme *runtime.Scheme, owner string, objs ...runtime.Object) *ServerSideApplier {
return &ServerSideApplier{
client: &fakeApplier{fakecontrollerclient.NewFakeClientWithScheme(scheme, objs...)},
Scheme: scheme,
Owner: k8scontrollerclient.FieldOwner(owner),
}
}

type fakeApplier struct {
k8scontrollerclient.Client
}

func (c *fakeApplier) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
return c.Client.Patch(ctx, obj, patch, opts...)
}

func (c *fakeApplier) Status() k8scontrollerclient.StatusWriter {
return fakeStatusWriter{c.Client.Status()}
}

type fakeStatusWriter struct {
k8scontrollerclient.StatusWriter
}

func (c fakeStatusWriter) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
return c.StatusWriter.Patch(ctx, obj, patch, opts...)
}

func convertApplyToMergePatch(patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) (k8scontrollerclient.Patch, []k8scontrollerclient.PatchOption) {
// Apply patch type is not supported on the fake controller
if patch.Type() == types.ApplyPatchType {
patch = k8scontrollerclient.Merge
patchOptions := make([]k8scontrollerclient.PatchOption, 0, len(opts))
for _, opt := range opts {
if opt == k8scontrollerclient.ForceOwnership {
continue
}
patchOptions = append(patchOptions, opt)
}
opts = patchOptions
}
return patch, opts
}

0 comments on commit d1ecc79

Please sign in to comment.