From 28a39f5c082537a336c69b7e722402a2f478ae45 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 12:10:21 +0100 Subject: [PATCH 1/6] always sync pooler objects --- pkg/cluster/connection_pooler.go | 76 ++++++++++++-------------------- 1 file changed, 27 insertions(+), 49 deletions(-) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 12c0be6ae..da25bf535 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -3,7 +3,6 @@ package cluster import ( "context" "fmt" - "reflect" "strings" "github.com/r3labs/diff" @@ -722,31 +721,6 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look var err error var connectionPoolerNeeded bool - needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler) - masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of master connection pooler changes") - } - replicaChanges, err := diff.Diff(oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of replica connection pooler changes") - } - - // skip pooler sync when theres no diff or it's deactivated - // but, handling the case when connectionPooler is not there but it is required - // as per spec, hence do not skip syncing in that case, even though there - // is no diff in specs - if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) && - ((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) || - (c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) && - ((c.ConnectionPooler[Master] != nil && c.ConnectionPooler[Master].LookupFunction) || - (c.ConnectionPooler[Replica] != nil && c.ConnectionPooler[Replica].LookupFunction)))) { - c.logger.Debugln("syncing pooler is not required") - return nil, nil - } - - logPoolerEssentials(c.logger, oldSpec, newSpec) - // Check and perform the sync requirements for each of the roles. for _, role := range [2]PostgresRole{Master, Replica} { @@ -781,7 +755,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look // in between // in this case also do not forget to install lookup function - if !c.ConnectionPooler[role].LookupFunction { + // skip installation in standby clusters, since they are read-only + if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil { connectionPooler := c.Spec.ConnectionPooler specSchema := "" specUser := "" @@ -838,32 +813,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( SyncReason, error) { + var ( + deployment *appsv1.Deployment + newDeployment *appsv1.Deployment + svc *v1.Service + newService *v1.Service + err error + ) + syncReason := make([]string, 0) - deployment, err := c.KubeClient. + deployment, err = c.KubeClient. Deployments(c.Namespace). 
Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { - msg := "deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) + c.logger.Warningf("deployment %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role)) - deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + deployment, err = c.KubeClient. + Deployments(newDeployment.Namespace). + Create(context.TODO(), newDeployment, metav1.CreateOptions{}) if err != nil { return NoSync, err } c.ConnectionPooler[role].Deployment = deployment } else if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not get connection pooler deployment to sync: %v", err) } else { c.ConnectionPooler[role].Deployment = deployment // actual synchronization @@ -900,16 +880,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, defaultsReason...) if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.logger.Infof("update connection pooler deployment %s, reason: %+v", c.connectionPoolerName(role), syncReason) - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return syncReason, fmt.Errorf(msg, err) + return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := updateConnectionPoolerDeployment(c.KubeClient, - newDeploymentSpec) + deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment) if err != nil { return syncReason, err @@ -927,17 +905,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.ConnectionPooler[role].Deployment = deployment } - if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { + if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { c.ConnectionPooler[role].Service = svc desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) if match, reason := k8sutil.SameService(svc, desiredSvc); !match { syncReason = append(syncReason, reason) c.logServiceChanges(role, svc, desiredSvc, false, reason) - updatedService, err := c.updateService(role, svc, desiredSvc) + newService, err = c.updateService(role, svc, desiredSvc) if err != nil { return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.ConnectionPooler[role].Service = updatedService + c.ConnectionPooler[role].Service = newService c.logger.Infof("%s service %q is in the desired state now", role, 
util.NameFromMeta(desiredSvc.ObjectMeta)) } return NoSync, nil @@ -953,14 +931,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.logger.Warningf(msg, c.connectionPoolerName(role)) serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - service, err := c.KubeClient. + newService, err = c.KubeClient. Services(serviceSpec.Namespace). Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return NoSync, err } - c.ConnectionPooler[role].Service = service + c.ConnectionPooler[role].Service = newService return NoSync, nil } From 0dde19effcde941095139856ffcd5dd364be2c48 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 14:46:00 +0100 Subject: [PATCH 2/6] bring back locks --- pkg/cluster/connection_pooler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index da25bf535..dcd9317d9 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -721,6 +721,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look var err error var connectionPoolerNeeded bool + logPoolerEssentials(c.logger, oldSpec, newSpec) + // Check and perform the sync requirements for each of the roles. for _, role := range [2]PostgresRole{Master, Replica} { @@ -922,13 +924,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql } if !k8sutil.ResourceNotFound(err) { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not get connection pooler service to sync: %v", err) } c.ConnectionPooler[role].Service = nil - msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) + c.logger.Warningf("service %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role)) serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) newService, err = c.KubeClient. From e0f7ca616636b3b2c5e6a84fe59dcfa28f776503 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 14:57:20 +0100 Subject: [PATCH 3/6] do not capitalize log messages in Go --- pkg/cluster/connection_pooler.go | 8 ++++---- pkg/cluster/k8sres.go | 20 ++++++++++---------- pkg/controller/operator_config.go | 2 +- pkg/util/config/config.go | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index dcd9317d9..b9e27f1eb 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -343,7 +343,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio } if *numberOfInstances < constants.ConnectionPoolerMinInstances { - msg := "Adjusted number of connection pooler instances from %d to %d" + msg := "adjusted number of connection pooler instances from %d to %d" c.logger.Warningf(msg, *numberOfInstances, constants.ConnectionPoolerMinInstances) *numberOfInstances = constants.ConnectionPoolerMinInstances @@ -613,7 +613,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. 
*deployment.Spec.Replicas != *config.NumberOfInstances { sync = true - msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + msg := fmt.Sprintf("numberOfInstances is different (having %d, required %d)", *deployment.Spec.Replicas, *config.NumberOfInstances) reasons = append(reasons, msg) } @@ -622,7 +622,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. poolerContainer.Image != config.Image { sync = true - msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + msg := fmt.Sprintf("dockerImage is different (having %s, required %s)", poolerContainer.Image, config.Image) reasons = append(reasons, msg) } @@ -636,7 +636,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. // updates for new resource values). if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { sync = true - msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + msg := fmt.Sprintf("resources are different (having %+v, required %+v)", poolerContainer.Resources, expectedResources) reasons = append(reasons, msg) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 6fa983c80..2b2ac9c08 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1100,7 +1100,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // backward compatible check for InitContainers if spec.InitContainersOld != nil { - msg := "Manifest parameter init_containers is deprecated." + msg := "manifest parameter init_containers is deprecated." if spec.InitContainers == nil { c.logger.Warningf("%s Consider using initContainers instead.", msg) spec.InitContainers = spec.InitContainersOld @@ -1111,7 +1111,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // backward compatible check for PodPriorityClassName if spec.PodPriorityClassNameOld != "" { - msg := "Manifest parameter pod_priority_class_name is deprecated." + msg := "manifest parameter pod_priority_class_name is deprecated." 
if spec.PodPriorityClassName == "" { c.logger.Warningf("%s Consider using podPriorityClassName instead.", msg) spec.PodPriorityClassName = spec.PodPriorityClassNameOld @@ -1504,13 +1504,13 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec, mountPaths := map[string]acidv1.AdditionalVolume{} for i, additionalVolume := range additionalVolumes { if previousVolume, exist := mountPaths[additionalVolume.MountPath]; exist { - msg := "Volume %+v cannot be mounted to the same path as %+v" + msg := "volume %+v cannot be mounted to the same path as %+v" c.logger.Warningf(msg, additionalVolume, previousVolume) continue } if additionalVolume.MountPath == constants.PostgresDataMount { - msg := "Cannot mount volume on postgresql data directory, %+v" + msg := "cannot mount volume on postgresql data directory, %+v" c.logger.Warningf(msg, additionalVolume) continue } @@ -1523,7 +1523,7 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec, for _, target := range additionalVolume.TargetContainers { if target == "all" && len(additionalVolume.TargetContainers) != 1 { - msg := `Target containers could be either "all" or a list + msg := `target containers could be either "all" or a list of containers, mixing those is not allowed, %+v` c.logger.Warningf(msg, additionalVolume) continue @@ -1813,11 +1813,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) }) } else { // cloning with S3, find out the bucket to clone - msg := "Clone from S3 bucket" + msg := "clone from S3 bucket" c.logger.Info(msg, description.S3WalPath) if description.S3WalPath == "" { - msg := "Figure out which S3 bucket to use from env" + msg := "figure out which S3 bucket to use from env" c.logger.Info(msg, description.S3WalPath) if c.OpConfig.WALES3Bucket != "" { @@ -1861,7 +1861,7 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) result = append(result, envs...) 
} else { - msg := "Use custom parsed S3WalPath %s from the manifest" + msg := "use custom parsed S3WalPath %s from the manifest" c.logger.Warningf(msg, description.S3WalPath) result = append(result, v1.EnvVar{ @@ -1910,7 +1910,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript if description.S3WalPath != "" { // standby with S3, find out the bucket to setup standby - msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s " + msg := "standby from S3 bucket using custom parsed S3WalPath from the manifest %s " c.logger.Infof(msg, description.S3WalPath) result = append(result, v1.EnvVar{ @@ -1918,7 +1918,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript Value: description.S3WalPath, }) } else if description.GSWalPath != "" { - msg := "Standby from GS bucket using custom parsed GSWalPath from the manifest %s " + msg := "standby from GS bucket using custom parsed GSWalPath from the manifest %s " c.logger.Infof(msg, description.GSWalPath) envs := []v1.EnvVar{ diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index e470fb59d..4bc85df0f 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -226,7 +226,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur constants.ConnectionPoolerUserName) if result.ConnectionPooler.User == result.SuperUsername { - msg := "Connection pool user is not allowed to be the same as super user, username: %s" + msg := "connection pool user is not allowed to be the same as super user, username: %s" panic(fmt.Errorf(msg, result.ConnectionPooler.User)) } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index fc4b73074..0ed6c3c63 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -290,7 +290,7 @@ func validate(cfg *Config) (err error) { } if cfg.ConnectionPooler.User == cfg.SuperUsername { - msg := "Connection pool user is not allowed to be the same as super user, username: %s" + msg := "connection pool user is not allowed to be the same as super user, username: %s" err = fmt.Errorf(msg, cfg.ConnectionPooler.User) } From ea1a62e73d9e4d8f20cbd2ae4748a26ab5561856 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 15:56:05 +0100 Subject: [PATCH 4/6] stretch pooler e2e test --- e2e/tests/test_e2e.py | 13 +++++++++++-- pkg/cluster/connection_pooler.go | 12 ++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 09b2c9c60..a425354a5 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -479,8 +479,6 @@ def test_enable_disable_connection_pooler(self): 'spec': { 'enableConnectionPooler': True, 'enableReplicaConnectionPooler': True, - 'enableMasterPoolerLoadBalancer': True, - 'enableReplicaPoolerLoadBalancer': True, } }) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") @@ -490,6 +488,17 @@ def test_enable_disable_connection_pooler(self): self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "No pooler replica pods found") self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 2, "No pooler service found") self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Pooler secret not created") + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 
'spec': { + 'enableMasterPoolerLoadBalancer': True, + 'enableReplicaPoolerLoadBalancer': True, + } + }) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyEqual(lambda: k8s.get_service_type(master_pooler_label+","+pooler_label), 'LoadBalancer', "Expected LoadBalancer service type for master pooler pod, found {}") diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index b9e27f1eb..5639b0283 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -818,7 +818,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql var ( deployment *appsv1.Deployment newDeployment *appsv1.Deployment - svc *v1.Service + service *v1.Service newService *v1.Service err error ) @@ -907,13 +907,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.ConnectionPooler[role].Deployment = deployment } - if svc, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { - c.ConnectionPooler[role].Service = svc + if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { + c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - if match, reason := k8sutil.SameService(svc, desiredSvc); !match { + if match, reason := k8sutil.SameService(service, desiredSvc); !match { syncReason = append(syncReason, reason) - c.logServiceChanges(role, svc, desiredSvc, false, reason) - newService, err = c.updateService(role, svc, desiredSvc) + c.logServiceChanges(role, service, desiredSvc, false, reason) + newService, err = c.updateService(role, service, desiredSvc) if err != nil { return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } From 064f6cce7eb22f282e2fc33a922c7ffcac4b2376 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 14 Mar 2022 19:22:07 +0100 Subject: [PATCH 5/6] shorten e2e test for test deployment --- e2e/tests/test_e2e.py | 1033 ----------------------------------------- 1 file changed, 1033 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index a425354a5..93311f099 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -676,1039 +676,6 @@ def test_enable_load_balancer(self): print('Operator log: {}'.format(k8s.get_operator_log())) raise - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_infrastructure_roles(self): - ''' - Test using external secrets for infrastructure roles - ''' - k8s = self.k8s - # update infrastructure roles description - secret_name = "postgresql-infrastructure-roles" - roles = "secretname: postgresql-infrastructure-roles-new, userkey: user,"\ - "rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" - patch_infrastructure_roles = { - "data": { - "infrastructure_roles_secret_name": secret_name, - "infrastructure_roles_secrets": roles, - }, - } - k8s.update_config(patch_infrastructure_roles) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - - try: - # check that new roles are represented in the config by requesting the - # operator configuration via API - - def verify_role(): - try: - operator_pod = k8s.get_operator_pod() - get_config_cmd = "wget --quiet -O - localhost:8080/config" - result = k8s.exec_with_kubectl(operator_pod.metadata.name, - 
get_config_cmd) - try: - roles_dict = (json.loads(result.stdout) - .get("controller", {}) - .get("InfrastructureRoles")) - except: - return False - - if "robot_zmon_acid_monitoring_new" in roles_dict: - role = roles_dict["robot_zmon_acid_monitoring_new"] - role.pop("Password", None) - self.assertDictEqual(role, { - "Name": "robot_zmon_acid_monitoring_new", - "Namespace":"", - "Flags": None, - "MemberOf": ["robot_zmon"], - "Parameters": None, - "AdminRole": "", - "Origin": 2, - "IsDbOwner": False, - "Deleted": False - }) - return True - except: - pass - - return False - - self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_lazy_spilo_upgrade(self): - ''' - Test lazy upgrade for the Spilo image: operator changes a stateful set - but lets pods run with the old image until they are recreated for - reasons other than operator's activity. That works because the operator - configures stateful sets to use "onDelete" pod update policy. - The test covers: - 1) enabling lazy upgrade in existing operator deployment - 2) forcing the normal rolling upgrade by changing the operator - configmap and restarting its pod - ''' - - k8s = self.k8s - - pod0 = 'acid-minimal-cluster-0' - pod1 = 'acid-minimal-cluster-1' - - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, - "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), - 2, "Postgres status did not enter running") - - patch_lazy_spilo_upgrade = { - "data": { - "docker_image": SPILO_CURRENT, - "enable_lazy_spilo_upgrade": "false" - } - } - k8s.update_config(patch_lazy_spilo_upgrade, - step="Init baseline image version") - - self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT, - "Statefulset not updated initially") - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, - "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), - 2, "Postgres status did not enter running") - - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), - SPILO_CURRENT, "Rolling upgrade was not executed") - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), - SPILO_CURRENT, "Rolling upgrade was not executed") - - # update docker image in config and enable the lazy upgrade - conf_image = SPILO_LAZY - patch_lazy_spilo_upgrade = { - "data": { - "docker_image": conf_image, - "enable_lazy_spilo_upgrade": "true" - } - } - k8s.update_config(patch_lazy_spilo_upgrade, - step="patch image and lazy upgrade") - self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image, - "Statefulset not updated to next Docker image") - - try: - # restart the pod to get a container with the new image - k8s.api.core_v1.delete_namespaced_pod(pod0, 'default') - - # verify only pod-0 which was deleted got new image from statefulset - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), - conf_image, "Delete pod-0 did not get new spilo image") - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, - "No two pods running after lazy rolling upgrade") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), - 2, "Postgres status did not enter running") - self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), - SPILO_CURRENT, - "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) - - # clean up - unpatch_lazy_spilo_upgrade 
= { - "data": { - "enable_lazy_spilo_upgrade": "false", - } - } - k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade") - - # at this point operator will complete the normal rolling upgrade - # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), - conf_image, "Rolling upgrade was not executed", - 50, 3) - self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), - conf_image, "Rolling upgrade was not executed", - 50, 3) - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), - 2, "Postgres status did not enter running") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_logical_backup_cron_job(self): - ''' - Ensure we can (a) create the cron job at user request for a specific PG cluster - (b) update the cluster-wide image for the logical backup pod - (c) delete the job at user request - Limitations: - (a) Does not run the actual batch job because there is no S3 mock to upload backups to - (b) Assumes 'acid-minimal-cluster' exists as defined in setUp - ''' - - k8s = self.k8s - - # create the cron job - schedule = "7 7 7 7 *" - pg_patch_enable_backup = { - "spec": { - "enableLogicalBackup": True, - "logicalBackupSchedule": schedule - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_backup) - - try: - self.eventuallyEqual(lambda: len(k8s.get_logical_backup_job().items), 1, "failed to create logical backup job") - - job = k8s.get_logical_backup_job().items[0] - self.assertEqual(job.metadata.name, "logical-backup-acid-minimal-cluster", - "Expected job name {}, found {}" - .format("logical-backup-acid-minimal-cluster", job.metadata.name)) - self.assertEqual(job.spec.schedule, schedule, - "Expected {} schedule, found {}" - .format(schedule, job.spec.schedule)) - - # update the cluster-wide image of the logical backup pod - image = "test-image-name" - patch_logical_backup_image = { - "data": { - "logical_backup_docker_image": image, - } - } - k8s.update_config(patch_logical_backup_image, step="patch logical backup image") - - def get_docker_image(): - jobs = k8s.get_logical_backup_job().items - return jobs[0].spec.job_template.spec.template.spec.containers[0].image - - self.eventuallyEqual(get_docker_image, image, - "Expected job image {}, found {}".format(image, "{}")) - - # delete the logical backup cron job - pg_patch_disable_backup = { - "spec": { - "enableLogicalBackup": False, - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_backup) - - self.eventuallyEqual(lambda: len(k8s.get_logical_backup_job().items), 0, "failed to create logical backup job") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - # ensure cluster is healthy after tests - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - @unittest.skip("Skipping this test until fixed") - def test_major_version_upgrade(self): - k8s = self.k8s - result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml") - self.eventuallyEqual(lambda: 
k8s.count_running_pods(labels="application=spilo,cluster-name=acid-upgrade-test"), 2, "No 2 pods running") - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - pg_patch_version = { - "spec": { - "postgres": { - "version": "14" - } - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version) - - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - def check_version_14(): - p = k8s.get_patroni_state("acid-upgrade-test-0") - version = p["server_version"][0:2] - return version - - self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_min_resource_limits(self): - ''' - Lower resource limits below configured minimum and let operator fix it - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - - # get nodes of master and replica(s) (expected target of new master) - _, replica_nodes = k8s.get_pg_nodes(cluster_label) - self.assertNotEqual(replica_nodes, []) - - # configure minimum boundaries for CPU and memory limits - minCPULimit = '503m' - minMemoryLimit = '502Mi' - - patch_min_resource_limits = { - "data": { - "min_cpu_limit": minCPULimit, - "min_memory_limit": minMemoryLimit - } - } - k8s.update_config(patch_min_resource_limits, "Minimum resource test") - - # lower resource limits below minimum - pg_patch_resources = { - "spec": { - "resources": { - "requests": { - "cpu": "10m", - "memory": "50Mi" - }, - "limits": { - "cpu": "200m", - "memory": "200Mi" - } - } - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # wait for switched over - k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running") - - def verify_pod_limits(): - pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector="cluster-name=acid-minimal-cluster,application=spilo").items - if len(pods) < 2: - return False - - r = pods[0].spec.containers[0].resources.limits['memory'] == minMemoryLimit - r = r and pods[0].spec.containers[0].resources.limits['cpu'] == minCPULimit - r = r and pods[1].spec.containers[0].resources.limits['memory'] == minMemoryLimit - r = r and pods[1].spec.containers[0].resources.limits['cpu'] == minCPULimit - return r - - self.eventuallyTrue(verify_pod_limits, "Pod limits where not adjusted") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_multi_namespace_support(self): - ''' - Create a customized Postgres cluster in a non-default namespace. 
- ''' - k8s = self.k8s - - with open("manifests/complete-postgres-manifest.yaml", 'r+') as f: - pg_manifest = yaml.safe_load(f) - pg_manifest["metadata"]["namespace"] = self.test_namespace - yaml.dump(pg_manifest, f, Dumper=yaml.Dumper) - - try: - k8s.create_with_kubectl("manifests/complete-postgres-manifest.yaml") - k8s.wait_for_pod_start("spilo-role=master", self.test_namespace) - k8s.wait_for_pod_start("spilo-role=replica", self.test_namespace) - self.assert_master_is_unique(self.test_namespace, "acid-test-cluster") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - finally: - # delete the new cluster so that the k8s_api.get_operator_state works correctly in subsequent tests - # ideally we should delete the 'test' namespace here but - # the pods inside the namespace stuck in the Terminating state making the test time out - k8s.api.custom_objects_api.delete_namespaced_custom_object( - "acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster") - time.sleep(5) - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_node_affinity(self): - ''' - Add label to a node and update postgres cluster spec to deploy only on a node with that label - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - - # verify we are in good state from potential previous tests - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - - # get nodes of master and replica(s) - master_nodes, replica_nodes = k8s.get_cluster_nodes() - self.assertNotEqual(master_nodes, []) - self.assertNotEqual(replica_nodes, []) - - # label node with environment=postgres - node_label_body = { - "metadata": { - "labels": { - "node-affinity-test": "postgres" - } - } - } - - try: - # patch master node with the label - k8s.api.core_v1.patch_node(master_nodes[0], node_label_body) - - # add node affinity to cluster - patch_node_affinity_config = { - "spec": { - "nodeAffinity" : { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "node-affinity-test", - "operator": "In", - "values": [ - "postgres" - ] - } - ] - } - ] - } - } - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - group="acid.zalan.do", - version="v1", - namespace="default", - plural="postgresqls", - name="acid-minimal-cluster", - body=patch_node_affinity_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # node affinity change should cause replica to relocate from replica node to master node due to node affinity requirement - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - # next master will be switched over and pod needs to be replaced as well to finish the rolling update - k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) - for pod in podsList.items: - if pod.metadata.labels.get('spilo-role') == 'replica': - self.assertEqual(master_nodes[0], pod.spec.node_name, - "Sanity check: expected replica to relocate to master node {}, but found on {}".format(master_nodes[0], pod.spec.node_name)) - - # check that pod has correct node affinity - key = 
pod.spec.affinity.node_affinity.required_during_scheduling_ignored_during_execution.node_selector_terms[0].match_expressions[0].key - value = pod.spec.affinity.node_affinity.required_during_scheduling_ignored_during_execution.node_selector_terms[0].match_expressions[0].values[0] - self.assertEqual("node-affinity-test", key, - "Sanity check: expect node selector key to be equal to 'node-affinity-test' but got {}".format(key)) - self.assertEqual("postgres", value, - "Sanity check: expect node selector value to be equal to 'postgres' but got {}".format(value)) - - patch_node_remove_affinity_config = { - "spec": { - "nodeAffinity" : None - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - group="acid.zalan.do", - version="v1", - namespace="default", - plural="postgresqls", - name="acid-minimal-cluster", - body=patch_node_remove_affinity_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # node affinity change should cause another rolling update and relocation of replica - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - # toggle pod anti affinity to make sure replica and master run on separate nodes - self.assert_distributed_pods(master_nodes) - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_node_readiness_label(self): - ''' - Remove node readiness label from master node. This must cause a failover. - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - readiness_label = 'lifecycle-status' - readiness_value = 'ready' - - # verify we are in good state from potential previous tests - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - - # get nodes of master and replica(s) (expected target of new master) - master_nodes, replica_nodes = k8s.get_cluster_nodes() - self.assertNotEqual(master_nodes, []) - self.assertNotEqual(replica_nodes, []) - - try: - # add node_readiness_label to potential failover nodes - patch_readiness_label = { - "metadata": { - "labels": { - readiness_label: readiness_value - } - } - } - for replica_node in replica_nodes: - k8s.api.core_v1.patch_node(replica_node, patch_readiness_label) - - # define node_readiness_label in config map which should trigger a rolling update - patch_readiness_label_config = { - "data": { - "node_readiness_label": readiness_label + ':' + readiness_value, - "node_readiness_label_merge": "AND", - } - } - k8s.update_config(patch_readiness_label_config, "setting readiness label") - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # first replica will be replaced and get the new affinity - # however, it might not start due to a volume node affinity conflict - # in this case only if the pvc and pod are deleted it can be scheduled - replica = k8s.get_cluster_replica_pod() - if replica.status.phase == 'Pending': - k8s.api.core_v1.delete_namespaced_persistent_volume_claim('pgdata-' + replica.metadata.name, 'default') - k8s.api.core_v1.delete_namespaced_pod(replica.metadata.name, 'default') - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - # next master will be switched over and pod needs to be replaced as well to finish the rolling update - k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) - 
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - # patch also node where master ran before - k8s.api.core_v1.patch_node(master_nodes[0], patch_readiness_label) - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - # toggle pod anti affinity to move replica away from master node - self.assert_distributed_pods(master_nodes) - - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_overwrite_pooler_deployment(self): - pooler_name = 'acid-minimal-cluster-pooler' - k8s = self.k8s - k8s.create_with_kubectl("manifests/minimal-fake-pooler-deployment.yaml") - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 1, - "Initial broken deployment not rolled out") - - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', - 'postgresqls', 'acid-minimal-cluster', - { - 'spec': { - 'enableConnectionPooler': True - } - }) - - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 2, - "Operator did not succeed in overwriting labels") - - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', - 'postgresqls', 'acid-minimal-cluster', - { - 'spec': { - 'enableConnectionPooler': False - } - }) - - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler="+pooler_name), - 0, "Pooler pods not scaled down") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_password_rotation(self): - ''' - Test password rotation and removal of users due to retention policy - ''' - k8s = self.k8s - leader = k8s.get_cluster_leader_pod() - today = date.today() - - # enable password rotation for owner of foo database - pg_patch_inplace_rotation_for_owner = { - "spec": { - "usersWithInPlaceSecretRotation": [ - "zalando" - ] - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_inplace_rotation_for_owner) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # check if next rotation date was set in secret - secret_data = k8s.get_secret_data("zalando") - next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ") - today90days = today+timedelta(days=90) - self.assertEqual(today90days, next_rotation_timestamp.date(), - "Unexpected rotation date in secret of zalando user: expected {}, got {}".format(today90days, next_rotation_timestamp.date())) - - # create fake rotation users that should be removed by operator - # but have one that would still fit into the retention period - create_fake_rotation_user = """ - CREATE ROLE foo_user201031 IN ROLE foo_user; - CREATE ROLE foo_user211031 IN ROLE foo_user; - CREATE ROLE foo_user"""+(today-timedelta(days=40)).strftime("%y%m%d")+""" IN ROLE foo_user; - """ - self.query_database(leader.metadata.name, "postgres", create_fake_rotation_user) - - # patch foo_user secret with outdated rotation date - fake_rotation_date = today.isoformat() + 'T00:00:00Z' - fake_rotation_date_encoded = base64.b64encode(fake_rotation_date.encode('utf-8')) - 
secret_fake_rotation = { - "data": { - "nextRotation": str(fake_rotation_date_encoded, 'utf-8'), - }, - } - k8s.api.core_v1.patch_namespaced_secret( - name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do", - namespace="default", - body=secret_fake_rotation) - - # enable password rotation for all other users (foo_user) - # this will force a sync of secrets for further assertions - enable_password_rotation = { - "data": { - "enable_password_rotation": "true", - "password_rotation_interval": "30", - "password_rotation_user_retention": "30", # should be set to 60 - }, - } - k8s.update_config(enable_password_rotation) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - - # check if next rotation date and username have been replaced - secret_data = k8s.get_secret_data("foo_user") - secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8') - next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ") - rotation_user = "foo_user"+today.strftime("%y%m%d") - today30days = today+timedelta(days=30) - - self.assertEqual(rotation_user, secret_username, - "Unexpected username in secret of foo_user: expected {}, got {}".format(rotation_user, secret_username)) - self.assertEqual(today30days, next_rotation_timestamp.date(), - "Unexpected rotation date in secret of foo_user: expected {}, got {}".format(today30days, next_rotation_timestamp.date())) - - # check if oldest fake rotation users were deleted - # there should only be foo_user, foo_user+today and foo_user+today-40days - user_query = """ - SELECT rolname - FROM pg_catalog.pg_roles - WHERE rolname LIKE 'foo_user%'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 3, - "Found incorrect number of rotation users", 10, 5) - - # disable password rotation for all other users (foo_user) - # and pick smaller intervals to see if the third fake rotation user is dropped - enable_password_rotation = { - "data": { - "enable_password_rotation": "false", - "password_rotation_interval": "15", - "password_rotation_user_retention": "30", # 2 * rotation interval - }, - } - k8s.update_config(enable_password_rotation) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - - # check if username in foo_user secret is reset - secret_data = k8s.get_secret_data("foo_user") - secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8') - next_rotation_timestamp = str(base64.b64decode(secret_data["nextRotation"]), 'utf-8') - self.assertEqual("foo_user", secret_username, - "Unexpected username in secret of foo_user: expected {}, got {}".format("foo_user", secret_username)) - self.assertEqual('', next_rotation_timestamp, - "Unexpected rotation date in secret of foo_user: expected empty string, got {}".format(next_rotation_timestamp)) - - # check roles again, there should only be foo_user and foo_user+today - user_query = """ - SELECT rolname - FROM pg_catalog.pg_roles - WHERE rolname LIKE 'foo_user%'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2, - "Found incorrect number of rotation users", 10, 5) - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_rolling_update_flag(self): - ''' - Add rolling update flag to only the master and see it failing over - ''' - k8s = self.k8s - cluster_label = 
'application=spilo,cluster-name=acid-minimal-cluster' - - # verify we are in good state from potential previous tests - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - - # get node and replica (expected target of new master) - _, replica_nodes = k8s.get_pg_nodes(cluster_label) - - # rolling update annotation - flag = { - "metadata": { - "annotations": { - "zalando-postgres-operator-rolling-update-required": "true", - } - } - } - - try: - podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) - for pod in podsList.items: - # add flag only to the master to make it appear to the operator as a leftover from a rolling update - if pod.metadata.labels.get('spilo-role') == 'master': - old_creation_timestamp = pod.metadata.creation_timestamp - k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace) - else: - # remember replica name to check if operator does a switchover - switchover_target = pod.metadata.name - - # do not wait until the next sync - k8s.delete_operator_pod() - - # operator should now recreate the master pod and do a switchover before - k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - # check if the former replica is now the new master - leader = k8s.get_cluster_leader_pod() - self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") - - # check that the old master has been recreated - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - replica = k8s.get_cluster_replica_pod() - self.assertTrue(replica.metadata.creation_timestamp > old_creation_timestamp, "Old master pod was not recreated") - - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_rolling_update_label_timeout(self): - ''' - Simulate case when replica does not receive label in time and rolling update does not finish - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - flag = "zalando-postgres-operator-rolling-update-required" - - # verify we are in good state from potential previous tests - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - - # get node and replica (expected target of new master) - _, replica_nodes = k8s.get_pg_nodes(cluster_label) - - # rolling update annotation - rolling_update_patch = { - "metadata": { - "annotations": { - flag: "true", - } - } - } - - # make pod_label_wait_timeout so short that rolling update fails on first try - # temporarily lower resync interval to reduce waiting for further tests - # pods should get healthy in the meantime - patch_resync_config = { - "data": { - "pod_label_wait_timeout": "2s", - "resync_period": "30s", - "repair_period": "30s", - } - } - - try: - # patch both pods for rolling update - podList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) - for pod in podList.items: - k8s.patch_pod(rolling_update_patch, pod.metadata.name, pod.metadata.namespace) - if pod.metadata.labels.get('spilo-role') == 'replica': - switchover_target = pod.metadata.name - - # update config and restart operator - k8s.update_config(patch_resync_config, "update resync interval and pod_label_wait_timeout") - - # operator should now recreate the replica pod first and do a switchover after - k8s.wait_for_pod_start('spilo-role=replica,' + 
cluster_label) - - # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync - # check if the cluster state is "SyncFailed" - self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail") - - # wait for next sync, replica should be running normally by now and be ready for switchover - k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - # check if the former replica is now the new master - leader = k8s.get_cluster_leader_pod() - self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") - - # wait for the old master to get restarted - k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) - - # status should again be "SyncFailed" but turn into "Running" on the next sync - time.sleep(30) - self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs") - - # revert config changes - patch_resync_config = { - "data": { - "pod_label_wait_timeout": "10m", - "resync_period": "4m", - "repair_period": "2m", - } - } - k8s.update_config(patch_resync_config, "revert resync interval and pod_label_wait_timeout") - - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_scaling(self): - ''' - Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. - ''' - k8s = self.k8s - pod = "acid-minimal-cluster-0" - - k8s.scale_cluster(3) - self.eventuallyEqual(lambda: k8s.count_running_pods(), 3, "Scale up to 3 failed") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 3, "Not all 3 nodes healthy") - - k8s.scale_cluster(2) - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "Scale down to 2 failed") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 2, "Not all members 2 healthy") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_service_annotations(self): - ''' - Create a Postgres cluster with service annotations and check them. 
- ''' - k8s = self.k8s - patch_custom_service_annotations = { - "data": { - "custom_service_annotations": "foo:bar", - } - } - k8s.update_config(patch_custom_service_annotations) - - pg_patch_custom_annotations = { - "spec": { - "serviceAnnotations": { - "annotation.key": "value", - "alice": "bob", - } - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_custom_annotations) - - annotations = { - "annotation.key": "value", - "foo": "bar", - "alice": "bob" - } - - self.eventuallyTrue(lambda: k8s.check_service_annotations("cluster-name=acid-minimal-cluster,spilo-role=master", annotations), "Wrong annotations") - self.eventuallyTrue(lambda: k8s.check_service_annotations("cluster-name=acid-minimal-cluster,spilo-role=replica", annotations), "Wrong annotations") - - # clean up - unpatch_custom_service_annotations = { - "data": { - "custom_service_annotations": "", - } - } - k8s.update_config(unpatch_custom_service_annotations) - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_statefulset_annotation_propagation(self): - ''' - Inject annotation to Postgresql CRD and check it's propagation to stateful set - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - - patch_sset_propagate_annotations = { - "data": { - "downscaler_annotations": "deployment-time,downscaler/*", - "inherited_annotations": "owned-by", - } - } - k8s.update_config(patch_sset_propagate_annotations) - - pg_crd_annotations = { - "metadata": { - "annotations": { - "deployment-time": "2020-04-30 12:00:00", - "downscaler/downtime_replicas": "0", - "owned-by": "acid", - }, - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_crd_annotations) - - annotations = { - "deployment-time": "2020-04-30 12:00:00", - "downscaler/downtime_replicas": "0", - "owned-by": "acid", - } - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing") - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_taint_based_eviction(self): - ''' - Add taint "postgres=:NoExecute" to node with master. This must cause a failover. 
- ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - - # verify we are in good state from potential previous tests - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") - - # get nodes of master and replica(s) (expected target of new master) - master_nodes, replica_nodes = k8s.get_cluster_nodes() - self.assertNotEqual(master_nodes, []) - self.assertNotEqual(replica_nodes, []) - - # taint node with postgres=:NoExecute to force failover - body = { - "spec": { - "taints": [ - { - "effect": "NoExecute", - "key": "postgres" - } - ] - } - } - k8s.api.core_v1.patch_node(master_nodes[0], body) - - # add toleration to pods - patch_toleration_config = { - "data": { - "toleration": "key:postgres,operator:Exists,effect:NoExecute" - } - } - - try: - k8s.update_config(patch_toleration_config, step="allow tainted nodes") - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") - - self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") - self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - # toggle pod anti affinity to move replica away from master node - self.assert_distributed_pods(master_nodes) - - @timeout_decorator.timeout(TEST_TIMEOUT_SEC) - def test_zz_cluster_deletion(self): - ''' - Test deletion with configured protection - ''' - k8s = self.k8s - cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - - # configure delete protection - patch_delete_annotations = { - "data": { - "delete_annotation_date_key": "delete-date", - "delete_annotation_name_key": "delete-clustername" - } - } - k8s.update_config(patch_delete_annotations) - time.sleep(25) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - try: - # this delete attempt should be omitted because of missing annotations - k8s.api.custom_objects_api.delete_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster") - time.sleep(15) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # check that pods and services are still there - k8s.wait_for_running_pods(cluster_label, 2) - k8s.wait_for_service(cluster_label) - - # recreate Postgres cluster resource - k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") - - # wait a little before proceeding - time.sleep(10) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # add annotations to manifest - delete_date = datetime.today().strftime('%Y-%m-%d') - pg_patch_delete_annotations = { - "metadata": { - "annotations": { - "delete-date": delete_date, - "delete-clustername": "acid-minimal-cluster", - } - } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_delete_annotations) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - - # wait a little before proceeding - time.sleep(20) - k8s.wait_for_running_pods(cluster_label, 2) - 
k8s.wait_for_service(cluster_label) - - # now delete process should be triggered - k8s.api.custom_objects_api.delete_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster") - - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( - "acid.zalan.do", "v1", "default", "postgresqls", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, "Manifest not deleted") - - # check if everything has been deleted - self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") - self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") - self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") - self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") - self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") - self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 0, "Secrets not deleted") - - except timeout_decorator.TimeoutError: - print('Operator log: {}'.format(k8s.get_operator_log())) - raise - - # reset configmap - patch_delete_annotations = { - "data": { - "delete_annotation_date_key": "", - "delete_annotation_name_key": "" - } - } - k8s.update_config(patch_delete_annotations) - def assert_master_is_unique(self, namespace='default', clusterName="acid-minimal-cluster"): ''' Check that there is a single pod in the k8s cluster with the label "spilo-role=master" From 3dd2b736bda0337aa5443649471b66723f317a47 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 15 Mar 2022 10:40:34 +0100 Subject: [PATCH 6/6] bring back all e2e tests --- e2e/tests/test_e2e.py | 1033 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1033 insertions(+) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 93311f099..a425354a5 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -676,6 +676,1039 @@ def test_enable_load_balancer(self): print('Operator log: {}'.format(k8s.get_operator_log())) raise + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_infrastructure_roles(self): + ''' + Test using external secrets for infrastructure roles + ''' + k8s = self.k8s + # update infrastructure roles description + secret_name = "postgresql-infrastructure-roles" + roles = "secretname: postgresql-infrastructure-roles-new, userkey: user,"\ + "rolekey: memberof, passwordkey: password, defaultrolevalue: robot_zmon" + patch_infrastructure_roles = { + "data": { + "infrastructure_roles_secret_name": secret_name, + "infrastructure_roles_secrets": roles, + }, + } + k8s.update_config(patch_infrastructure_roles) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + + try: + # check that new roles are represented in the config by requesting the + # operator configuration via API + + def verify_role(): + try: + operator_pod = k8s.get_operator_pod() + get_config_cmd = "wget --quiet -O - localhost:8080/config" + result = k8s.exec_with_kubectl(operator_pod.metadata.name, + get_config_cmd) + try: + roles_dict = (json.loads(result.stdout) + .get("controller", {}) + .get("InfrastructureRoles")) + except: + return False + + if "robot_zmon_acid_monitoring_new" in roles_dict: + role = 
roles_dict["robot_zmon_acid_monitoring_new"] + role.pop("Password", None) + self.assertDictEqual(role, { + "Name": "robot_zmon_acid_monitoring_new", + "Namespace":"", + "Flags": None, + "MemberOf": ["robot_zmon"], + "Parameters": None, + "AdminRole": "", + "Origin": 2, + "IsDbOwner": False, + "Deleted": False + }) + return True + except: + pass + + return False + + self.eventuallyTrue(verify_role, "infrastructure role setup is not loaded") + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_lazy_spilo_upgrade(self): + ''' + Test lazy upgrade for the Spilo image: operator changes a stateful set + but lets pods run with the old image until they are recreated for + reasons other than operator's activity. That works because the operator + configures stateful sets to use "onDelete" pod update policy. + The test covers: + 1) enabling lazy upgrade in existing operator deployment + 2) forcing the normal rolling upgrade by changing the operator + configmap and restarting its pod + ''' + + k8s = self.k8s + + pod0 = 'acid-minimal-cluster-0' + pod1 = 'acid-minimal-cluster-1' + + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") + + patch_lazy_spilo_upgrade = { + "data": { + "docker_image": SPILO_CURRENT, + "enable_lazy_spilo_upgrade": "false" + } + } + k8s.update_config(patch_lazy_spilo_upgrade, + step="Init baseline image version") + + self.eventuallyEqual(lambda: k8s.get_statefulset_image(), SPILO_CURRENT, + "Statefulset not updated initially") + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") + + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), + SPILO_CURRENT, "Rolling upgrade was not executed") + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1), + SPILO_CURRENT, "Rolling upgrade was not executed") + + # update docker image in config and enable the lazy upgrade + conf_image = SPILO_LAZY + patch_lazy_spilo_upgrade = { + "data": { + "docker_image": conf_image, + "enable_lazy_spilo_upgrade": "true" + } + } + k8s.update_config(patch_lazy_spilo_upgrade, + step="patch image and lazy upgrade") + self.eventuallyEqual(lambda: k8s.get_statefulset_image(), conf_image, + "Statefulset not updated to next Docker image") + + try: + # restart the pod to get a container with the new image + k8s.api.core_v1.delete_namespaced_pod(pod0, 'default') + + # verify only pod-0 which was deleted got new image from statefulset + self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0), + conf_image, "Delete pod-0 did not get new spilo image") + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, + "No two pods running after lazy rolling upgrade") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)), + 2, "Postgres status did not enter running") + self.assertNotEqual(lambda: k8s.get_effective_pod_image(pod1), + SPILO_CURRENT, + "pod-1 should not have change Docker image to {}".format(SPILO_CURRENT)) + + # clean up + unpatch_lazy_spilo_upgrade = { + "data": { + "enable_lazy_spilo_upgrade": "false", + } + } + k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade") + + # at this point operator will complete the normal rolling upgrade + 
+            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
+                                 conf_image, "Rolling upgrade was not executed",
+                                 50, 3)
+            self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod1),
+                                 conf_image, "Rolling upgrade was not executed",
+                                 50, 3)
+            self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod0)),
+                                 2, "Postgres status did not enter running")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_logical_backup_cron_job(self):
+        '''
+        Ensure we can (a) create the cron job at user request for a specific PG cluster
+        (b) update the cluster-wide image for the logical backup pod
+        (c) delete the job at user request
+        Limitations:
+        (a) Does not run the actual batch job because there is no S3 mock to upload backups to
+        (b) Assumes 'acid-minimal-cluster' exists as defined in setUp
+        '''
+
+        k8s = self.k8s
+
+        # create the cron job
+        schedule = "7 7 7 7 *"
+        pg_patch_enable_backup = {
+            "spec": {
+                "enableLogicalBackup": True,
+                "logicalBackupSchedule": schedule
+            }
+        }
+        k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_enable_backup)
+
+        try:
+            self.eventuallyEqual(lambda: len(k8s.get_logical_backup_job().items), 1, "failed to create logical backup job")
+
+            job = k8s.get_logical_backup_job().items[0]
+            self.assertEqual(job.metadata.name, "logical-backup-acid-minimal-cluster",
+                             "Expected job name {}, found {}"
+                             .format("logical-backup-acid-minimal-cluster", job.metadata.name))
+            self.assertEqual(job.spec.schedule, schedule,
+                             "Expected {} schedule, found {}"
+                             .format(schedule, job.spec.schedule))
+
+            # update the cluster-wide image of the logical backup pod
+            image = "test-image-name"
+            patch_logical_backup_image = {
+                "data": {
+                    "logical_backup_docker_image": image,
+                }
+            }
+            k8s.update_config(patch_logical_backup_image, step="patch logical backup image")
+
+            def get_docker_image():
+                jobs = k8s.get_logical_backup_job().items
+                return jobs[0].spec.job_template.spec.template.spec.containers[0].image
+
+            self.eventuallyEqual(get_docker_image, image,
+                                 "Expected the logical backup job to use image {}".format(image))
+
+            # delete the logical backup cron job
+            pg_patch_disable_backup = {
+                "spec": {
+                    "enableLogicalBackup": False,
+                }
+            }
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_disable_backup)
+
+            self.eventuallyEqual(lambda: len(k8s.get_logical_backup_job().items), 0, "failed to delete logical backup job")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+        # ensure the cluster is healthy after the test
+        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    @unittest.skip("Skipping this test until fixed")
+    def test_major_version_upgrade(self):
+        k8s = self.k8s
+        result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(labels="application=spilo,cluster-name=acid-upgrade-test"), 2, "No 2 pods running")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
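+        # bump the manifest to Postgres 14; the operator is expected to carry out
+        # an in-place major version upgrade on one of the following syncs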
+        pg_patch_version = {
+            "spec": {
+                "postgres": {
+                    "version": "14"
+                }
+            }
+        }
+        k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version)
+
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        def check_version_14():
+            p = k8s.get_patroni_state("acid-upgrade-test-0")
+            version = p["server_version"][0:2]
+            return version
+
+        self.eventuallyEqual(check_version_14, "14", "Version was not upgraded to 14")
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_min_resource_limits(self):
+        '''
+        Lower resource limits below the configured minimum and let the operator fix them
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # get nodes of master and replica(s) (expected target of new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+        self.assertNotEqual(replica_nodes, [])
+
+        # configure minimum boundaries for CPU and memory limits
+        minCPULimit = '503m'
+        minMemoryLimit = '502Mi'
+
+        patch_min_resource_limits = {
+            "data": {
+                "min_cpu_limit": minCPULimit,
+                "min_memory_limit": minMemoryLimit
+            }
+        }
+        k8s.update_config(patch_min_resource_limits, "Minimum resource test")
+
+        # lower resource limits below the configured minimum
+        pg_patch_resources = {
+            "spec": {
+                "resources": {
+                    "requests": {
+                        "cpu": "10m",
+                        "memory": "50Mi"
+                    },
+                    "limits": {
+                        "cpu": "200m",
+                        "memory": "200Mi"
+                    }
+                }
+            }
+        }
+        k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        # wait for the switchover to a replica node
+        k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+        k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+        self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")
+
+        def verify_pod_limits():
+            pods = k8s.api.core_v1.list_namespaced_pod('default', label_selector="cluster-name=acid-minimal-cluster,application=spilo").items
+            if len(pods) < 2:
+                return False
+
+            r = pods[0].spec.containers[0].resources.limits['memory'] == minMemoryLimit
+            r = r and pods[0].spec.containers[0].resources.limits['cpu'] == minCPULimit
+            r = r and pods[1].spec.containers[0].resources.limits['memory'] == minMemoryLimit
+            r = r and pods[1].spec.containers[0].resources.limits['cpu'] == minCPULimit
+            return r
+
+        self.eventuallyTrue(verify_pod_limits, "Pod limits were not adjusted")
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_multi_namespace_support(self):
+        '''
+        Create a customized Postgres cluster in a non-default namespace.
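+        The manifest is patched on the fly to point at the test namespace before it is applied.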
+        '''
+        k8s = self.k8s
+
+        with open("manifests/complete-postgres-manifest.yaml", 'r+') as f:
+            pg_manifest = yaml.safe_load(f)
+            pg_manifest["metadata"]["namespace"] = self.test_namespace
+            yaml.dump(pg_manifest, f, Dumper=yaml.Dumper)
+
+        try:
+            k8s.create_with_kubectl("manifests/complete-postgres-manifest.yaml")
+            k8s.wait_for_pod_start("spilo-role=master", self.test_namespace)
+            k8s.wait_for_pod_start("spilo-role=replica", self.test_namespace)
+            self.assert_master_is_unique(self.test_namespace, "acid-test-cluster")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+        finally:
+            # delete the new cluster so that k8s_api.get_operator_state works correctly in subsequent tests
+            # ideally we should delete the 'test' namespace here, but
+            # the pods inside the namespace get stuck in the Terminating state, making the test time out
+            k8s.api.custom_objects_api.delete_namespaced_custom_object(
+                "acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster")
+            time.sleep(5)
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_node_affinity(self):
+        '''
+        Add a label to a node and update the Postgres cluster spec to deploy only on nodes with that label
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        # get nodes of master and replica(s)
+        master_nodes, replica_nodes = k8s.get_cluster_nodes()
+        self.assertNotEqual(master_nodes, [])
+        self.assertNotEqual(replica_nodes, [])
+
+        # prepare the label node-affinity-test=postgres for the master node
+        node_label_body = {
+            "metadata": {
+                "labels": {
+                    "node-affinity-test": "postgres"
+                }
+            }
+        }
+
+        try:
+            # patch the master node with the label
+            k8s.api.core_v1.patch_node(master_nodes[0], node_label_body)
+
+            # add node affinity to the cluster
+            patch_node_affinity_config = {
+                "spec": {
+                    "nodeAffinity": {
+                        "requiredDuringSchedulingIgnoredDuringExecution": {
+                            "nodeSelectorTerms": [
+                                {
+                                    "matchExpressions": [
+                                        {
+                                            "key": "node-affinity-test",
+                                            "operator": "In",
+                                            "values": [
+                                                "postgres"
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                group="acid.zalan.do",
+                version="v1",
+                namespace="default",
+                plural="postgresqls",
+                name="acid-minimal-cluster",
+                body=patch_node_affinity_config)
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            # the node affinity change should cause the replica to relocate from the replica node to the master node
+            k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+            # next the master will be switched over and its pod needs to be replaced as well to finish the rolling update
+            k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+
+            podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
+            for pod in podsList.items:
+                if pod.metadata.labels.get('spilo-role') == 'replica':
+                    self.assertEqual(master_nodes[0], pod.spec.node_name,
+                                     "Sanity check: expected replica to relocate to master node {}, but found on {}".format(master_nodes[0], pod.spec.node_name))
+
+                    # check that the pod has the correct node affinity
+                    key = 
pod.spec.affinity.node_affinity.required_during_scheduling_ignored_during_execution.node_selector_terms[0].match_expressions[0].key + value = pod.spec.affinity.node_affinity.required_during_scheduling_ignored_during_execution.node_selector_terms[0].match_expressions[0].values[0] + self.assertEqual("node-affinity-test", key, + "Sanity check: expect node selector key to be equal to 'node-affinity-test' but got {}".format(key)) + self.assertEqual("postgres", value, + "Sanity check: expect node selector value to be equal to 'postgres' but got {}".format(value)) + + patch_node_remove_affinity_config = { + "spec": { + "nodeAffinity" : None + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + group="acid.zalan.do", + version="v1", + namespace="default", + plural="postgresqls", + name="acid-minimal-cluster", + body=patch_node_remove_affinity_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # node affinity change should cause another rolling update and relocation of replica + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=master,' + cluster_label) + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + # toggle pod anti affinity to make sure replica and master run on separate nodes + self.assert_distributed_pods(master_nodes) + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_node_readiness_label(self): + ''' + Remove node readiness label from master node. This must cause a failover. + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + readiness_label = 'lifecycle-status' + readiness_value = 'ready' + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + # get nodes of master and replica(s) (expected target of new master) + master_nodes, replica_nodes = k8s.get_cluster_nodes() + self.assertNotEqual(master_nodes, []) + self.assertNotEqual(replica_nodes, []) + + try: + # add node_readiness_label to potential failover nodes + patch_readiness_label = { + "metadata": { + "labels": { + readiness_label: readiness_value + } + } + } + for replica_node in replica_nodes: + k8s.api.core_v1.patch_node(replica_node, patch_readiness_label) + + # define node_readiness_label in config map which should trigger a rolling update + patch_readiness_label_config = { + "data": { + "node_readiness_label": readiness_label + ':' + readiness_value, + "node_readiness_label_merge": "AND", + } + } + k8s.update_config(patch_readiness_label_config, "setting readiness label") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # first replica will be replaced and get the new affinity + # however, it might not start due to a volume node affinity conflict + # in this case only if the pvc and pod are deleted it can be scheduled + replica = k8s.get_cluster_replica_pod() + if replica.status.phase == 'Pending': + k8s.api.core_v1.delete_namespaced_persistent_volume_claim('pgdata-' + replica.metadata.name, 'default') + k8s.api.core_v1.delete_namespaced_pod(replica.metadata.name, 'default') + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # next master will be switched over and pod needs to be replaced as well to finish the rolling update + k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + 
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # patch also node where master ran before + k8s.api.core_v1.patch_node(master_nodes[0], patch_readiness_label) + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + # toggle pod anti affinity to move replica away from master node + self.assert_distributed_pods(master_nodes) + + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_overwrite_pooler_deployment(self): + pooler_name = 'acid-minimal-cluster-pooler' + k8s = self.k8s + k8s.create_with_kubectl("manifests/minimal-fake-pooler-deployment.yaml") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 1, + "Initial broken deployment not rolled out") + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPooler': True + } + }) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 2, + "Operator did not succeed in overwriting labels") + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPooler': False + } + }) + + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler="+pooler_name), + 0, "Pooler pods not scaled down") + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_password_rotation(self): + ''' + Test password rotation and removal of users due to retention policy + ''' + k8s = self.k8s + leader = k8s.get_cluster_leader_pod() + today = date.today() + + # enable password rotation for owner of foo database + pg_patch_inplace_rotation_for_owner = { + "spec": { + "usersWithInPlaceSecretRotation": [ + "zalando" + ] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_inplace_rotation_for_owner) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check if next rotation date was set in secret + secret_data = k8s.get_secret_data("zalando") + next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ") + today90days = today+timedelta(days=90) + self.assertEqual(today90days, next_rotation_timestamp.date(), + "Unexpected rotation date in secret of zalando user: expected {}, got {}".format(today90days, next_rotation_timestamp.date())) + + # create fake rotation users that should be removed by operator + # but have one that would still fit into the retention period + create_fake_rotation_user = """ + CREATE ROLE foo_user201031 IN ROLE foo_user; + CREATE ROLE foo_user211031 IN ROLE foo_user; + CREATE ROLE foo_user"""+(today-timedelta(days=40)).strftime("%y%m%d")+""" IN ROLE foo_user; + """ + self.query_database(leader.metadata.name, "postgres", create_fake_rotation_user) + + # patch foo_user secret with outdated rotation date + fake_rotation_date = today.isoformat() + 'T00:00:00Z' + fake_rotation_date_encoded = base64.b64encode(fake_rotation_date.encode('utf-8')) + 
secret_fake_rotation = { + "data": { + "nextRotation": str(fake_rotation_date_encoded, 'utf-8'), + }, + } + k8s.api.core_v1.patch_namespaced_secret( + name="foo-user.acid-minimal-cluster.credentials.postgresql.acid.zalan.do", + namespace="default", + body=secret_fake_rotation) + + # enable password rotation for all other users (foo_user) + # this will force a sync of secrets for further assertions + enable_password_rotation = { + "data": { + "enable_password_rotation": "true", + "password_rotation_interval": "30", + "password_rotation_user_retention": "30", # should be set to 60 + }, + } + k8s.update_config(enable_password_rotation) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + + # check if next rotation date and username have been replaced + secret_data = k8s.get_secret_data("foo_user") + secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8') + next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ") + rotation_user = "foo_user"+today.strftime("%y%m%d") + today30days = today+timedelta(days=30) + + self.assertEqual(rotation_user, secret_username, + "Unexpected username in secret of foo_user: expected {}, got {}".format(rotation_user, secret_username)) + self.assertEqual(today30days, next_rotation_timestamp.date(), + "Unexpected rotation date in secret of foo_user: expected {}, got {}".format(today30days, next_rotation_timestamp.date())) + + # check if oldest fake rotation users were deleted + # there should only be foo_user, foo_user+today and foo_user+today-40days + user_query = """ + SELECT rolname + FROM pg_catalog.pg_roles + WHERE rolname LIKE 'foo_user%'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 3, + "Found incorrect number of rotation users", 10, 5) + + # disable password rotation for all other users (foo_user) + # and pick smaller intervals to see if the third fake rotation user is dropped + enable_password_rotation = { + "data": { + "enable_password_rotation": "false", + "password_rotation_interval": "15", + "password_rotation_user_retention": "30", # 2 * rotation interval + }, + } + k8s.update_config(enable_password_rotation) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + + # check if username in foo_user secret is reset + secret_data = k8s.get_secret_data("foo_user") + secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8') + next_rotation_timestamp = str(base64.b64decode(secret_data["nextRotation"]), 'utf-8') + self.assertEqual("foo_user", secret_username, + "Unexpected username in secret of foo_user: expected {}, got {}".format("foo_user", secret_username)) + self.assertEqual('', next_rotation_timestamp, + "Unexpected rotation date in secret of foo_user: expected empty string, got {}".format(next_rotation_timestamp)) + + # check roles again, there should only be foo_user and foo_user+today + user_query = """ + SELECT rolname + FROM pg_catalog.pg_roles + WHERE rolname LIKE 'foo_user%'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2, + "Found incorrect number of rotation users", 10, 5) + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_rolling_update_flag(self): + ''' + Add rolling update flag to only the master and see it failing over + ''' + k8s = self.k8s + cluster_label = 
'application=spilo,cluster-name=acid-minimal-cluster' + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + # get node and replica (expected target of new master) + _, replica_nodes = k8s.get_pg_nodes(cluster_label) + + # rolling update annotation + flag = { + "metadata": { + "annotations": { + "zalando-postgres-operator-rolling-update-required": "true", + } + } + } + + try: + podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) + for pod in podsList.items: + # add flag only to the master to make it appear to the operator as a leftover from a rolling update + if pod.metadata.labels.get('spilo-role') == 'master': + old_creation_timestamp = pod.metadata.creation_timestamp + k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace) + else: + # remember replica name to check if operator does a switchover + switchover_target = pod.metadata.name + + # do not wait until the next sync + k8s.delete_operator_pod() + + # operator should now recreate the master pod and do a switchover before + k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # check if the former replica is now the new master + leader = k8s.get_cluster_leader_pod() + self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") + + # check that the old master has been recreated + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + replica = k8s.get_cluster_replica_pod() + self.assertTrue(replica.metadata.creation_timestamp > old_creation_timestamp, "Old master pod was not recreated") + + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_rolling_update_label_timeout(self): + ''' + Simulate case when replica does not receive label in time and rolling update does not finish + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + flag = "zalando-postgres-operator-rolling-update-required" + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + + # get node and replica (expected target of new master) + _, replica_nodes = k8s.get_pg_nodes(cluster_label) + + # rolling update annotation + rolling_update_patch = { + "metadata": { + "annotations": { + flag: "true", + } + } + } + + # make pod_label_wait_timeout so short that rolling update fails on first try + # temporarily lower resync interval to reduce waiting for further tests + # pods should get healthy in the meantime + patch_resync_config = { + "data": { + "pod_label_wait_timeout": "2s", + "resync_period": "30s", + "repair_period": "30s", + } + } + + try: + # patch both pods for rolling update + podList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label) + for pod in podList.items: + k8s.patch_pod(rolling_update_patch, pod.metadata.name, pod.metadata.namespace) + if pod.metadata.labels.get('spilo-role') == 'replica': + switchover_target = pod.metadata.name + + # update config and restart operator + k8s.update_config(patch_resync_config, "update resync interval and pod_label_wait_timeout") + + # operator should now recreate the replica pod first and do a switchover after + k8s.wait_for_pod_start('spilo-role=replica,' + 
cluster_label) + + # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync + # check if the cluster state is "SyncFailed" + self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail") + + # wait for next sync, replica should be running normally by now and be ready for switchover + k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # check if the former replica is now the new master + leader = k8s.get_cluster_leader_pod() + self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover") + + # wait for the old master to get restarted + k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label) + + # status should again be "SyncFailed" but turn into "Running" on the next sync + time.sleep(30) + self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs") + + # revert config changes + patch_resync_config = { + "data": { + "pod_label_wait_timeout": "10m", + "resync_period": "4m", + "repair_period": "2m", + } + } + k8s.update_config(patch_resync_config, "revert resync interval and pod_label_wait_timeout") + + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_scaling(self): + ''' + Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. + ''' + k8s = self.k8s + pod = "acid-minimal-cluster-0" + + k8s.scale_cluster(3) + self.eventuallyEqual(lambda: k8s.count_running_pods(), 3, "Scale up to 3 failed") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 3, "Not all 3 nodes healthy") + + k8s.scale_cluster(2) + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "Scale down to 2 failed") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members(pod)), 2, "Not all members 2 healthy") + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_service_annotations(self): + ''' + Create a Postgres cluster with service annotations and check them. 
+ ''' + k8s = self.k8s + patch_custom_service_annotations = { + "data": { + "custom_service_annotations": "foo:bar", + } + } + k8s.update_config(patch_custom_service_annotations) + + pg_patch_custom_annotations = { + "spec": { + "serviceAnnotations": { + "annotation.key": "value", + "alice": "bob", + } + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_custom_annotations) + + annotations = { + "annotation.key": "value", + "foo": "bar", + "alice": "bob" + } + + self.eventuallyTrue(lambda: k8s.check_service_annotations("cluster-name=acid-minimal-cluster,spilo-role=master", annotations), "Wrong annotations") + self.eventuallyTrue(lambda: k8s.check_service_annotations("cluster-name=acid-minimal-cluster,spilo-role=replica", annotations), "Wrong annotations") + + # clean up + unpatch_custom_service_annotations = { + "data": { + "custom_service_annotations": "", + } + } + k8s.update_config(unpatch_custom_service_annotations) + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_statefulset_annotation_propagation(self): + ''' + Inject annotation to Postgresql CRD and check it's propagation to stateful set + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + + patch_sset_propagate_annotations = { + "data": { + "downscaler_annotations": "deployment-time,downscaler/*", + "inherited_annotations": "owned-by", + } + } + k8s.update_config(patch_sset_propagate_annotations) + + pg_crd_annotations = { + "metadata": { + "annotations": { + "deployment-time": "2020-04-30 12:00:00", + "downscaler/downtime_replicas": "0", + "owned-by": "acid", + }, + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_crd_annotations) + + annotations = { + "deployment-time": "2020-04-30 12:00:00", + "downscaler/downtime_replicas": "0", + "owned-by": "acid", + } + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing") + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_taint_based_eviction(self): + ''' + Add taint "postgres=:NoExecute" to node with master. This must cause a failover. 
+ ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + + # verify we are in good state from potential previous tests + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") + + # get nodes of master and replica(s) (expected target of new master) + master_nodes, replica_nodes = k8s.get_cluster_nodes() + self.assertNotEqual(master_nodes, []) + self.assertNotEqual(replica_nodes, []) + + # taint node with postgres=:NoExecute to force failover + body = { + "spec": { + "taints": [ + { + "effect": "NoExecute", + "key": "postgres" + } + ] + } + } + k8s.api.core_v1.patch_node(master_nodes[0], body) + + # add toleration to pods + patch_toleration_config = { + "data": { + "toleration": "key:postgres,operator:Exists,effect:NoExecute" + } + } + + try: + k8s.update_config(patch_toleration_config, step="allow tainted nodes") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + + self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running") + self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running") + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + # toggle pod anti affinity to move replica away from master node + self.assert_distributed_pods(master_nodes) + + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_zz_cluster_deletion(self): + ''' + Test deletion with configured protection + ''' + k8s = self.k8s + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' + + # configure delete protection + patch_delete_annotations = { + "data": { + "delete_annotation_date_key": "delete-date", + "delete_annotation_name_key": "delete-clustername" + } + } + k8s.update_config(patch_delete_annotations) + time.sleep(25) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + try: + # this delete attempt should be omitted because of missing annotations + k8s.api.custom_objects_api.delete_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster") + time.sleep(15) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check that pods and services are still there + k8s.wait_for_running_pods(cluster_label, 2) + k8s.wait_for_service(cluster_label) + + # recreate Postgres cluster resource + k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") + + # wait a little before proceeding + time.sleep(10) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # add annotations to manifest + delete_date = datetime.today().strftime('%Y-%m-%d') + pg_patch_delete_annotations = { + "metadata": { + "annotations": { + "delete-date": delete_date, + "delete-clustername": "acid-minimal-cluster", + } + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_delete_annotations) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # wait a little before proceeding + time.sleep(20) + k8s.wait_for_running_pods(cluster_label, 2) + 
k8s.wait_for_service(cluster_label) + + # now delete process should be triggered + k8s.api.custom_objects_api.delete_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster") + + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "acid.zalan.do", "v1", "default", "postgresqls", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, "Manifest not deleted") + + # check if everything has been deleted + self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted") + self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted") + self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted") + self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") + self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") + self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 0, "Secrets not deleted") + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + + # reset configmap + patch_delete_annotations = { + "data": { + "delete_annotation_date_key": "", + "delete_annotation_name_key": "" + } + } + k8s.update_config(patch_delete_annotations) + def assert_master_is_unique(self, namespace='default', clusterName="acid-minimal-cluster"): ''' Check that there is a single pod in the k8s cluster with the label "spilo-role=master"