Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
} else {
// patroni configuration is not always reflected on the StatefulSet as they have
// different lifecycle (due to SPILO's implementation - see: issues#501) but they
// still need to be updated via the Patroni REST API.
if err := c.applyPatroniConfig(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the intention here to update Patroni config in DCS on every Update event even if the actual config did not change ?

c.logger.Errorf("could not apply patroni config: %v", err)
updateFailed = true
}
}
}()

Expand Down
52 changes: 45 additions & 7 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func fillResourceList(spec acidv1.ResourceDescription, defaults acidv1.ResourceD
return requests, nil
}

func generateSpiloJSONConfiguration(pg *acidv1.PostgresqlParam, patroni *acidv1.Patroni, pamRoleName string, logger *logrus.Entry) (string, error) {
func generateSpiloJSONConfiguration(pg *acidv1.PostgresqlParam, patroni *acidv1.Patroni, pamRoleName string, logger *logrus.Entry, isNewCluster bool, initialPatroniConfig string) (string, error) {
Copy link
Member

@sdudoladov sdudoladov Jun 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests also need to be updated

pkg/cluster/k8sres_test.go:76:48: not enough arguments in call to generateSpiloJSONConfiguration
        have (*"github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1".PostgresqlParam, *"github.com/zala
ndo/postgres-operator/pkg/apis/acid.zalan.do/v1".Patroni, string, *logrus.Entry)
        want (*"github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1".PostgresqlParam, *"github.com/zala
ndo/postgres-operator/pkg/apis/acid.zalan.do/v1".Patroni, string, *logrus.Entry, bool, string)

config := spiloConfiguration{}

config.Bootstrap = pgBootstrap{}
Expand Down Expand Up @@ -219,8 +219,27 @@ PatroniInitDBParams:
if patroni.TTL != 0 {
config.Bootstrap.DCS.TTL = patroni.TTL
}
if patroni.Slots != nil {
config.Bootstrap.DCS.Slots = patroni.Slots

// The SPILO image is taking in account the 'Slots' settings **only** when bootstraping a new cluster.
// Therefore, updating this value in the SPILO_CONFIGURATION of the statefulset is pointless, but still
// required at cluster creation.
if isNewCluster {
if patroni.Slots != nil {
config.Bootstrap.DCS.Slots = patroni.Slots
}
} else {
oldestKnownPatroniCfg := acidv1.Patroni{}
if initialPatroniConfig == "" {
// when upgrading postgres-operator to this new version, existing StatefulSet won't have this annotation, so we're using the current specs
oldestKnownPatroniCfg = *patroni
} else {
if err := json.Unmarshal([]byte(initialPatroniConfig), &oldestKnownPatroniCfg); err != nil {
logger.Errorf("cannot decode initial patroni config from annotations: %s (%v)", err, initialPatroniConfig)
return "", err
}
}

config.Bootstrap.DCS.Slots = oldestKnownPatroniCfg.Slots
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but at this point the cluster already exists and bootstrap options have no effect. why are we resetting it again ? to get the same stateful set spec ?

}

config.PgLocalConfiguration = make(map[string]interface{})
Expand Down Expand Up @@ -784,7 +803,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State
func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
}

spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger)
// c.Statefulset is not set when called from .createStatefulSet()
initialPatroniConfig := ""
if c.Statefulset != nil {
initialPatroniConfig = c.Statefulset.ObjectMeta.Annotations[initialPatroniConfigAnnotationKey]
}
spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger, c.isNewCluster(), initialPatroniConfig)
if err != nil {
return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err)
}
Expand Down Expand Up @@ -875,6 +899,20 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State

numberOfInstances := c.getNumberOfInstances(spec)

ann := map[string]string{rollingUpdateStatefulsetAnnotationKey: "false"}
// Save the initial patroni config for later usage.
// Note: if the annotation is empty (during the upgrade path for example), set it to the current specs.
// see: issues#501
if c.isNewCluster() || initialPatroniConfig == "" {
x, err := json.Marshal(spec.Patroni)
if err != nil {
return nil, fmt.Errorf("could not JSON encode initial patroni config: %v", err)
}
ann[initialPatroniConfigAnnotationKey] = string(x)
} else {
ann[initialPatroniConfigAnnotationKey] = initialPatroniConfig
}

// the operator has domain-specific logic on how to do rolling updates of PG clusters
// so we do not use default rolling updates implemented by stateful sets
// that leaves the legacy "OnDelete" update strategy as the only option
Expand All @@ -894,7 +932,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State
Name: c.statefulSetName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: map[string]string{rollingUpdateStatefulsetAnnotationKey: "false"},
Annotations: ann,
},
Spec: v1beta1.StatefulSetSpec{
Replicas: &numberOfInstances,
Expand Down Expand Up @@ -1242,11 +1280,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
c.logger.Info(msg, description.S3WalPath)

envs := []v1.EnvVar{
v1.EnvVar{
{
Name: "CLONE_WAL_S3_BUCKET",
Value: c.OpConfig.WALES3Bucket,
},
v1.EnvVar{
{
Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX",
Value: getBucketScopeSuffix(description.UID),
},
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

const (
rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
initialPatroniConfigAnnotationKey = "v1.acid.zalan.do/initial-patroni-config"
)

func (c *Cluster) listResources() error {
Expand Down
33 changes: 33 additions & 0 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ func (c *Cluster) syncStatefulSet() error {
return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
}

if err := c.applyPatroniConfig(); err != nil {
return fmt.Errorf("could not apply patroni config: %v", err)
}

// if we get here we also need to re-create the pods (either leftovers from the old
// statefulset or those that got their configuration from the outdated statefulset)
if podsRollingUpdateRequired {
Expand Down Expand Up @@ -372,6 +376,35 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
len(pods))
}

// TODO: merge applyPatroniConfig() and checkAndSetGlobalPostgreSQLConfiguration()?
func (c *Cluster) applyPatroniConfig() error {
var (
err error
pods []v1.Pod
)

if pods, err = c.listPods(); err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("could not call Patroni API: cluster has no pods")
}
// try all pods until the first one that is successful, as it doesn't matter which pod
// carries the request to change configuration through
for _, pod := range pods {
podName := util.NameFromMeta(pod.ObjectMeta)
c.logger.Debugf("calling Patroni API on a pod %s to set the following patroni config: %v",
podName, c.Postgresql.Spec.Patroni)

if err = c.patroni.ApplyConfig(&pod, c.Postgresql.Spec.Patroni); err == nil {
return nil
}
c.logger.Warningf("could not patch patroni config with on pod %s: %v", podName, err)
}
return fmt.Errorf("could not reach Patroni API to patch patroni config: failed on every pod (%d total)",
len(pods))
}

func (c *Cluster) syncSecrets() error {
var (
err error
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"

acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

const (
Expand All @@ -23,6 +25,7 @@ const (
type Interface interface {
Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
ApplyConfig(server *v1.Pod, config acidv1.Patroni) error
}

// Patroni API client
Expand Down Expand Up @@ -102,3 +105,12 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
}
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
}

func (p *Patroni) ApplyConfig(server *v1.Pod, config acidv1.Patroni) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(config)
if err != nil {
return fmt.Errorf("could not encode json: %v", err)
}
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
}