Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilclock "k8s.io/apimachinery/pkg/util/clock"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
Expand Down Expand Up @@ -142,7 +143,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
bundleUnpackerImage: configmapRegistryImage, // Assume the configmapRegistryImage contains the unpacker for now.
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, op.logger)

// Wire OLM CR sharedIndexInformers
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod)
Expand Down Expand Up @@ -363,8 +364,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())

switch state.State {
case connectivity.Ready:
if state.State == connectivity.Ready {
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand All @@ -377,11 +377,11 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
}

o.nsResolveQueue.Add(state.Key.Namespace)
default:
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
}
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
Copy link
Member

@ecordell ecordell Dec 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this just requeuing after every state change? is that desired?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually do so that the status stays in sync with what the grpc connection is reporting. This only adds additional requeues when the connection is reported as ready as requeues were already being done otherwise. I could refactor since a lot of the above is similar if you agree.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and refactored it.

o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
}
return
}

func (o *Operator) requeueOwners(obj metav1.Object) {
Expand Down Expand Up @@ -631,6 +631,7 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour

// Set connection status and return.
updateConnectionStateFunc(out, source)
return
}

// connection is already good, but we need to update the sync time
Expand All @@ -643,6 +644,47 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
return
}

func (o *Operator) checkBackingPodStatus(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) {
if in.Spec.Address != "" {
// if this is an imageless source type, there's no backing pod to check
return in, true, nil
}

out = in.DeepCopy()

selector := reconciler.CatalogSourceLabelForPod(in.GetName())
pods, err := o.lister.CoreV1().PodLister().Pods(in.GetNamespace()).List(labels.SelectorFromSet(selector))
if err != nil {
continueSync = false
syncError = fmt.Errorf("error while examining catalog source pod: %v", err)
return
}

if len(pods) > 0 {
// this assumes a 1:1 mapping for catalog sources to pods
pod := pods[0]
for _, cond := range pod.Status.Conditions {
if cond.Type != corev1.ContainersReady {
continue
}

if cond.Status == corev1.ConditionTrue {
continueSync = true
} else {
logger.Infof("backing pod '%s' not yet ready", pod.GetName())
}
break
}
} else {
continueSync = false
syncError = fmt.Errorf("did not find expected backing pod")
return
}

o.catsrcQueueSet.RequeueAfter(in.GetNamespace(), in.GetName(), 2*time.Second)
return
}

func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
catsrc, ok := obj.(*v1alpha1.CatalogSource)
if !ok {
Expand Down Expand Up @@ -701,6 +743,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
chain := []CatalogSourceSyncFunc{
o.syncConfigMap,
o.syncRegistryServer,
o.checkBackingPodStatus,
o.syncConnection,
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
}
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
if op.reconciler == nil {
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now, op.logger)
}

op.RunInformers(ctx)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func toManifest(obj runtime.Object) string {
}

func pod(s v1alpha1.CatalogSource) *corev1.Pod {
pod := reconciler.Pod(&s, "registry-server", s.Spec.Image, s.GetLabels(), 5, 10)
pod := reconciler.Pod(&s, "registry-server", s.Spec.Image, s.GetLabels(), 5, 10, false)
ownerutil.AddOwner(pod, &s, false, false)
return pod
}
2 changes: 1 addition & 1 deletion pkg/controller/registry/grpc/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *SourceStore) Start(ctx context.Context) {
s.logger.Debug("closing source manager")
return
case e := <-s.notify:
s.logger.Debugf("Got source event: %#v", e)
s.logger.Debugf("Got source event: %#v, (state: %s)", e, e.State)
s.syncFn(e)
}
}
Expand Down
25 changes: 15 additions & 10 deletions pkg/controller/registry/reconciler/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ const (
)

func (s *configMapCatalogSourceDecorator) Labels() map[string]string {
labels := map[string]string{
CatalogSourceLabelKey: s.GetName(),
}
labels := CatalogSourceLabelForPod(s.GetName())
if s.Spec.SourceType == v1alpha1.SourceTypeInternal || s.Spec.SourceType == v1alpha1.SourceTypeConfigmap {
labels[ConfigMapRVLabelKey] = s.Status.ConfigMapResource.ResourceVersion
}
Expand Down Expand Up @@ -88,8 +86,8 @@ func (s *configMapCatalogSourceDecorator) Service() *v1.Service {
return svc
}

func (s *configMapCatalogSourceDecorator) Pod(image string) *v1.Pod {
pod := Pod(s.CatalogSource, "configmap-registry-server", image, s.Labels(), 1, 2)
func (s *configMapCatalogSourceDecorator) Pod(image string, debugLogging bool) *v1.Pod {
pod := Pod(s.CatalogSource, "configmap-registry-server", image, s.Labels(), 1, 2, debugLogging)
pod.Spec.ServiceAccountName = s.GetName() + ConfigMapServerPostfix
pod.Spec.Containers[0].Command = []string{"configmap-server", "-c", s.Spec.ConfigMap, "-n", s.GetNamespace()}
ownerutil.AddOwner(pod, s.CatalogSource, false, false)
Expand Down Expand Up @@ -154,12 +152,20 @@ type ConfigMapRegistryReconciler struct {
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
Image string
logger *logrus.Logger
}

var _ RegistryEnsurer = &ConfigMapRegistryReconciler{}
var _ RegistryChecker = &ConfigMapRegistryReconciler{}
var _ RegistryReconciler = &ConfigMapRegistryReconciler{}

func (c *ConfigMapRegistryReconciler) isDebugEnabled() bool {
if c.logger == nil {
return false
}
return c.logger.IsLevelEnabled(logrus.DebugLevel)
}

func (c *ConfigMapRegistryReconciler) currentService(source configMapCatalogSourceDecorator) *v1.Service {
serviceName := source.Service().GetName()
service, err := c.Lister.CoreV1().ServiceLister().Services(source.GetNamespace()).Get(serviceName)
Expand Down Expand Up @@ -201,7 +207,7 @@ func (c *ConfigMapRegistryReconciler) currentRoleBinding(source configMapCatalog
}

func (c *ConfigMapRegistryReconciler) currentPods(source configMapCatalogSourceDecorator, image string) []*v1.Pod {
podName := source.Pod(image).GetName()
podName := source.Pod(image, c.isDebugEnabled()).GetName()
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromSet(source.Selector()))
if err != nil {
logrus.WithField("pod", podName).WithError(err).Debug("couldn't find pod in cache")
Expand All @@ -214,7 +220,7 @@ func (c *ConfigMapRegistryReconciler) currentPods(source configMapCatalogSourceD
}

func (c *ConfigMapRegistryReconciler) currentPodsWithCorrectResourceVersion(source configMapCatalogSourceDecorator, image string) []*v1.Pod {
podName := source.Pod(image).GetName()
podName := source.Pod(image, c.isDebugEnabled()).GetName()
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels()))
if err != nil {
logrus.WithField("pod", podName).WithError(err).Debug("couldn't find pod in cache")
Expand Down Expand Up @@ -279,7 +285,7 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph
return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
}
if err := c.ensurePod(source, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(image).GetName())
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(image, c.isDebugEnabled()).GetName())
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
Expand Down Expand Up @@ -341,7 +347,7 @@ func (c *ConfigMapRegistryReconciler) ensureRoleBinding(source configMapCatalogS
}

func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDecorator, overwrite bool) error {
pod := source.Pod(c.Image)
pod := source.Pod(c.Image, c.isDebugEnabled())
currentPods := c.currentPods(source, c.Image)
if len(currentPods) > 0 {
if !overwrite {
Expand Down Expand Up @@ -405,7 +411,6 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha

// Check on registry resources
// TODO: more complex checks for resources
// TODO: add gRPC health check
if c.currentServiceAccount(source) == nil ||
c.currentRole(source) == nil ||
c.currentRoleBinding(source) == nil ||
Expand Down
10 changes: 6 additions & 4 deletions pkg/controller/registry/reconciler/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ghodss/yaml"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -109,6 +110,7 @@ func fakeReconcilerFactory(t *testing.T, stopc <-chan struct{}, options ...fakeR
OpClient: opClientFake,
Lister: lister,
ConfigMapServerImage: config.configMapServerImage,
logger: logrus.StandardLogger(),
}

var hasSyncedCheckFns []cache.InformerSynced
Expand Down Expand Up @@ -170,7 +172,7 @@ func validConfigMapCatalogSource(configMap *corev1.ConfigMap) *v1alpha1.CatalogS
Name: "cool-catalog",
Namespace: testNamespace,
UID: types.UID("catalog-uid"),
Labels: map[string]string{"olm.catalogSource": "cool-catalog"},
Labels: map[string]string{"olm.catalogSource": "cool-catalog"},
},
Spec: v1alpha1.CatalogSourceSpec{
ConfigMap: "cool-configmap",
Expand All @@ -193,7 +195,7 @@ func objectsForCatalogSource(catsrc *v1alpha1.CatalogSource) []runtime.Object {
case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap:
decorated := configMapCatalogSourceDecorator{catsrc}
objs = clientfake.AddSimpleGeneratedNames(
clientfake.AddSimpleGeneratedName(decorated.Pod(registryImageName)),
clientfake.AddSimpleGeneratedName(decorated.Pod(registryImageName, false)),
decorated.Service(),
decorated.ServiceAccount(),
decorated.Role(),
Expand All @@ -203,7 +205,7 @@ func objectsForCatalogSource(catsrc *v1alpha1.CatalogSource) []runtime.Object {
if catsrc.Spec.Image != "" {
decorated := grpcCatalogSourceDecorator{catsrc}
objs = clientfake.AddSimpleGeneratedNames(
decorated.Pod(),
decorated.Pod(false),
decorated.Service(),
)
}
Expand Down Expand Up @@ -439,7 +441,7 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
// if no error, the reconciler should create the same set of kube objects every time
decorated := configMapCatalogSourceDecorator{tt.in.catsrc}

pod := decorated.Pod(registryImageName)
pod := decorated.Pod(registryImageName, false)
listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{CatalogSourceLabelKey: tt.in.catsrc.GetName()}).String()}
outPods, err := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).List(listOptions)
require.NoError(t, err)
Expand Down
30 changes: 18 additions & 12 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ func (s *grpcCatalogSourceDecorator) SelectorForUpdate() labels.Selector {
}

func (s *grpcCatalogSourceDecorator) Labels() map[string]string {
return map[string]string{
CatalogSourceLabelKey: s.GetName(),
}
return CatalogSourceLabelForPod(s.GetName())
}

func (s *grpcCatalogSourceDecorator) Service() *v1.Service {
Expand All @@ -62,8 +60,8 @@ func (s *grpcCatalogSourceDecorator) Service() *v1.Service {
return svc
}

func (s *grpcCatalogSourceDecorator) Pod() *v1.Pod {
pod := Pod(s.CatalogSource, "registry-server", s.Spec.Image, s.Labels(), 5, 10)
func (s *grpcCatalogSourceDecorator) Pod(debugLogging bool) *v1.Pod {
pod := Pod(s.CatalogSource, "registry-server", s.Spec.Image, s.Labels(), 5, 10, debugLogging)
ownerutil.AddOwner(pod, s.CatalogSource, false, false)
return pod
}
Expand All @@ -72,10 +70,18 @@ type GrpcRegistryReconciler struct {
now nowFunc
Lister operatorlister.OperatorLister
OpClient operatorclient.ClientInterface
logger *logrus.Logger
}

var _ RegistryReconciler = &GrpcRegistryReconciler{}

func (s *GrpcRegistryReconciler) isDebugEnabled() bool {
if s.logger == nil {
return false
}
return s.logger.IsLevelEnabled(logrus.DebugLevel)
}

func (c *GrpcRegistryReconciler) currentService(source grpcCatalogSourceDecorator) *v1.Service {
serviceName := source.Service().GetName()
service, err := c.Lister.CoreV1().ServiceLister().Services(source.GetNamespace()).Get(serviceName)
Expand Down Expand Up @@ -136,10 +142,10 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensurePod(source, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(c.isDebugEnabled()).GetName())
}
if err := c.ensureUpdatePod(source); err != nil {
return errors.Wrapf(err, "error ensuring updated catalog source pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error ensuring updated catalog source pod: %s", source.Pod(c.isDebugEnabled()).GetName())
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
Expand Down Expand Up @@ -171,9 +177,9 @@ func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, ov
}
}
}
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(source.Pod())
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(source.Pod(c.isDebugEnabled()))
if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetGenerateName())
return errors.Wrapf(err, "error creating new pod: %s", source.Pod(c.isDebugEnabled()).GetGenerateName())
}

return nil
Expand All @@ -199,7 +205,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
// Update the update pod to promote it to serving pod
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Update(updatePod)
if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod().GetName())
return errors.Wrapf(err, "error creating new pod: %s", source.Pod(c.isDebugEnabled()).GetName())
}

break
Expand Down Expand Up @@ -257,13 +263,13 @@ func (c *GrpcRegistryReconciler) ensureService(source grpcCatalogSourceDecorator
// createUpdatePod is an internal method that creates a pod using the latest catalog source.
func (c *GrpcRegistryReconciler) createUpdatePod(source grpcCatalogSourceDecorator) error {
// remove label from pod to ensure service does accidentally route traffic to the pod
p := source.Pod()
p := source.Pod(c.isDebugEnabled())
p.Labels[CatalogSourceLabelKey] = ""
p.Labels[CatalogSourceUpdateKey] = source.Name

_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(p)
if err != nil {
logrus.WithField("pod", source.Pod().GetName()).Warn("couldn't create new catalogsource pod")
logrus.WithField("pod", source.Pod(c.isDebugEnabled()).GetName()).Warn("couldn't create new catalogsource pod")
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/reconciler/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestGrpcRegistryReconciler(t *testing.T) {

// Check for resource existence
decorated := grpcCatalogSourceDecorator{tt.in.catsrc}
pod := decorated.Pod()
pod := decorated.Pod(false)
service := decorated.Service()
listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{CatalogSourceLabelKey: tt.in.catsrc.GetName()}).String()}
outPods, podErr := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).List(listOptions)
Expand Down
Loading