Skip to content

Commit

Permalink
Move server side apply patch code to library, use ssa to update catal…
Browse files Browse the repository at this point in the history
…og source pods
  • Loading branch information
Ankita Thomas authored and Ankita Thomas committed Aug 21, 2020
1 parent 63b96f4 commit cfe2bd9
Show file tree
Hide file tree
Showing 15 changed files with 1,430 additions and 18 deletions.
3 changes: 2 additions & 1 deletion cmd/catalog/main.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
utilclock "k8s.io/apimachinery/pkg/util/clock"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
Expand Down Expand Up @@ -172,7 +173,7 @@ func main() {
}

// Create a new instance of the operator.
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace)
op, err := catalog.NewOperator(ctx, *kubeConfigPath, utilclock.RealClock{}, logger, *wakeupInterval, *configmapServerImage, *utilImage, *catalogNamespace, k8sscheme.Scheme)
if err != nil {
log.Panicf("error configuring operator: %s", err.Error())
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/operators/catalog/operator.go
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"

errorwrap "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -47,6 +49,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
Expand All @@ -71,6 +74,7 @@ const (
generatedByKey = "olm.generated-by"
maxInstallPlanCount = 5
maxDeletesPerSweep = 5
RegistryFieldManager = "olm.registry"
)

// Operator represents a Kubernetes operator that executes InstallPlans by
Expand Down Expand Up @@ -103,7 +107,7 @@ type Operator struct {
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)

// NewOperator creates a new Catalog Operator.
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string) (*Operator, error) {
func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock, logger *logrus.Logger, resync time.Duration, configmapRegistryImage, utilImage string, operatorNamespace string, scheme *runtime.Scheme) (*Operator, error) {
resyncPeriod := queueinformer.ResyncWithJitter(resync, 0.2)
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
Expand Down Expand Up @@ -132,6 +136,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Create an OperatorLister
lister := operatorlister.NewLister()

ssaClient, err := controllerclient.NewForConfig(config, scheme, RegistryFieldManager)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
Expand All @@ -152,7 +161,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient, dynamicClient),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)

// Wire OLM CR sharedIndexInformers
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod())
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/operators/catalog/operator_test.go
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
"github.com/operator-framework/operator-lifecycle-manager/pkg/fakes"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clientfake"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -1247,7 +1248,14 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
}
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
if op.reconciler == nil {
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
s := runtime.NewScheme()
err := k8sfake.AddToScheme(s)
if err != nil {
return nil, err
}
applier := controllerclient.NewFakeApplier(s, "testowner")

op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now, applier)
}

op.RunInformers(ctx)
Expand Down
22 changes: 12 additions & 10 deletions pkg/controller/registry/reconciler/grpc.go
Expand Up @@ -3,8 +3,8 @@ package reconciler
import (
"context"
"fmt"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand Down Expand Up @@ -70,9 +70,10 @@ func (s *grpcCatalogSourceDecorator) Pod() *v1.Pod {
}

type GrpcRegistryReconciler struct {
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
SSAClient *controllerclient.ServerSideApplier
}

var _ RegistryReconciler = &GrpcRegistryReconciler{}
Expand Down Expand Up @@ -194,16 +195,17 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
logrus.WithField("CatalogSource", source.GetName()).Info("detect image update for catalogsource pod")

updateFlag = true
updatePod.Labels[CatalogSourceLabelKey] = source.GetName()
updatePod.Labels[CatalogSourceUpdateKey] = ""

// Update the update pod to promote it to serving pod
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Update(context.TODO(), updatePod, metav1.UpdateOptions{})
err := c.SSAClient.Apply(context.TODO(), updatePod, func(p *v1.Pod) error {
p.Labels[CatalogSourceLabelKey] = source.GetName()
p.Labels[CatalogSourceUpdateKey] = ""
return nil
})()

if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error updating catalog source pod: %s", source.Pod().GetName())
}

break
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/registry/reconciler/reconciler.go
Expand Up @@ -3,6 +3,7 @@ package reconciler

import (
"github.com/operator-framework/api/pkg/operators/v1alpha1"
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -46,6 +47,7 @@ type registryReconcilerFactory struct {
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
ConfigMapServerImage string
SSAClient *controllerclient.ServerSideApplier
}

// ReconcilerForSource returns a RegistryReconciler based on the configuration of the given CatalogSource.
Expand All @@ -62,9 +64,10 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
case v1alpha1.SourceTypeGrpc:
if source.Spec.Image != "" {
return &GrpcRegistryReconciler{
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
now: r.now,
Lister: r.Lister,
OpClient: r.OpClient,
SSAClient: r.SSAClient,
}
} else if source.Spec.Address != "" {
return &GrpcAddressRegistryReconciler{
Expand All @@ -76,12 +79,13 @@ func (r *registryReconcilerFactory) ReconcilerForSource(source *v1alpha1.Catalog
}

// NewRegistryReconcilerFactory returns an initialized RegistryReconcilerFactory.
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc) RegistryReconcilerFactory {
func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient operatorclient.ClientInterface, configMapServerImage string, now nowFunc, ssaClient *controllerclient.ServerSideApplier) RegistryReconcilerFactory {
return &registryReconcilerFactory{
now: now,
Lister: lister,
OpClient: opClient,
ConfigMapServerImage: configMapServerImage,
SSAClient: ssaClient,
}
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/lib/controller-runtime/client/fake_ssa.go
@@ -0,0 +1,56 @@
package client

import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
fakecontrollerclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// FakeApplier provides a wrapper around the fake k8s controller client to convert the unsupported apply-type patches to merge patches.
func NewFakeApplier(scheme *runtime.Scheme, owner string, objs ...runtime.Object) *ServerSideApplier {
return &ServerSideApplier{
client: &fakeApplier{fakecontrollerclient.NewFakeClientWithScheme(scheme, objs...)},
Scheme: scheme,
Owner: k8scontrollerclient.FieldOwner(owner),
}
}

type fakeApplier struct {
k8scontrollerclient.Client
}

func (c *fakeApplier) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
return c.Client.Patch(ctx, obj, patch, opts...)
}

func (c *fakeApplier) Status() k8scontrollerclient.StatusWriter {
return fakeStatusWriter{c.Client.Status()}
}

type fakeStatusWriter struct {
k8scontrollerclient.StatusWriter
}

func (c fakeStatusWriter) Patch(ctx context.Context, obj runtime.Object, patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) error {
patch, opts = convertApplyToMergePatch(patch, opts...)
return c.StatusWriter.Patch(ctx, obj, patch, opts...)
}

func convertApplyToMergePatch(patch k8scontrollerclient.Patch, opts ...k8scontrollerclient.PatchOption) (k8scontrollerclient.Patch, []k8scontrollerclient.PatchOption) {
// Apply patch type is not supported on the fake controller
if patch.Type() == types.ApplyPatchType {
patch = k8scontrollerclient.Merge
patchOptions := make([]k8scontrollerclient.PatchOption, 0, len(opts))
for _, opt := range opts {
if opt == k8scontrollerclient.ForceOwnership {
continue
}
patchOptions = append(patchOptions, opt)
}
opts = patchOptions
}
return patch, opts
}

0 comments on commit cfe2bd9

Please sign in to comment.