Skip to content
10 changes: 5 additions & 5 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -1099,19 +1099,19 @@ def test_patroni_config_update(self):

def compare_config():
effective_config = k8s.patroni_rest(masterPod.metadata.name, "config")
desired_patroni = pg_patch_config["spec"]["patroni"]
desired_config = pg_patch_config["spec"]["patroni"]
desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
effective_parameters = effective_config["postgresql"]["parameters"]
self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
"max_connections not updated")
self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
self.assertEqual(desired_patroni["ttl"], effective_config["ttl"],
self.assertEqual(desired_config["ttl"], effective_config["ttl"],
"ttl not updated")
self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"],
self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
"loop_wait not updated")
self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"],
self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
"retry_timeout not updated")
self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"],
self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
"synchronous_mode not updated")
return True

Expand Down
17 changes: 8 additions & 9 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ import (
)

const (
pgBinariesLocationTemplate = "/usr/lib/postgresql/%v/bin"
patroniPGBinariesParameterName = "bin_dir"
patroniPGParametersParameterName = "parameters"
patroniPGHBAConfParameterName = "pg_hba"
localHost = "127.0.0.1/32"
connectionPoolerContainer = "connection-pooler"
pgPort = 5432
pgBinariesLocationTemplate = "/usr/lib/postgresql/%v/bin"
patroniPGBinariesParameterName = "bin_dir"
patroniPGHBAConfParameterName = "pg_hba"
localHost = "127.0.0.1/32"
connectionPoolerContainer = "connection-pooler"
pgPort = 5432
)

type pgUser struct {
Expand Down Expand Up @@ -277,11 +276,11 @@ PatroniInitDBParams:
local, bootstrap := getLocalAndBoostrapPostgreSQLParameters(pg.Parameters)

if len(local) > 0 {
config.PgLocalConfiguration[patroniPGParametersParameterName] = local
config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local
}
if len(bootstrap) > 0 {
config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{})
config.Bootstrap.DCS.PGBootstrapConfiguration[patroniPGParametersParameterName] = bootstrap
config.Bootstrap.DCS.PGBootstrapConfiguration[constants.PatroniPGParametersParameterName] = bootstrap
}
}
// Patroni gives us a choice of writing pg_hba.conf to either the bootstrap section or to the local postgresql one.
Expand Down
60 changes: 35 additions & 25 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,24 @@ func (c *Cluster) syncStatefulSet() error {
// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
for i, pod := range pods {
emptyPatroniConfig := acidv1.Patroni{}
podName := util.NameFromMeta(pods[i].ObjectMeta)
config, err := c.patroni.GetConfig(&pod)
patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
if err != nil {
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue

// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
// do not attempt a restart
if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
}
break
}
break
}

// if the config update requires a restart, call Patroni restart for replicas first, then master
Expand Down Expand Up @@ -493,16 +499,9 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri

// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
// (like max_connections) have changed and if necessary sets it via the Patroni API
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) {
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) {
configToSet := make(map[string]interface{})
parametersToSet := make(map[string]string)
effectivePgParameters := make(map[string]interface{})

// read effective Patroni config if set
if patroniConfig != nil {
effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{})
effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{})
}

// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
desiredPgParameters := c.Spec.Parameters
Expand All @@ -514,36 +513,47 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
}

if len(parametersToSet) > 0 {
configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet}
configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet}
}

// compare other options from config with c.Spec.Patroni from manifest
desiredPatroniConfig := c.Spec.Patroni
if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) {
if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait {
configToSet["loop_wait"] = desiredPatroniConfig.LoopWait
}
if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) {
if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != patroniConfig.MaximumLagOnFailover {
configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover
}
if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) {
if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, patroniConfig.PgHba) {
configToSet["pg_hba"] = desiredPatroniConfig.PgHba
}
if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) {
if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != patroniConfig.RetryTimeout {
configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout
}
if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) {
configToSet["slots"] = desiredPatroniConfig.Slots
}
if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] {
if desiredPatroniConfig.SynchronousMode != patroniConfig.SynchronousMode {
configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode
}
if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] {
if desiredPatroniConfig.SynchronousModeStrict != patroniConfig.SynchronousModeStrict {
configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict
}
if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) {
if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != patroniConfig.TTL {
configToSet["ttl"] = desiredPatroniConfig.TTL
}

// check if specified slots exist in config and if they differ
slotsToSet := make(map[string]map[string]string)
for slotName, desiredSlot := range desiredPatroniConfig.Slots {
if effectiveSlot, exists := patroniConfig.Slots[slotName]; exists {
if reflect.DeepEqual(desiredSlot, effectiveSlot) {
continue
}
}
slotsToSet[slotName] = desiredSlot
}
if len(slotsToSet) > 0 {
configToSet["slots"] = slotsToSet
}

if len(configToSet) == 0 {
return false, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/constants/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const (
PostgresDataMount = "/home/postgres/pgdata"
PostgresDataPath = PostgresDataMount + "/pgroot"

PatroniPGParametersParameterName = "parameters"

PostgresConnectRetryTimeout = 2 * time.Minute
PostgresConnectTimeout = 15 * time.Second

Expand Down
90 changes: 49 additions & 41 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"strconv"
"time"

"github.com/zalando/postgres-operator/pkg/util/constants"
httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient"

"github.com/sirupsen/logrus"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
v1 "k8s.io/api/core/v1"
)

Expand All @@ -31,7 +33,7 @@ type Interface interface {
SetPostgresParameters(server *v1.Pod, options map[string]string) error
GetMemberData(server *v1.Pod) (MemberData, error)
Restart(server *v1.Pod) error
GetConfig(server *v1.Pod) (map[string]interface{}, error)
GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error)
SetConfig(server *v1.Pod, config map[string]interface{}) error
}

Expand Down Expand Up @@ -109,28 +111,23 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
}

func (p *Patroni) httpGet(url string) (string, error) {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return "", fmt.Errorf("could not create request: %v", err)
}
p.logger.Debugf("making GET http request: %s", url)

p.logger.Debugf("making GET http request: %s", request.URL.String())

resp, err := p.httpClient.Do(request)
response, err := p.httpClient.Get(url)
if err != nil {
return "", fmt.Errorf("could not make request: %v", err)
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
defer response.Body.Close()

bodyBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("could not read response: %v", err)
}
if err := resp.Body.Close(); err != nil {
return "", fmt.Errorf("could not close request: %v", err)
}

if resp.StatusCode != http.StatusOK {
return string(bodyBytes), fmt.Errorf("patroni returned '%d'", resp.StatusCode)
if response.StatusCode != http.StatusOK {
return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode)
}

return string(bodyBytes), nil
}

Expand Down Expand Up @@ -194,30 +191,43 @@ type MemberData struct {
Patroni MemberDataPatroni `json:"patroni"`
}

func (p *Patroni) GetConfigOrStatus(server *v1.Pod, path string) (map[string]interface{}, error) {
result := make(map[string]interface{})
func (p *Patroni) GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) {
var (
patroniConfig acidv1.Patroni
pgConfig map[string]interface{}
)
apiURLString, err := apiURL(server)
if err != nil {
return result, err
return patroniConfig, nil, err
}
body, err := p.httpGet(apiURLString + path)
err = json.Unmarshal([]byte(body), &result)
body, err := p.httpGet(apiURLString + configPath)
if err != nil {
return result, err
return patroniConfig, nil, err
}
err = json.Unmarshal([]byte(body), &patroniConfig)
if err != nil {
return patroniConfig, nil, err
}

return result, err
}

func (p *Patroni) GetStatus(server *v1.Pod) (map[string]interface{}, error) {
return p.GetConfigOrStatus(server, statusPath)
}
// unmarshalling postgresql parameters needs a detour
err = json.Unmarshal([]byte(body), &pgConfig)
if err != nil {
return patroniConfig, nil, err
}
pgParameters := make(map[string]string)
if _, exists := pgConfig["postgresql"]; exists {
effectivePostgresql := pgConfig["postgresql"].(map[string]interface{})
effectivePgParameters := effectivePostgresql[constants.PatroniPGParametersParameterName].(map[string]interface{})
for parameter, value := range effectivePgParameters {
strValue := fmt.Sprintf("%v", value)
pgParameters[parameter] = strValue
}
}

func (p *Patroni) GetConfig(server *v1.Pod) (map[string]interface{}, error) {
return p.GetConfigOrStatus(server, configPath)
return patroniConfig, pgParameters, err
}

//Restart method restarts instance via Patroni POST API call.
// Restart method restarts instance via Patroni POST API call.
func (p *Patroni) Restart(server *v1.Pod) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(map[string]interface{}{"restart_pending": true})
Expand All @@ -228,9 +238,13 @@ func (p *Patroni) Restart(server *v1.Pod) error {
if err != nil {
return err
}
status, err := p.GetStatus(server)
pending_restart, ok := status["pending_restart"]
if !ok || !pending_restart.(bool) {
memberData, err := p.GetMemberData(server)
if err != nil {
return err
}

// do restart only when it is pending
if !memberData.PendingRestart {
return nil
}
return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf)
Expand All @@ -243,19 +257,13 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
if err != nil {
return MemberData{}, err
}
response, err := p.httpClient.Get(apiURLString)
if err != nil {
return MemberData{}, fmt.Errorf("could not perform Get request: %v", err)
}
defer response.Body.Close()

body, err := ioutil.ReadAll(response.Body)
body, err := p.httpGet(apiURLString + statusPath)
if err != nil {
return MemberData{}, fmt.Errorf("could not read response: %v", err)
return MemberData{}, err
}

data := MemberData{}
err = json.Unmarshal(body, &data)
err = json.Unmarshal([]byte(body), &data)
if err != nil {
return MemberData{}, err
}
Expand Down
Loading