diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 495d589f4..6564cd08e 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -1099,19 +1099,19 @@ def test_patroni_config_update(self): def compare_config(): effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") - desired_patroni = pg_patch_config["spec"]["patroni"] + desired_config = pg_patch_config["spec"]["patroni"] desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] effective_parameters = effective_config["postgresql"]["parameters"] self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"], "max_connections not updated") self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added") - self.assertEqual(desired_patroni["ttl"], effective_config["ttl"], + self.assertEqual(desired_config["ttl"], effective_config["ttl"], "ttl not updated") - self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"], + self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"], "loop_wait not updated") - self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"], + self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"], "retry_timeout not updated") - self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"], + self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"], "synchronous_mode not updated") return True diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 70c02c26a..366570c3c 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -30,13 +30,12 @@ import ( ) const ( - pgBinariesLocationTemplate = "/usr/lib/postgresql/%v/bin" - patroniPGBinariesParameterName = "bin_dir" - patroniPGParametersParameterName = "parameters" - patroniPGHBAConfParameterName = "pg_hba" - localHost = "127.0.0.1/32" - connectionPoolerContainer = "connection-pooler" - pgPort = 5432 + pgBinariesLocationTemplate = "/usr/lib/postgresql/%v/bin" + patroniPGBinariesParameterName = "bin_dir" + patroniPGHBAConfParameterName = "pg_hba" + localHost = "127.0.0.1/32" + connectionPoolerContainer = "connection-pooler" + pgPort = 5432 ) type pgUser struct { @@ -277,11 +276,11 @@ PatroniInitDBParams: local, bootstrap := getLocalAndBoostrapPostgreSQLParameters(pg.Parameters) if len(local) > 0 { - config.PgLocalConfiguration[patroniPGParametersParameterName] = local + config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local } if len(bootstrap) > 0 { config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) - config.Bootstrap.DCS.PGBootstrapConfiguration[patroniPGParametersParameterName] = bootstrap + config.Bootstrap.DCS.PGBootstrapConfiguration[constants.PatroniPGParametersParameterName] = bootstrap } } // Patroni gives us a choice of writing pg_hba.conf to either the bootstrap section or to the local postgresql one. diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 5fa93bdd2..c763d4bdb 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -395,18 +395,24 @@ func (c *Cluster) syncStatefulSet() error { // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used. 
for i, pod := range pods { + emptyPatroniConfig := acidv1.Patroni{} podName := util.NameFromMeta(pods[i].ObjectMeta) - config, err := c.patroni.GetConfig(&pod) + patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod) if err != nil { c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) continue } - instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config) - if err != nil { - c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) - continue + + // empty config probably means cluster is not fully initialized yet, e.g. restoring from backup + // do not attempt a restart + if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 { + instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) + if err != nil { + c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) + continue + } + break } - break } // if the config update requires a restart, call Patroni restart for replicas first, then master @@ -493,16 +499,9 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters // (like max_connections) have changed and if necessary sets it via the Patroni API -func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) { +func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) { configToSet := make(map[string]interface{}) parametersToSet := make(map[string]string) - effectivePgParameters := make(map[string]interface{}) - - // read effective Patroni config if set - if patroniConfig != nil { - effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{}) - effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{}) - } // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest desiredPgParameters := c.Spec.Parameters @@ -514,36 +513,47 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC } if len(parametersToSet) > 0 { - configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet} + configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet} } // compare other options from config with c.Spec.Patroni from manifest desiredPatroniConfig := c.Spec.Patroni - if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) { + if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait { configToSet["loop_wait"] = desiredPatroniConfig.LoopWait } - if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) { + if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != patroniConfig.MaximumLagOnFailover { configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover } - if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) { + if desiredPatroniConfig.PgHba != nil && 
!reflect.DeepEqual(desiredPatroniConfig.PgHba, patroniConfig.PgHba) { configToSet["pg_hba"] = desiredPatroniConfig.PgHba } - if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) { + if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != patroniConfig.RetryTimeout { configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout } - if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) { - configToSet["slots"] = desiredPatroniConfig.Slots - } - if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] { + if desiredPatroniConfig.SynchronousMode != patroniConfig.SynchronousMode { configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode } - if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] { + if desiredPatroniConfig.SynchronousModeStrict != patroniConfig.SynchronousModeStrict { configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict } - if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) { + if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != patroniConfig.TTL { configToSet["ttl"] = desiredPatroniConfig.TTL } + // check if specified slots exist in config and if they differ + slotsToSet := make(map[string]map[string]string) + for slotName, desiredSlot := range desiredPatroniConfig.Slots { + if effectiveSlot, exists := patroniConfig.Slots[slotName]; exists { + if reflect.DeepEqual(desiredSlot, effectiveSlot) { + continue + } + } + slotsToSet[slotName] = desiredSlot + } + if len(slotsToSet) > 0 { + configToSet["slots"] = slotsToSet + } + if len(configToSet) == 0 { return false, nil } diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index e39fd423f..41bfdd66e 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -8,6 +8,8 @@ const ( PostgresDataMount = "/home/postgres/pgdata" PostgresDataPath = PostgresDataMount + "/pgroot" + PatroniPGParametersParameterName = "parameters" + PostgresConnectRetryTimeout = 2 * time.Minute PostgresConnectTimeout = 15 * time.Second diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index a9cadafba..6adc0bfbc 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -10,9 +10,11 @@ import ( "strconv" "time" + "github.com/zalando/postgres-operator/pkg/util/constants" httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient" "github.com/sirupsen/logrus" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" v1 "k8s.io/api/core/v1" ) @@ -31,7 +33,7 @@ type Interface interface { SetPostgresParameters(server *v1.Pod, options map[string]string) error GetMemberData(server *v1.Pod) (MemberData, error) Restart(server *v1.Pod) error - GetConfig(server *v1.Pod) (map[string]interface{}, error) + GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) SetConfig(server *v1.Pod, config map[string]interface{}) error } @@ -109,28 +111,23 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) } func (p *Patroni) httpGet(url string) (string, error) { - request, err := http.NewRequest("GET", url, nil) - if err != nil { - return "", fmt.Errorf("could not create request: %v", err) - } + p.logger.Debugf("making GET http request: %s", url) - p.logger.Debugf("making GET http request: %s", 
request.URL.String()) - - resp, err := p.httpClient.Do(request) + response, err := p.httpClient.Get(url) if err != nil { return "", fmt.Errorf("could not make request: %v", err) } - bodyBytes, err := ioutil.ReadAll(resp.Body) + defer response.Body.Close() + + bodyBytes, err := ioutil.ReadAll(response.Body) if err != nil { return "", fmt.Errorf("could not read response: %v", err) } - if err := resp.Body.Close(); err != nil { - return "", fmt.Errorf("could not close request: %v", err) - } - if resp.StatusCode != http.StatusOK { - return string(bodyBytes), fmt.Errorf("patroni returned '%d'", resp.StatusCode) + if response.StatusCode != http.StatusOK { + return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode) } + return string(bodyBytes), nil } @@ -194,30 +191,43 @@ type MemberData struct { Patroni MemberDataPatroni `json:"patroni"` } -func (p *Patroni) GetConfigOrStatus(server *v1.Pod, path string) (map[string]interface{}, error) { - result := make(map[string]interface{}) +func (p *Patroni) GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) { + var ( + patroniConfig acidv1.Patroni + pgConfig map[string]interface{} + ) apiURLString, err := apiURL(server) if err != nil { - return result, err + return patroniConfig, nil, err } - body, err := p.httpGet(apiURLString + path) - err = json.Unmarshal([]byte(body), &result) + body, err := p.httpGet(apiURLString + configPath) if err != nil { - return result, err + return patroniConfig, nil, err + } + err = json.Unmarshal([]byte(body), &patroniConfig) + if err != nil { + return patroniConfig, nil, err } - return result, err -} - -func (p *Patroni) GetStatus(server *v1.Pod) (map[string]interface{}, error) { - return p.GetConfigOrStatus(server, statusPath) -} + // unmarshalling postgresql parameters needs a detour + err = json.Unmarshal([]byte(body), &pgConfig) + if err != nil { + return patroniConfig, nil, err + } + pgParameters := make(map[string]string) + if _, exists := pgConfig["postgresql"]; exists { + effectivePostgresql := pgConfig["postgresql"].(map[string]interface{}) + effectivePgParameters := effectivePostgresql[constants.PatroniPGParametersParameterName].(map[string]interface{}) + for parameter, value := range effectivePgParameters { + strValue := fmt.Sprintf("%v", value) + pgParameters[parameter] = strValue + } + } -func (p *Patroni) GetConfig(server *v1.Pod) (map[string]interface{}, error) { - return p.GetConfigOrStatus(server, configPath) + return patroniConfig, pgParameters, err } -//Restart method restarts instance via Patroni POST API call. +// Restart method restarts instance via Patroni POST API call. 
func (p *Patroni) Restart(server *v1.Pod) error { buf := &bytes.Buffer{} err := json.NewEncoder(buf).Encode(map[string]interface{}{"restart_pending": true}) @@ -228,9 +238,13 @@ func (p *Patroni) Restart(server *v1.Pod) error { if err != nil { return err } - status, err := p.GetStatus(server) - pending_restart, ok := status["pending_restart"] - if !ok || !pending_restart.(bool) { + memberData, err := p.GetMemberData(server) + if err != nil { + return err + } + + // do restart only when it is pending + if !memberData.PendingRestart { return nil } return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf) @@ -243,19 +257,13 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) { if err != nil { return MemberData{}, err } - response, err := p.httpClient.Get(apiURLString) - if err != nil { - return MemberData{}, fmt.Errorf("could not perform Get request: %v", err) - } - defer response.Body.Close() - - body, err := ioutil.ReadAll(response.Body) + body, err := p.httpGet(apiURLString + statusPath) if err != nil { - return MemberData{}, fmt.Errorf("could not read response: %v", err) + return MemberData{}, err } data := MemberData{} - err = json.Unmarshal(body, &data) + err = json.Unmarshal([]byte(body), &data) if err != nil { return MemberData{}, err } diff --git a/pkg/util/patroni/patroni_test.go b/pkg/util/patroni/patroni_test.go index 939270453..5a6b2657c 100644 --- a/pkg/util/patroni/patroni_test.go +++ b/pkg/util/patroni/patroni_test.go @@ -6,14 +6,19 @@ import ( "fmt" "io/ioutil" "net/http" + "reflect" "testing" "github.com/golang/mock/gomock" + "github.com/sirupsen/logrus" "github.com/zalando/postgres-operator/mocks" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" v1 "k8s.io/api/core/v1" ) +var logger = logrus.New().WithField("test", "patroni") + func newMockPod(ip string) *v1.Pod { return &v1.Pod{ Status: v1.PodStatus{ @@ -80,31 +85,141 @@ func TestApiURL(t *testing.T) { } } -func TestPatroniAPI(t *testing.T) { +func TestGetMemberData(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 90621, "cluster_unlocked": false, "xlog": {"location": 55978296057856}, "timeline": 6, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.0.1", "scope": "acid-rest92-standby"}}` + expectedMemberData := MemberData{ + State: "running", + Role: "master", + ServerVersion: 130004, + PendingRestart: true, + Patroni: MemberDataPatroni{ + Version: "2.1.1", + Scope: "acid-test-cluster", + }, + } + + json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 130004, "cluster_unlocked": false, "xlog": {"location": 123456789}, "timeline": 1, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.1.1", "scope": "acid-test-cluster"}}` r := ioutil.NopCloser(bytes.NewReader([]byte(json))) response := http.Response{ - Status: "200", - Body: r, + StatusCode: 200, + Body: r, } mockClient := mocks.NewMockHTTPClient(ctrl) mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil) - p := New(nil, mockClient) + p := New(logger, mockClient) - pod := v1.Pod{ - Status: v1.PodStatus{ - PodIP: "192.168.100.1", - }, + memberData, err := p.GetMemberData(newMockPod("192.168.100.1")) + + if !reflect.DeepEqual(expectedMemberData, memberData) { + t.Errorf("Patroni member data differs: 
expected: %#v, got: %#v", expectedMemberData, memberData) } - _, err := p.GetMemberData(&pod) if err != nil { t.Errorf("Could not read Patroni data: %v", err) } } + +func TestGetConfig(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + expectedPatroniConfig := acidv1.Patroni{ + TTL: 30, + LoopWait: 10, + RetryTimeout: 10, + MaximumLagOnFailover: 33554432, + Slots: map[string]map[string]string{ + "cdc": { + "database": "foo", + "plugin": "wal2json", + "type": "logical", + }, + }, + } + + expectedPgParameters := map[string]string{ + "archive_mode": "on", + "archive_timeout": "1800s", + "autovacuum_analyze_scale_factor": "0.02", + "autovacuum_max_workers": "5", + "autovacuum_vacuum_scale_factor": "0.05", + "checkpoint_completion_target": "0.9", + "hot_standby": "on", + "log_autovacuum_min_duration": "0", + "log_checkpoints": "on", + "log_connections": "on", + "log_disconnections": "on", + "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", + "log_lock_waits": "on", + "log_min_duration_statement": "500", + "log_statement": "ddl", + "log_temp_files": "0", + "max_connections": "100", + "max_replication_slots": "10", + "max_wal_senders": "10", + "tcp_keepalives_idle": "900", + "tcp_keepalives_interval": "100", + "track_functions": "all", + "wal_level": "hot_standby", + "wal_log_hints": "on", + } + + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) + + response := http.Response{ + StatusCode: 200, + Body: r, + } + + mockClient := mocks.NewMockHTTPClient(ctrl) + mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil) + + p := New(logger, mockClient) + + patroniConfig, pgParameters, err := p.GetConfig(newMockPod("192.168.100.1")) + if err != nil { + t.Errorf("Could not read Patroni config endpoint: %v", err) + } + + if !reflect.DeepEqual(expectedPatroniConfig, patroniConfig) { + t.Errorf("Patroni config differs: expected: %#v, got: %#v", expectedPatroniConfig, patroniConfig) + } + if !reflect.DeepEqual(expectedPgParameters, pgParameters) { + t.Errorf("Postgre parameters differ: expected: %#v, got: %#v", expectedPgParameters, pgParameters) + } +} + +func TestSetPostgresParameters(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + parametersToSet := map[string]string{ + "max_connections": "50", + "wal_level": "logical", + } + + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, 
"checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) + + response := http.Response{ + StatusCode: 200, + Body: r, + } + + mockClient := mocks.NewMockHTTPClient(ctrl) + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil) + + p := New(logger, mockClient) + + err := p.SetPostgresParameters(newMockPod("192.168.100.1"), parametersToSet) + if err != nil { + t.Errorf("could not call patch Patroni config: %v", err) + } + +}