diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py
index 9937add88..c3ad1c999 100644
--- a/e2e/tests/k8s_api.py
+++ b/e2e/tests/k8s_api.py
@@ -252,6 +252,13 @@ def exec_with_kubectl(self, pod, cmd):
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)

+    def patroni_rest(self, pod, path):
+        r = self.exec_with_kubectl(pod, "curl localhost:8008/" + path)
+        if not r.returncode == 0 or not r.stdout.decode()[0:1] == "{":
+            return None
+
+        return json.loads(r.stdout.decode())
+
     def get_patroni_state(self, pod):
         r = self.exec_with_kubectl(pod, "patronictl list -f json")
         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[":
@@ -514,6 +521,13 @@ def exec_with_kubectl(self, pod, cmd):
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)

+    def patroni_rest(self, pod, path):
+        r = self.exec_with_kubectl(pod, "curl localhost:8008/" + path)
+        if not r.returncode == 0 or not r.stdout.decode()[0:1] == "{":
+            return None
+
+        return json.loads(r.stdout.decode())
+
     def get_patroni_state(self, pod):
         r = self.exec_with_kubectl(pod, "patronictl list -f json")
         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[":
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 08d2864d2..6a4bf78ca 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -324,65 +324,6 @@ def test_cross_namespace_secrets(self):
         self.eventuallyEqual(lambda: self.k8s.count_secrets_with_label("cluster-name=acid-minimal-cluster,application=spilo", self.test_namespace),
                              1, "Secret not created for user in namespace")

-    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
-    def test_decrease_max_connections(self):
-        '''
-           Test decreasing max_connections and restarting cluster through rest api
-        '''
-        k8s = self.k8s
-        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
-        labels = 'spilo-role=master,' + cluster_label
-        new_max_connections_value = "99"
-        pods = k8s.api.core_v1.list_namespaced_pod(
-            'default', label_selector=labels).items
-        self.assert_master_is_unique()
-        masterPod = pods[0]
-        creationTimestamp = masterPod.metadata.creation_timestamp
-
-        # adjust max_connection
-        pg_patch_max_connections = {
-            "spec": {
-                "postgresql": {
-                    "parameters": {
-                        "max_connections": new_max_connections_value
-                    }
-                }
-            }
-        }
-
-        try:
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
-
-            def get_max_connections():
-                pods = k8s.api.core_v1.list_namespaced_pod(
-                    'default', label_selector=labels).items
-                self.assert_master_is_unique()
-                masterPod = pods[0]
-                get_max_connections_cmd = '''psql -At -U postgres -c "SELECT setting FROM pg_settings WHERE name = 'max_connections';"'''
-                result = k8s.exec_with_kubectl(masterPod.metadata.name, get_max_connections_cmd)
-                max_connections_value = int(result.stdout)
-                return max_connections_value
-
-            #Make sure that max_connections decreased
-            self.eventuallyEqual(get_max_connections, int(new_max_connections_value), "max_connections didn't decrease")
-            pods = k8s.api.core_v1.list_namespaced_pod(
-                'default', label_selector=labels).items
-            self.assert_master_is_unique()
-            masterPod = pods[0]
-            #Make sure that pod didn't restart
-            self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp,
-                             "Master pod creation timestamp is updated")
-
-        except timeout_decorator.TimeoutError:
-            print('Operator log: {}'.format(k8s.get_operator_log()))
-            raise
-
-        # make sure cluster is in a good state for further tests
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
-                             "No 2 pods running")
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_enable_disable_connection_pooler(self):
         '''
@@ -1114,6 +1055,88 @@ def test_overwrite_pooler_deployment(self):
         self.eventuallyEqual(lambda: self.k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"),
                              0, "Pooler pods not scaled down")

+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_patroni_config_update(self):
+        '''
+            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
+            and query Patroni config endpoint to check if manifest changes got applied
+            via restarting cluster through Patroni's rest api
+        '''
+        k8s = self.k8s
+        masterPod = k8s.get_cluster_leader_pod()
+        labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master'
+        creationTimestamp = masterPod.metadata.creation_timestamp
+        new_max_connections_value = "50"
+
+        # adjust max_connection
+        pg_patch_config = {
+            "spec": {
+                "postgresql": {
+                    "parameters": {
+                        "max_connections": new_max_connections_value
+                    }
+                },
+                "patroni": {
+                    "slots": {
+                        "test_slot": {
+                            "type": "physical"
+                        }
+                    },
+                    "ttl": 29,
+                    "loop_wait": 9,
+                    "retry_timeout": 9,
+                    "synchronous_mode": True
+                }
+            }
+        }
+
+        try:
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
+
+            self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            def compare_config():
+                effective_config = k8s.patroni_rest(masterPod.metadata.name, "config")
+                desired_patroni = pg_patch_config["spec"]["patroni"]
+                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
+                effective_parameters = effective_config["postgresql"]["parameters"]
+                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
+                                 "max_connections not updated")
+                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
+                self.assertEqual(desired_patroni["ttl"], effective_config["ttl"],
+                                 "ttl not updated")
+                self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"],
+                                 "loop_wait not updated")
+                self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"],
+                                 "retry_timeout not updated")
+                self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"],
+                                 "synchronous_mode not updated")
+                return True
+
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            setting_query = """
+               SELECT setting
+                 FROM pg_settings
+                WHERE name = 'max_connections';
+            """
+            self.eventuallyEqual(lambda: self.query_database(masterPod.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                                 "New max_connections setting not applied", 10, 5)
+
+            # make sure that pod wasn't recreated
+            self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp,
+                             "Master pod creation timestamp is updated")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+        # make sure cluster is in a good state for further tests
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_rolling_update_flag(self):
         '''
diff --git a/pkg/apis/acid.zalan.do/v1/marshal.go b/pkg/apis/acid.zalan.do/v1/marshal.go
index 9521082fc..f4167ce92 100644
--- a/pkg/apis/acid.zalan.do/v1/marshal.go
+++ b/pkg/apis/acid.zalan.do/v1/marshal.go
@@ -81,7 +81,7 @@ func (ps *PostgresStatus) UnmarshalJSON(data []byte) error {
     if err != nil {
         metaErr := json.Unmarshal(data, &status)
         if metaErr != nil {
-            return fmt.Errorf("Could not parse status: %v; err %v", string(data), metaErr)
+            return fmt.Errorf("could not parse status: %v; err %v", string(data), metaErr)
         }
         tmp.PostgresClusterStatus = status
     }
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 85d87b35a..4937a2034 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -2,7 +2,9 @@ package cluster

 import (
     "context"
+    "encoding/json"
     "fmt"
+    "reflect"
     "regexp"
     "strings"
     "time"
@@ -261,14 +263,18 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
 }

 func (c *Cluster) syncStatefulSet() error {
-    var instancesRestartRequired bool
+    var (
+        masterPod               *v1.Pod
+        postgresConfig          map[string]interface{}
+        instanceRestartRequired bool
+    )

     podsToRecreate := make([]v1.Pod, 0)
     switchoverCandidates := make([]spec.NamespacedName, 0)

     pods, err := c.listPods()
     if err != nil {
-        c.logger.Infof("could not list pods of the statefulset: %v", err)
+        c.logger.Warnf("could not list pods of the statefulset: %v", err)
     }

     // NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
@@ -381,20 +387,50 @@ func (c *Cluster) syncStatefulSet() error {
     // Apply special PostgreSQL parameters that can only be set via the Patroni API.
     // it is important to do it after the statefulset pods are there, but before the rolling update
     // since those parameters require PostgreSQL restart.
-    instancesRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration()
+    pods, err = c.listPods()
     if err != nil {
-        return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
+        c.logger.Warnf("could not get list of pods to apply special PostgreSQL parameters only to be set via Patroni API: %v", err)
     }

-    if instancesRestartRequired {
-        c.logger.Debugln("restarting Postgres server within pods")
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "restarting Postgres server within pods")
-        if err := c.restartInstances(); err != nil {
-            c.logger.Warningf("could not restart Postgres server within pods: %v", err)
+    // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
+    // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
+    for i, pod := range pods {
+        podName := util.NameFromMeta(pods[i].ObjectMeta)
+        config, err := c.patroni.GetConfig(&pod)
+        if err != nil {
+            c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+            continue
+        }
+        instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config)
+        if err != nil {
+            c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
+            continue
         }
-        c.logger.Infof("Postgres server successfuly restarted on all pods")
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Postgres server restart done - all instances have been restarted")
+        break
     }
+
+    // if the config update requires a restart, call Patroni restart for replicas first, then master
+    if instanceRestartRequired {
+        c.logger.Debug("restarting Postgres server within pods")
+        ttl, ok := postgresConfig["ttl"].(int32)
+        if !ok {
+            ttl = 30
+        }
+        for i, pod := range pods {
+            role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+            if role == Master {
+                masterPod = &pods[i]
+                continue
+            }
+            c.restartInstance(&pod)
+            time.Sleep(time.Duration(ttl) * time.Second)
+        }
+
+        if masterPod != nil {
+            c.restartInstance(masterPod)
+        }
+    }
+
     // if we get here we also need to re-create the pods (either leftovers from the old
     // statefulset or those that got their configuration from the outdated statefulset)
     if len(podsToRecreate) > 0 {
@@ -408,55 +444,19 @@ func (c *Cluster) syncStatefulSet() error {
     return nil
 }

-func (c *Cluster) restartInstances() error {
-    c.setProcessName("starting to restart Postgres servers")
-    ls := c.labelsSet(false)
-    namespace := c.Namespace
+func (c *Cluster) restartInstance(pod *v1.Pod) {
+    podName := util.NameFromMeta(pod.ObjectMeta)
+    role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])

-    listOptions := metav1.ListOptions{
-        LabelSelector: ls.String(),
-    }
+    c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))

-    pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
-    if err != nil {
-        return fmt.Errorf("could not get the list of pods: %v", err)
+    if err := c.patroni.Restart(pod); err != nil {
+        c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
+        return
     }

-    c.logger.Infof("there are %d pods in the cluster which resquire Postgres server restart", len(pods.Items))
-
-    var (
-        masterPod *v1.Pod
-    )
-    for i, pod := range pods.Items {
-        role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-        if role == Master {
-            masterPod = &pods.Items[i]
-            continue
-        }
-
-        podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-        config, err := c.patroni.GetConfig(&pod)
-        if err != nil {
-            return fmt.Errorf("could not get config for pod %s: %v", podName, err)
-        }
-        ttl, ok := config["ttl"].(int32)
-        if !ok {
-            ttl = 30
-        }
-        if err = c.patroni.Restart(&pod); err != nil {
-            return fmt.Errorf("could not restart Postgres server on pod %s: %v", podName, err)
-        }
-        time.Sleep(time.Duration(ttl) * time.Second)
-    }
-
-    if masterPod != nil {
-        podName := util.NameFromMeta(masterPod.ObjectMeta)
-        if err = c.patroni.Restart(masterPod); err != nil {
-            return fmt.Errorf("could not restart postgres server on masterPod %s: %v", podName, err)
-        }
-    }
-
-    return nil
+    c.logger.Debugf("Postgres server successfully restarted in %s pod %s", role, podName)
+    c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
 }

 // AnnotationsToPropagate get the annotations to update if required
@@ -492,48 +492,77 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
 }

 // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
-// (like max_connections) has changed and if necessary sets it via the Patroni API
-func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() (bool, error) {
-    var (
-        err             error
-        pods            []v1.Pod
-        restartRequired bool
-    )
+// (like max_connections) have changed and if necessary sets it via the Patroni API
+func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) {
+    configToSet := make(map[string]interface{})
+    parametersToSet := make(map[string]string)
+    effectivePgParameters := make(map[string]interface{})

-    // we need to extract those options from the cluster manifest.
-    optionsToSet := make(map[string]string)
-    pgOptions := c.Spec.Parameters
+    // read effective Patroni config if set
+    if patroniConfig != nil {
+        effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{})
+        effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{})
+    }

-    for k, v := range pgOptions {
-        if isBootstrapOnlyParameter(k) {
-            optionsToSet[k] = v
+    // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
+    desiredPgParameters := c.Spec.Parameters
+    for desiredOption, desiredValue := range desiredPgParameters {
+        effectiveValue := effectivePgParameters[desiredOption]
+        if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
+            parametersToSet[desiredOption] = desiredValue
         }
     }

-    if len(optionsToSet) == 0 {
-        return restartRequired, nil
+    if len(parametersToSet) > 0 {
+        configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet}
     }

-    if pods, err = c.listPods(); err != nil {
-        return restartRequired, err
+    // compare other options from config with c.Spec.Patroni from manifest
+    desiredPatroniConfig := c.Spec.Patroni
+    if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) {
+        configToSet["loop_wait"] = desiredPatroniConfig.LoopWait
     }
-    if len(pods) == 0 {
-        return restartRequired, fmt.Errorf("could not call Patroni API: cluster has no pods")
+    if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) {
+        configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover
     }
+    if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) {
+        configToSet["pg_hba"] = desiredPatroniConfig.PgHba
+    }
+    if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) {
+        configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout
+    }
+    if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) {
+        configToSet["slots"] = desiredPatroniConfig.Slots
+    }
+    if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] {
+        configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode
+    }
+    if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] {
+        configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict
+    }
+    if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) {
+        configToSet["ttl"] = desiredPatroniConfig.TTL
+    }
+
+    if len(configToSet) == 0 {
+        return false, nil
+    }
+
+    configToSetJson, err := json.Marshal(configToSet)
+    if err != nil {
+        c.logger.Debugf("could not convert config patch to JSON: %v", err)
+    }
+
     // try all pods until the first one that is successful, as it doesn't matter which pod
     // carries the request to change configuration through
-    for _, pod := range pods {
-        podName := util.NameFromMeta(pod.ObjectMeta)
-        c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
-            podName, optionsToSet)
-        if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
-            restartRequired = true
-            return restartRequired, nil
-        }
-        c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
-    }
-    return restartRequired, fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
-        len(pods))
+    podName := util.NameFromMeta(pod.ObjectMeta)
+    c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
+        podName, configToSetJson)
+    if err = c.patroni.SetConfig(pod, configToSet); err != nil {
+        return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
+    }
+
+    return true, nil
 }

 func (c *Cluster) syncSecrets() error {
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index 1f2c95552..a9cadafba 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -32,6 +32,7 @@ type Interface interface {
     GetMemberData(server *v1.Pod) (MemberData, error)
     Restart(server *v1.Pod) error
     GetConfig(server *v1.Pod) (map[string]interface{}, error)
+    SetConfig(server *v1.Pod, config map[string]interface{}) error
 }

 // Patroni API client
@@ -163,6 +164,20 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
     return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }

+// SetConfig sets Patroni options via Patroni patch API call.
+func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error {
+    buf := &bytes.Buffer{}
+    err := json.NewEncoder(buf).Encode(config)
+    if err != nil {
+        return fmt.Errorf("could not encode json: %v", err)
+    }
+    apiURLString, err := apiURL(server)
+    if err != nil {
+        return err
+    }
+    return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
+}
+
 // MemberDataPatroni child element
 type MemberDataPatroni struct {
     Version string `json:"version"`
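For reference, the new SetConfig method boils down to a PATCH of a partial JSON document against Patroni's /config endpoint; Patroni merges the patch into the configuration stored in DCS. The standalone sketch below is an illustration under stated assumptions, not the operator's code path: the localhost URL and the payload values are made up for the example, and the real implementation builds the URL via apiURL() and sends the request through httpPostOrPatch().

```go
// Illustrative sketch only: the HTTP request SetConfig effectively issues.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Subset of the Patroni dynamic configuration to change; mirrors the shape of
	// configToSet as built in checkAndSetGlobalPostgreSQLConfiguration.
	configToSet := map[string]interface{}{
		"ttl":       29,
		"loop_wait": 9,
		"postgresql": map[string]interface{}{
			"parameters": map[string]string{"max_connections": "50"},
		},
	}

	buf := &bytes.Buffer{}
	if err := json.NewEncoder(buf).Encode(configToSet); err != nil {
		panic(err)
	}

	// PATCH merges the payload into the existing config kept in DCS.
	req, err := http.NewRequest(http.MethodPatch, "http://localhost:8008/config", buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("patch status:", resp.Status)
}
```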