Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -271,26 +272,29 @@ func (c *Cluster) Create() (err error) {
)

defer func() {
var (
pgUpdatedStatus *acidv1.Postgresql
errStatus error
)
if err == nil {
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
} else {
currentStatus := c.Status.DeepCopy()
pg := c.Postgresql.DeepCopy()
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning

if err != nil {
c.logger.Warningf("cluster created failed: %v", err)
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
}
if errStatus != nil {
c.logger.Warningf("could not set cluster status: %v", errStatus)
return
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
}
if pgUpdatedStatus != nil {

if !equality.Semantic.DeepEqual(currentStatus, pg.Status) {
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
if err != nil {
c.logger.Warningf("could not set cluster status: %v", err)
return
}
c.setSpec(pgUpdatedStatus)
}
}()

pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
pg := c.Postgresql.DeepCopy()
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusCreating

pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
if err != nil {
return fmt.Errorf("could not set cluster status: %v", err)
}
Expand Down Expand Up @@ -978,7 +982,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
c.mu.Lock()
defer c.mu.Unlock()

c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating

newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
if err != nil {
return fmt.Errorf("could not set cluster status to updating: %w", err)
}

if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
// do not apply any major version related changes yet
Expand All @@ -987,20 +996,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
c.setSpec(newSpec)

defer func() {
var (
pgUpdatedStatus *acidv1.Postgresql
err error
)
currentStatus := newSpec.Status.DeepCopy()
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning

if updateFailed {
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
} else {
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
}
if err != nil {
c.logger.Warningf("could not set cluster status: %v", err)
return
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
}
if pgUpdatedStatus != nil {

if !equality.Semantic.DeepEqual(currentStatus, newSpec.Status) {
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
if err != nil {
c.logger.Warningf("could not set cluster status: %v", err)
return
}
c.setSpec(pgUpdatedStatus)
}
}()
Expand Down
21 changes: 10 additions & 11 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand All @@ -43,21 +44,19 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
c.setSpec(newSpec)

defer func() {
var (
pgUpdatedStatus *acidv1.Postgresql
errStatus error
)
if err != nil {
c.logger.Warningf("error while syncing cluster state: %v", err)
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
} else if !c.Status.Running() {
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
}
if errStatus != nil {
c.logger.Warningf("could not set cluster status: %v", errStatus)
return
}
if pgUpdatedStatus != nil {

if !equality.Semantic.DeepEqual(oldSpec.Status, newSpec.Status) {
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
if err != nil {
c.logger.Warningf("could not set cluster status: %v", err)
return
}
c.setSpec(pgUpdatedStatus)
}
}()
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
},
}

// add postgresql cluster to fake client
_, err := client.PostgresqlsGetter.Postgresqls(namespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
if err != nil {
return nil, err
}

cluster := New(
Config{
OpConfig: config.Config{
Expand Down Expand Up @@ -321,7 +327,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
}, client, pg, logger, eventRecorder)
cluster.Name = clusterName
cluster.Namespace = namespace
_, err := cluster.createStatefulSet()
_, err = cluster.createStatefulSet()
if err != nil {
return nil, err
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/controller/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (c *Controller) acquireInitialListOfClusters() error {
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
if c.opConfig.EnableTeamIdClusternamePrefix {
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
return nil, err
}
}
Expand Down Expand Up @@ -470,13 +471,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.

switch eventType {
case EventAdd:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
if err != nil {
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
}
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
case EventUpdate:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
if err != nil {
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
}
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
default:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
if err != nil {
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
}
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
}

Expand Down
20 changes: 2 additions & 18 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,8 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
}

// SetPostgresCRDStatus of Postgres cluster
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*apiacidv1.Postgresql, error) {
var pg *apiacidv1.Postgresql
var pgStatus apiacidv1.PostgresStatus
pgStatus.PostgresClusterStatus = status

patch, err := json.Marshal(struct {
PgStatus interface{} `json:"status"`
}{&pgStatus})

if err != nil {
return pg, fmt.Errorf("could not marshal status: %v", err)
}

// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
// we should take advantage of it.
pg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Patch(
context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) {
pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{})
if err != nil {
return pg, fmt.Errorf("could not update status: %v", err)
}
Expand Down