diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b9759ca86..03fca8908 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -576,6 +576,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.logger.Errorf("could not sync statefulsets: %v", err) updateFailed = true } + } else { + // patroni configuration is not always reflected on the StatefulSet as they have + // different lifecycle (due to SPILO's implementation - see: issues#501) but they + // still need to be updated via the Patroni REST API. + if err := c.applyPatroniConfig(); err != nil { + c.logger.Errorf("could not apply patroni config: %v", err) + updateFailed = true + } } }() diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index a16e810ed..6b5c05d27 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -149,7 +149,7 @@ func fillResourceList(spec acidv1.ResourceDescription, defaults acidv1.ResourceD return requests, nil } -func generateSpiloJSONConfiguration(pg *acidv1.PostgresqlParam, patroni *acidv1.Patroni, pamRoleName string, logger *logrus.Entry) (string, error) { +func generateSpiloJSONConfiguration(pg *acidv1.PostgresqlParam, patroni *acidv1.Patroni, pamRoleName string, logger *logrus.Entry, isNewCluster bool, initialPatroniConfig string) (string, error) { config := spiloConfiguration{} config.Bootstrap = pgBootstrap{} @@ -219,8 +219,27 @@ PatroniInitDBParams: if patroni.TTL != 0 { config.Bootstrap.DCS.TTL = patroni.TTL } - if patroni.Slots != nil { - config.Bootstrap.DCS.Slots = patroni.Slots + + // The SPILO image is taking in account the 'Slots' settings **only** when bootstraping a new cluster. + // Therefore, updating this value in the SPILO_CONFIGURATION of the statefulset is pointless, but still + // required at cluster creation. + if isNewCluster { + if patroni.Slots != nil { + config.Bootstrap.DCS.Slots = patroni.Slots + } + } else { + oldestKnownPatroniCfg := acidv1.Patroni{} + if initialPatroniConfig == "" { + // when upgrading postgres-operator to this new version, existing StatefulSet won't have this annotation, so we're using the current specs + oldestKnownPatroniCfg = *patroni + } else { + if err := json.Unmarshal([]byte(initialPatroniConfig), &oldestKnownPatroniCfg); err != nil { + logger.Errorf("cannot decode initial patroni config from annotations: %s (%v)", err, initialPatroniConfig) + return "", err + } + } + + config.Bootstrap.DCS.Slots = oldestKnownPatroniCfg.Slots } config.PgLocalConfiguration = make(map[string]interface{}) @@ -784,7 +803,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name }) } - spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger) + // c.Statefulset is not set when called from .createStatefulSet() + initialPatroniConfig := "" + if c.Statefulset != nil { + initialPatroniConfig = c.Statefulset.ObjectMeta.Annotations[initialPatroniConfigAnnotationKey] + } + spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger, c.isNewCluster(), initialPatroniConfig) if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) } @@ -875,6 +899,20 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State numberOfInstances := c.getNumberOfInstances(spec) + ann := map[string]string{rollingUpdateStatefulsetAnnotationKey: "false"} + // Save the initial patroni config for later usage. + // Note: if the annotation is empty (during the upgrade path for example), set it to the current specs. + // see: issues#501 + if c.isNewCluster() || initialPatroniConfig == "" { + x, err := json.Marshal(spec.Patroni) + if err != nil { + return nil, fmt.Errorf("could not JSON encode initial patroni config: %v", err) + } + ann[initialPatroniConfigAnnotationKey] = string(x) + } else { + ann[initialPatroniConfigAnnotationKey] = initialPatroniConfig + } + // the operator has domain-specific logic on how to do rolling updates of PG clusters // so we do not use default rolling updates implemented by stateful sets // that leaves the legacy "OnDelete" update strategy as the only option @@ -894,7 +932,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State Name: c.statefulSetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), - Annotations: map[string]string{rollingUpdateStatefulsetAnnotationKey: "false"}, + Annotations: ann, }, Spec: v1beta1.StatefulSetSpec{ Replicas: &numberOfInstances, @@ -1242,11 +1280,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) c.logger.Info(msg, description.S3WalPath) envs := []v1.EnvVar{ - v1.EnvVar{ + { Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket, }, - v1.EnvVar{ + { Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.UID), }, diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index c89edac63..c3f25e51b 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -20,6 +20,7 @@ import ( const ( rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required" + initialPatroniConfigAnnotationKey = "v1.acid.zalan.do/initial-patroni-config" ) func (c *Cluster) listResources() error { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index f5ae30b81..2d47f57c6 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -314,6 +314,10 @@ func (c *Cluster) syncStatefulSet() error { return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err) } + if err := c.applyPatroniConfig(); err != nil { + return fmt.Errorf("could not apply patroni config: %v", err) + } + // if we get here we also need to re-create the pods (either leftovers from the old // statefulset or those that got their configuration from the outdated statefulset) if podsRollingUpdateRequired { @@ -372,6 +376,35 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error { len(pods)) } +// TODO: merge applyPatroniConfig() and checkAndSetGlobalPostgreSQLConfiguration()? +func (c *Cluster) applyPatroniConfig() error { + var ( + err error + pods []v1.Pod + ) + + if pods, err = c.listPods(); err != nil { + return err + } + if len(pods) == 0 { + return fmt.Errorf("could not call Patroni API: cluster has no pods") + } + // try all pods until the first one that is successful, as it doesn't matter which pod + // carries the request to change configuration through + for _, pod := range pods { + podName := util.NameFromMeta(pod.ObjectMeta) + c.logger.Debugf("calling Patroni API on a pod %s to set the following patroni config: %v", + podName, c.Postgresql.Spec.Patroni) + + if err = c.patroni.ApplyConfig(&pod, c.Postgresql.Spec.Patroni); err == nil { + return nil + } + c.logger.Warningf("could not patch patroni config with on pod %s: %v", podName, err) + } + return fmt.Errorf("could not reach Patroni API to patch patroni config: failed on every pod (%d total)", + len(pods)) +} + func (c *Cluster) syncSecrets() error { var ( err error diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 23260f277..3e9e94d83 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -10,6 +10,8 @@ import ( "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" ) const ( @@ -23,6 +25,7 @@ const ( type Interface interface { Switchover(master *v1.Pod, candidate string) error SetPostgresParameters(server *v1.Pod, options map[string]string) error + ApplyConfig(server *v1.Pod, config acidv1.Patroni) error } // Patroni API client @@ -102,3 +105,12 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st } return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf) } + +func (p *Patroni) ApplyConfig(server *v1.Pod, config acidv1.Patroni) error { + buf := &bytes.Buffer{} + err := json.NewEncoder(buf).Encode(config) + if err != nil { + return fmt.Errorf("could not encode json: %v", err) + } + return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf) +}