From 7e6e316c3b58929a1ff95b8aa679b00be7a2d8dd Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 11 Jun 2020 11:32:51 +0200 Subject: [PATCH 1/3] try to emit error for missing team name in cluster name --- pkg/apis/acid.zalan.do/v1/marshal.go | 6 +++--- pkg/controller/postgresql.go | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/apis/acid.zalan.do/v1/marshal.go b/pkg/apis/acid.zalan.do/v1/marshal.go index d180f784c..336b0da41 100644 --- a/pkg/apis/acid.zalan.do/v1/marshal.go +++ b/pkg/apis/acid.zalan.do/v1/marshal.go @@ -102,7 +102,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { } tmp.Error = err.Error() - tmp.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} + tmp.Status.PostgresClusterStatus = ClusterStatusInvalid *p = Postgresql(tmp) @@ -112,10 +112,10 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { if clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID); err != nil { tmp2.Error = err.Error() - tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} + tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid } else if err := validateCloneClusterDescription(&tmp2.Spec.Clone); err != nil { tmp2.Error = err.Error() - tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} + tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid } else { tmp2.Spec.ClusterName = clusterName } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index c243f330f..aaa921421 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -14,7 +14,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/reference" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/cluster" @@ -420,18 +422,23 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. clusterError = informerNewSpec.Error } + workerID := c.clusterWorkerID(clusterName) + lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) + if clusterError != "" && eventType != EventDelete { - c.logger. - WithField("cluster-name", clusterName). - Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) + lg.Errorf("skipping %q event for the invalid cluster: %s", eventType, clusterError) + ref, err := reference.GetReference(scheme.Scheme, informerNewSpec) + if err != nil { + lg.Errorf("could not get reference for Postgresql CR %v/%v: %v", informerNewSpec.Namespace, informerNewSpec.Name, err) + } + c.eventRecorder.Eventf(ref, v1.EventTypeWarning, strings.Title(strings.ToLower(string(eventType))), "%v", clusterError) return } // Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected - // in the informer internal state, making it incohherent with the actual Kubernetes object (and, as a side + // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side // effect, the modified state will be returned together with subsequent events). - workerID := c.clusterWorkerID(clusterName) clusterEvent := ClusterEvent{ EventTime: time.Now(), EventType: eventType, @@ -441,7 +448,6 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. 
WorkerID: workerID, } - lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { lg.Errorf("error while queueing cluster event: %v", clusterEvent) } From 0691ce8255727314dd9391e4add31e04bca47af6 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 11 Jun 2020 17:01:38 +0200 Subject: [PATCH 2/3] skip creation after new cluster object --- pkg/cluster/cluster.go | 14 +++++++------- pkg/cluster/sync.go | 4 ++-- pkg/controller/postgresql.go | 27 +++++++++++++++------------ 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1585618a6..f503644aa 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -183,7 +183,7 @@ func (c *Cluster) GetReference() *v1.ObjectReference { // SetStatus of Postgres cluster // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above -func (c *Cluster) setStatus(status string) { +func (c *Cluster) SetStatus(status string) { var pgStatus acidv1.PostgresStatus pgStatus.PostgresClusterStatus = status @@ -257,13 +257,13 @@ func (c *Cluster) Create() error { defer func() { if err == nil { - c.setStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running? + c.SetStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running? } else { - c.setStatus(acidv1.ClusterStatusAddFailed) + c.SetStatus(acidv1.ClusterStatusAddFailed) } }() - c.setStatus(acidv1.ClusterStatusCreating) + c.SetStatus(acidv1.ClusterStatusCreating) c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources") if err = c.enforceMinResourceLimits(&c.Spec); err != nil { @@ -630,14 +630,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() - c.setStatus(acidv1.ClusterStatusUpdating) + c.SetStatus(acidv1.ClusterStatusUpdating) c.setSpec(newSpec) defer func() { if updateFailed { - c.setStatus(acidv1.ClusterStatusUpdateFailed) + c.SetStatus(acidv1.ClusterStatusUpdateFailed) } else { - c.setStatus(acidv1.ClusterStatusRunning) + c.SetStatus(acidv1.ClusterStatusRunning) } }() diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 697fc2d05..6d0e71214 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { defer func() { if err != nil { c.logger.Warningf("error while syncing cluster state: %v", err) - c.setStatus(acidv1.ClusterStatusSyncFailed) + c.SetStatus(acidv1.ClusterStatusSyncFailed) } else if !c.Status.Running() { - c.setStatus(acidv1.ClusterStatusRunning) + c.SetStatus(acidv1.ClusterStatusRunning) } }() diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index aaa921421..95c69a4ff 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -14,9 +14,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/reference" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/cluster" @@ -236,6 +234,15 @@ func (c *Controller) processEvent(event ClusterEvent) { c.curWorkerCluster.Store(event.WorkerID, cl) + // if there are already issues skip creation + if cl.Error != "" { + cl.SetStatus(acidv1.ClusterStatusInvalid) + lg.Errorf("could not create cluster: %v", cl.Error) 
+ c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error) + + return + } + if err := cl.Create(); err != nil { cl.Error = fmt.Sprintf("could not create cluster: %v", err) lg.Error(cl.Error) @@ -422,16 +429,10 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. clusterError = informerNewSpec.Error } - workerID := c.clusterWorkerID(clusterName) - lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) - - if clusterError != "" && eventType != EventDelete { - lg.Errorf("skipping %q event for the invalid cluster: %s", eventType, clusterError) - ref, err := reference.GetReference(scheme.Scheme, informerNewSpec) - if err != nil { - lg.Errorf("could not get reference for Postgresql CR %v/%v: %v", informerNewSpec.Namespace, informerNewSpec.Name, err) - } - c.eventRecorder.Eventf(ref, v1.EventTypeWarning, strings.Title(strings.ToLower(string(eventType))), "%v", clusterError) + if clusterError != "" && eventType != EventDelete && eventType != EventAdd { + c.logger. + WithField("cluster-name", clusterName). + Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) return } @@ -439,6 +440,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side // effect, the modified state will be returned together with subsequent events). + workerID := c.clusterWorkerID(clusterName) clusterEvent := ClusterEvent{ EventTime: time.Now(), EventType: eventType, @@ -448,6 +450,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. WorkerID: workerID, } + lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName) if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { lg.Errorf("error while queueing cluster event: %v", clusterEvent) } From 09c7e7a843e5fb769707ea19da0cc68981a2e831 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 15 Jun 2020 17:40:53 +0200 Subject: [PATCH 3/3] move SetStatus to k8sclient and emit event when skipping creation and rename to SetPostgresCRDStatus --- pkg/cluster/cluster.go | 41 ++++++------------------------------ pkg/cluster/sync.go | 4 ++-- pkg/controller/controller.go | 11 ++++++++++ pkg/controller/postgresql.go | 28 ++++++++++++------------ pkg/util/k8sutil/k8sutil.go | 30 ++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 50 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f503644aa..44c3e9b62 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -5,7 +5,6 @@ package cluster import ( "context" "database/sql" - "encoding/json" "fmt" "reflect" "regexp" @@ -181,34 +180,6 @@ func (c *Cluster) GetReference() *v1.ObjectReference { return ref } -// SetStatus of Postgres cluster -// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above -func (c *Cluster) SetStatus(status string) { - var pgStatus acidv1.PostgresStatus - pgStatus.PostgresClusterStatus = status - - patch, err := json.Marshal(struct { - PgStatus interface{} `json:"status"` - }{&pgStatus}) - - if err != nil { - c.logger.Errorf("could not marshal status: %v", err) - } - - // we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ), - // however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11) - // we should take advantage of it. 
- newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch( - context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status") - if err != nil { - c.logger.Errorf("could not update status: %v", err) - // return as newspec is empty, see PR654 - return - } - // update the spec, maintaining the new resourceVersion. - c.setSpec(newspec) -} - func (c *Cluster) isNewCluster() bool { return c.Status.Creating() } @@ -257,13 +228,13 @@ func (c *Cluster) Create() error { defer func() { if err == nil { - c.SetStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running? + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running? } else { - c.SetStatus(acidv1.ClusterStatusAddFailed) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed) } }() - c.SetStatus(acidv1.ClusterStatusCreating) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating) c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources") if err = c.enforceMinResourceLimits(&c.Spec); err != nil { @@ -630,14 +601,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() - c.SetStatus(acidv1.ClusterStatusUpdating) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating) c.setSpec(newSpec) defer func() { if updateFailed { - c.SetStatus(acidv1.ClusterStatusUpdateFailed) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed) } else { - c.SetStatus(acidv1.ClusterStatusRunning) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) } }() diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 6d0e71214..e49bd4537 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { defer func() { if err != nil { c.logger.Warningf("error while syncing cluster state: %v", err) - c.SetStatus(acidv1.ClusterStatusSyncFailed) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed) } else if !c.Status.Running() { - c.SetStatus(acidv1.ClusterStatusRunning) + c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) } }() diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 26b6b1b87..6011d3863 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,6 +25,7 @@ import ( typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" ) // Controller represents operator controller @@ -442,6 +443,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr return namespace } +// GetReference of Postgres CR object +// i.e. required to emit events to this resource +func (c *Controller) GetReference(postgresql *acidv1.Postgresql) *v1.ObjectReference { + ref, err := reference.GetReference(scheme.Scheme, postgresql) + if err != nil { + c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", postgresql.Namespace, postgresql.Name, err) + } + return ref +} + // hasOwnership returns true if the controller is the "owner" of the postgresql. // Whether it's owner is determined by the value of 'acid.zalan.do/controller' // annotation. 
If the value matches the controllerID then it owns it, or if the diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 95c69a4ff..a41eb0335 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -234,15 +234,6 @@ func (c *Controller) processEvent(event ClusterEvent) { c.curWorkerCluster.Store(event.WorkerID, cl) - // if there are already issues skip creation - if cl.Error != "" { - cl.SetStatus(acidv1.ClusterStatusInvalid) - lg.Errorf("could not create cluster: %v", cl.Error) - c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error) - - return - } - if err := cl.Create(); err != nil { cl.Error = fmt.Sprintf("could not create cluster: %v", err) lg.Error(cl.Error) @@ -429,10 +420,21 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. clusterError = informerNewSpec.Error } - if clusterError != "" && eventType != EventDelete && eventType != EventAdd { - c.logger. - WithField("cluster-name", clusterName). - Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) + if clusterError != "" && eventType != EventDelete { + c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) + + switch eventType { + case EventAdd: + c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed) + c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError) + case EventUpdate: + c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed) + c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError) + default: + c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed) + c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError) + } + return } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index d7be2f48a..5cde1c3e8 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -6,10 +6,13 @@ import ( "reflect" b64 "encoding/base64" + "encoding/json" batchv1beta1 "k8s.io/api/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/spec" apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" @@ -156,6 +159,33 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { return kubeClient, nil } +// SetPostgresCRDStatus of Postgres cluster +func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*acidv1.Postgresql, error) { + var pg *acidv1.Postgresql + var pgStatus acidv1.PostgresStatus + pgStatus.PostgresClusterStatus = status + + patch, err := json.Marshal(struct { + PgStatus interface{} `json:"status"` + }{&pgStatus}) + + if err != nil { + return pg, fmt.Errorf("could not marshal status: %v", err) + } + + // we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ), + // however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11) + // we should take advantage of it. 
pg, err = client.AcidV1ClientSet.AcidV1().Postgresqls(clusterName.Namespace).Patch(
+		context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
+	if err != nil {
+		return pg, fmt.Errorf("could not update status: %v", err)
+	}
+
+	// return the patched object so the caller can refresh its spec and pick up the new resourceVersion
+	return pg, nil
+}
+
 // SameService compares the Services
 func SameService(cur, new *v1.Service) (match bool, reason string) {
 	//TODO: improve comparison
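
For reference, the status handling added in these patches boils down to a JSON merge patch against the Postgresql custom resource. Below is a minimal, self-contained sketch of that call using the generic dynamic client instead of the operator's generated AcidV1 clientset; the GroupVersionResource, the "PostgresClusterStatus" JSON key (mirroring the PostgresStatus struct marshalled in the patch), and the example namespace and cluster name are assumptions for illustration only.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

// postgresqlGVR identifies the acid.zalan.do/v1 postgresqls resource managed by the operator.
var postgresqlGVR = schema.GroupVersionResource{
	Group:    "acid.zalan.do",
	Version:  "v1",
	Resource: "postgresqls",
}

// setPostgresCRDStatus merge-patches only the status block of a postgresql CR,
// analogous to the SetPostgresCRDStatus helper added in pkg/util/k8sutil/k8sutil.go.
// Passing "status" as the trailing argument targets the /status subresource,
// assuming it is enabled on the CRD.
func setPostgresCRDStatus(ctx context.Context, client dynamic.Interface, namespace, name, status string) error {
	patch, err := json.Marshal(map[string]interface{}{
		"status": map[string]string{"PostgresClusterStatus": status},
	})
	if err != nil {
		return fmt.Errorf("could not marshal status: %v", err)
	}

	// a merge patch avoids a full update, so no prior GET for the current resourceVersion is needed
	_, err = client.Resource(postgresqlGVR).Namespace(namespace).Patch(
		ctx, name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
	if err != nil {
		return fmt.Errorf("could not update status: %v", err)
	}
	return nil
}

func main() {
	// load a kubeconfig from the default location; an in-cluster config would work the same way
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// hypothetical cluster name following the <team>-<cluster> convention checked by extractClusterName
	if err := setPostgresCRDStatus(context.Background(), dyn, "default", "acid-minimal-cluster", "SyncFailed"); err != nil {
		fmt.Println(err)
	}
}

The merge-patch approach is what lets the controller flag an invalid manifest (for example, a cluster name missing the team prefix) without constructing a full Cluster object: only the status field is written, and the caller decides whether to additionally emit a Kubernetes warning event through the recorder, as queueClusterEvent now does for add, update, and sync events.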