diff --git a/docker/logical-backup/Dockerfile b/docker/logical-backup/Dockerfile index 62bd5ce8c..5c1ee6e39 100644 --- a/docker/logical-backup/Dockerfile +++ b/docker/logical-backup/Dockerfile @@ -23,6 +23,7 @@ RUN apt-get update \ && curl --silent https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - \ && apt-get update \ && apt-get install --no-install-recommends -y \ + postgresql-client-14 \ postgresql-client-13 \ postgresql-client-12 \ postgresql-client-11 \ diff --git a/e2e/Dockerfile b/e2e/Dockerfile index 3eb8c9d70..eabd0dabe 100644 --- a/e2e/Dockerfile +++ b/e2e/Dockerfile @@ -16,7 +16,7 @@ RUN apt-get update \ curl \ vim \ && pip3 install --no-cache-dir -r requirements.txt \ - && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \ + && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.22.0/bin/linux/amd64/kubectl \ && chmod +x ./kubectl \ && mv ./kubectl /usr/local/bin/kubectl \ && apt-get clean \ diff --git a/e2e/run.sh b/e2e/run.sh index 2d5708778..39d85f072 100755 --- a/e2e/run.sh +++ b/e2e/run.sh @@ -8,7 +8,7 @@ IFS=$'\n\t' readonly cluster_name="postgres-operator-e2e-tests" readonly kubeconfig_path="/tmp/kind-config-${cluster_name}" -readonly spilo_image="registry.opensource.zalan.do/acid/spilo-13-e2e:0.3" +readonly spilo_image="registry.opensource.zalan.do/acid/spilo-14-e2e:0.1" readonly e2e_test_runner_image="registry.opensource.zalan.do/acid/postgres-operator-e2e-tests-runner:0.3" export GOPATH=${GOPATH-~/go} diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 87bbf02a2..e28e3e087 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -11,8 +11,8 @@ from tests.k8s_api import K8s from kubernetes.client.rest import ApiException -SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-13-e2e:0.3" -SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-13-e2e:0.4" +SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-14-e2e:0.1" +SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-14-e2e:0.2" def to_selector(labels): @@ -85,6 +85,7 @@ def setUpClass(cls): # set a single K8s wrapper for all tests k8s = cls.k8s = K8s() + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # remove existing local storage class and create hostpath class try: @@ -150,8 +151,8 @@ def setUpClass(cls): result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr)) try: - k8s.wait_for_pod_start('spilo-role=master') - k8s.wait_for_pod_start('spilo-role=replica') + k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) raise @@ -1064,12 +1065,13 @@ def test_patroni_config_update(self): via restarting cluster through Patroni's rest api ''' k8s = self.k8s - masterPod = k8s.get_cluster_leader_pod() - labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master' - creationTimestamp = masterPod.metadata.creation_timestamp + leader = k8s.get_cluster_leader_pod() + replica = k8s.get_cluster_replica_pod() + masterCreationTimestamp = leader.metadata.creation_timestamp + replicaCreationTimestamp = replica.metadata.creation_timestamp new_max_connections_value = "50" - # adjust max_connection + # adjust Postgres config pg_patch_config = { "spec": { "postgresql": { @@ -1098,7 +1100,7 @@ def test_patroni_config_update(self): self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") def compare_config(): - effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") + effective_config = k8s.patroni_rest(leader.metadata.name, "config") desired_config = pg_patch_config["spec"]["patroni"] desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] effective_parameters = effective_config["postgresql"]["parameters"] @@ -1115,19 +1117,63 @@ def compare_config(): "synchronous_mode not updated") return True + # check if Patroni config has been updated self.eventuallyTrue(compare_config, "Postgres config not applied") + # make sure that pods were not recreated + leader = k8s.get_cluster_leader_pod() + replica = k8s.get_cluster_replica_pod() + self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp, + "Master pod creation timestamp is updated") + self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp, + "Master pod creation timestamp is updated") + + # query max_connections setting setting_query = """ SELECT setting FROM pg_settings WHERE name = 'max_connections'; """ - self.eventuallyEqual(lambda: self.query_database(masterPod.metadata.name, "postgres", setting_query)[0], new_max_connections_value, - "New max_connections setting not applied", 10, 5) + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value, + "New max_connections setting not applied on master", 10, 5) + self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value, + "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5) - # make sure that pod wasn't recreated - self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp, - "Master pod creation timestamp is updated") + # the next sync should restart the replica because it has pending_restart flag set + # force next sync by deleting the operator pod + k8s.delete_operator_pod() + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value, + "New max_connections setting not applied on replica", 10, 5) + + # decrease max_connections again + # this time restart will be correct and new value should appear on both instances + lower_max_connections_value = "30" + pg_patch_max_connections = { + "spec": { + "postgresql": { + "parameters": { + "max_connections": lower_max_connections_value + } + } + } + } + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check Patroni config again + pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value + self.eventuallyTrue(compare_config, "Postgres config not applied") + + # and query max_connections setting again + self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, + "Previous max_connections setting not applied on master", 10, 5) + self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value, + "Previous max_connections setting not applied on replica", 10, 5) except timeout_decorator.TimeoutError: print('Operator log: {}'.format(k8s.get_operator_log())) @@ -1554,6 +1600,7 @@ def assert_distributed_pods(self, master_node, replica_nodes, cluster_label): Toggle pod anti affinty to distribute pods accross nodes (replica in particular). ''' k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' failover_targets = self.get_failover_targets(master_node, replica_nodes) # enable pod anti affintiy in config map which should trigger movement of replica @@ -1572,8 +1619,8 @@ def assert_distributed_pods(self, master_node, replica_nodes, cluster_label): } } k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity") - k8s.wait_for_pod_start('spilo-role=master') - k8s.wait_for_pod_start('spilo-role=replica') + k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) return True def list_databases(self, pod_name): diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index c763d4bdb..2df168a6e 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "regexp" + "strconv" "strings" "time" @@ -20,6 +21,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var requireMasterRestartWhenDecreased = []string{ + "max_connections", + "max_prepared_transactions", + "max_locks_per_transaction", + "max_worker_processes", + "max_wal_senders", +} + // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { @@ -264,11 +273,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncStatefulSet() error { var ( - masterPod *v1.Pod - postgresConfig map[string]interface{} - instanceRestartRequired bool + restartWait uint32 + restartMasterFirst bool ) - podsToRecreate := make([]v1.Pod, 0) switchoverCandidates := make([]spec.NamespacedName, 0) @@ -402,38 +409,41 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) continue } + restartWait = patroniConfig.LoopWait // empty config probably means cluster is not fully initialized yet, e.g. restoring from backup // do not attempt a restart if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 { - instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) + restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) if err != nil { c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) continue } + // it could take up to LoopWait to apply the config + time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2) break } } - // if the config update requires a restart, call Patroni restart for replicas first, then master - if instanceRestartRequired { - c.logger.Debug("restarting Postgres server within pods") - ttl, ok := postgresConfig["ttl"].(int32) - if !ok { - ttl = 30 - } - for i, pod := range pods { - role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) - if role == Master { - masterPod = &pods[i] - continue - } - c.restartInstance(&pod) - time.Sleep(time.Duration(ttl) * time.Second) + // restart instances if required + remainingPods := make([]*v1.Pod, 0) + skipRole := Master + if restartMasterFirst { + skipRole = Replica + } + for i, pod := range pods { + role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) + if role == skipRole { + remainingPods = append(remainingPods, &pods[i]) + continue } + c.restartInstance(&pod, restartWait) + } - if masterPod != nil { - c.restartInstance(masterPod) + // in most cases only the master should be left to restart + if len(remainingPods) > 0 { + for _, remainingPod := range remainingPods { + c.restartInstance(remainingPod, restartWait) } } @@ -450,19 +460,27 @@ func (c *Cluster) syncStatefulSet() error { return nil } -func (c *Cluster) restartInstance(pod *v1.Pod) { +func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) { podName := util.NameFromMeta(pod.ObjectMeta) role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) - c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name)) - - if err := c.patroni.Restart(pod); err != nil { - c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err) + // if the config update requires a restart, call Patroni restart + memberData, err := c.patroni.GetMemberData(pod) + if err != nil { + c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err) return } - c.logger.Debugf("Postgres server successfuly restarted in %s pod %s", role, podName) - c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name)) + // do restart only when it is pending + if memberData.PendingRestart { + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name)) + if err := c.patroni.Restart(pod); err != nil { + c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err) + return + } + time.Sleep(time.Duration(restartWait) * time.Second) + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name)) + } } // AnnotationsToPropagate get the annotations to update if required @@ -502,21 +520,10 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) { configToSet := make(map[string]interface{}) parametersToSet := make(map[string]string) + restartMaster := make([]bool, 0) + requiresMasterRestart := false - // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest - desiredPgParameters := c.Spec.Parameters - for desiredOption, desiredValue := range desiredPgParameters { - effectiveValue := effectivePgParameters[desiredOption] - if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { - parametersToSet[desiredOption] = desiredValue - } - } - - if len(parametersToSet) > 0 { - configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet} - } - - // compare other options from config with c.Spec.Patroni from manifest + // compare options from config with c.Spec.Patroni from manifest desiredPatroniConfig := c.Spec.Patroni if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait { configToSet["loop_wait"] = desiredPatroniConfig.LoopWait @@ -554,6 +561,35 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC configToSet["slots"] = slotsToSet } + // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest + desiredPgParameters := c.Spec.Parameters + for desiredOption, desiredValue := range desiredPgParameters { + effectiveValue := effectivePgParameters[desiredOption] + if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { + parametersToSet[desiredOption] = desiredValue + if util.SliceContains(requireMasterRestartWhenDecreased, desiredOption) { + effectiveValueNum, errConv := strconv.Atoi(effectiveValue) + desiredValueNum, errConv2 := strconv.Atoi(desiredValue) + if errConv != nil || errConv2 != nil { + continue + } + if effectiveValueNum > desiredValueNum { + restartMaster = append(restartMaster, true) + continue + } + } + restartMaster = append(restartMaster, false) + } + } + + if !util.SliceContains(restartMaster, false) && len(configToSet) == 0 { + requiresMasterRestart = true + } + + if len(parametersToSet) > 0 { + configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet} + } + if len(configToSet) == 0 { return false, nil } @@ -569,10 +605,10 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s", podName, configToSetJson) if err = c.patroni.SetConfig(pod, configToSet); err != nil { - return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err) + return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err) } - return true, nil + return requiresMasterRestart, nil } func (c *Cluster) syncSecrets() error { diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index e6f23914b..c6e2a8357 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -1,22 +1,40 @@ package cluster import ( + "bytes" + "io/ioutil" + "net/http" "testing" "time" "context" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/golang/mock/gomock" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/zalando/postgres-operator/mocks" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + "github.com/zalando/postgres-operator/pkg/util/patroni" "k8s.io/client-go/kubernetes/fake" ) +var patroniLogger = logrus.New().WithField("test", "patroni") + +func newMockPod(ip string) *v1.Pod { + return &v1.Pod{ + Status: v1.PodStatus{ + PodIP: ip, + }, + } +} + func newFakeK8sSyncClient() (k8sutil.KubernetesClient, *fake.Clientset) { acidClientSet := fakeacidv1.NewSimpleClientset() clientSet := fake.NewSimpleClientset() @@ -113,3 +131,134 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) { t.Errorf("%s: inherited annotation not found in desired statefulset: %#v", testName, desiredSts.Annotations) } } + +func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { + testName := "test config comparison" + client, _ := newFakeK8sSyncClient() + clusterName := "acid-test-cluster" + namespace := "default" + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Patroni: acidv1.Patroni{ + TTL: 20, + }, + PostgresqlParam: acidv1.PostgresqlParam{ + Parameters: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "50", + }, + }, + Volume: acidv1.Volume{ + Size: "1Gi", + }, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + ResourceCheckInterval: time.Duration(3), + ResourceCheckTimeout: time.Duration(10), + }, + }, + }, client, pg, logger, eventRecorder) + + // mocking a config after setConfig is called + configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}` + r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) + + response := http.Response{ + StatusCode: 200, + Body: r, + } + + mockClient := mocks.NewMockHTTPClient(ctrl) + mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes() + + p := patroni.New(patroniLogger, mockClient) + cluster.patroni = p + mockPod := newMockPod("192.168.100.1") + + // simulate existing config that differs with cluster.Spec + tests := []struct { + subtest string + pod *v1.Pod + patroni acidv1.Patroni + pgParams map[string]string + restartMaster bool + }{ + { + subtest: "Patroni and Postgresql.Parameters differ - restart replica first", + pod: mockPod, + patroni: acidv1.Patroni{ + TTL: 30, // desired 20 + }, + pgParams: map[string]string{ + "log_min_duration_statement": "500", // desired 200 + "max_connections": "100", // desired 50 + }, + restartMaster: false, + }, + { + subtest: "multiple Postgresql.Parameters differ - restart replica first", + pod: mockPod, + patroni: acidv1.Patroni{ + TTL: 20, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "500", // desired 200 + "max_connections": "100", // desired 50 + }, + restartMaster: false, + }, + { + subtest: "desired max_connections bigger - restart replica first", + pod: mockPod, + patroni: acidv1.Patroni{ + TTL: 20, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "30", // desired 50 + }, + restartMaster: false, + }, + { + subtest: "desired max_connections smaller - restart master first", + pod: mockPod, + patroni: acidv1.Patroni{ + TTL: 20, + }, + pgParams: map[string]string{ + "log_min_duration_statement": "200", + "max_connections": "100", // desired 50 + }, + restartMaster: true, + }, + } + + for _, tt := range tests { + requireMasterRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(tt.pod, tt.patroni, tt.pgParams) + assert.NoError(t, err) + if requireMasterRestart != tt.restartMaster { + t.Errorf("%s - %s: unexpect master restart strategy, got %v, expected %v", testName, tt.subtest, requireMasterRestart, tt.restartMaster) + } + } +} diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 6adc0bfbc..4de3b8201 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -238,16 +238,12 @@ func (p *Patroni) Restart(server *v1.Pod) error { if err != nil { return err } - memberData, err := p.GetMemberData(server) - if err != nil { + if err := p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf); err != nil { return err } + p.logger.Infof("Postgres server successfuly restarted in pod %s", server.Name) - // do restart only when it is pending - if !memberData.PendingRestart { - return nil - } - return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf) + return nil } // GetMemberData read member data from patroni API