diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 09b2c9c60..a425354a5 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -479,8 +479,6 @@ def test_enable_disable_connection_pooler(self): 'spec': { 'enableConnectionPooler': True, 'enableReplicaConnectionPooler': True, - 'enableMasterPoolerLoadBalancer': True, - 'enableReplicaPoolerLoadBalancer': True, } }) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") @@ -490,6 +488,17 @@ def test_enable_disable_connection_pooler(self): self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "No pooler replica pods found") self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 2, "No pooler service found") self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Pooler secret not created") + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableMasterPoolerLoadBalancer': True, + 'enableReplicaPoolerLoadBalancer': True, + } + }) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") self.eventuallyEqual(lambda: k8s.get_service_type(master_pooler_label+","+pooler_label), 'LoadBalancer', "Expected LoadBalancer service type for master pooler pod, found {}") diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 12c0be6ae..5639b0283 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -3,7 +3,6 @@ package cluster import ( "context" "fmt" - "reflect" "strings" "github.com/r3labs/diff" @@ -344,7 +343,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio } if *numberOfInstances < constants.ConnectionPoolerMinInstances { - msg := "Adjusted number of connection pooler instances from %d to %d" + msg := "adjusted number of connection pooler instances from %d to %d" c.logger.Warningf(msg, *numberOfInstances, constants.ConnectionPoolerMinInstances) *numberOfInstances = constants.ConnectionPoolerMinInstances @@ -614,7 +613,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. *deployment.Spec.Replicas != *config.NumberOfInstances { sync = true - msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + msg := fmt.Sprintf("numberOfInstances is different (having %d, required %d)", *deployment.Spec.Replicas, *config.NumberOfInstances) reasons = append(reasons, msg) } @@ -623,7 +622,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. poolerContainer.Image != config.Image { sync = true - msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + msg := fmt.Sprintf("dockerImage is different (having %s, required %s)", poolerContainer.Image, config.Image) reasons = append(reasons, msg) } @@ -637,7 +636,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1. // updates for new resource values). if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { sync = true - msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + msg := fmt.Sprintf("resources are different (having %+v, required %+v)", poolerContainer.Resources, expectedResources) reasons = append(reasons, msg) } @@ -722,29 +721,6 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look var err error var connectionPoolerNeeded bool - needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler) - masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of master connection pooler changes") - } - replicaChanges, err := diff.Diff(oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler) - if err != nil { - c.logger.Error("Error in getting diff of replica connection pooler changes") - } - - // skip pooler sync when theres no diff or it's deactivated - // but, handling the case when connectionPooler is not there but it is required - // as per spec, hence do not skip syncing in that case, even though there - // is no diff in specs - if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) && - ((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) || - (c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) && - ((c.ConnectionPooler[Master] != nil && c.ConnectionPooler[Master].LookupFunction) || - (c.ConnectionPooler[Replica] != nil && c.ConnectionPooler[Replica].LookupFunction)))) { - c.logger.Debugln("syncing pooler is not required") - return nil, nil - } - logPoolerEssentials(c.logger, oldSpec, newSpec) // Check and perform the sync requirements for each of the roles. @@ -781,7 +757,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look // in between // in this case also do not forget to install lookup function - if !c.ConnectionPooler[role].LookupFunction { + // skip installation in standby clusters, since they are read-only + if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil { connectionPooler := c.Spec.ConnectionPooler specSchema := "" specUser := "" @@ -838,32 +815,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( SyncReason, error) { + var ( + deployment *appsv1.Deployment + newDeployment *appsv1.Deployment + service *v1.Service + newService *v1.Service + err error + ) + syncReason := make([]string, 0) - deployment, err := c.KubeClient. + deployment, err = c.KubeClient. Deployments(c.Namespace). Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { - msg := "deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) + c.logger.Warningf("deployment %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role)) - deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + deployment, err = c.KubeClient. + Deployments(newDeployment.Namespace). + Create(context.TODO(), newDeployment, metav1.CreateOptions{}) if err != nil { return NoSync, err } c.ConnectionPooler[role].Deployment = deployment } else if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not get connection pooler deployment to sync: %v", err) } else { c.ConnectionPooler[role].Deployment = deployment // actual synchronization @@ -900,16 +882,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql syncReason = append(syncReason, defaultsReason...) if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.logger.Infof("update connection pooler deployment %s, reason: %+v", c.connectionPoolerName(role), syncReason) - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return syncReason, fmt.Errorf(msg, err) + return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err) } - deployment, err := updateConnectionPoolerDeployment(c.KubeClient, - newDeploymentSpec) + deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment) if err != nil { return syncReason, err @@ -927,40 +907,38 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.ConnectionPooler[role].Deployment = deployment } - if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { - c.ConnectionPooler[role].Service = svc + if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { + c.ConnectionPooler[role].Service = service desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - if match, reason := k8sutil.SameService(svc, desiredSvc); !match { + if match, reason := k8sutil.SameService(service, desiredSvc); !match { syncReason = append(syncReason, reason) - c.logServiceChanges(role, svc, desiredSvc, false, reason) - updatedService, err := c.updateService(role, svc, desiredSvc) + c.logServiceChanges(role, service, desiredSvc, false, reason) + newService, err = c.updateService(role, service, desiredSvc) if err != nil { return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.ConnectionPooler[role].Service = updatedService + c.ConnectionPooler[role].Service = newService c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) } return NoSync, nil } if !k8sutil.ResourceNotFound(err) { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) + return NoSync, fmt.Errorf("could not get connection pooler service to sync: %v", err) } c.ConnectionPooler[role].Service = nil - msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) + c.logger.Warningf("service %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role)) serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - service, err := c.KubeClient. + newService, err = c.KubeClient. Services(serviceSpec.Namespace). Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return NoSync, err } - c.ConnectionPooler[role].Service = service + c.ConnectionPooler[role].Service = newService return NoSync, nil } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 6fa983c80..2b2ac9c08 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1100,7 +1100,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // backward compatible check for InitContainers if spec.InitContainersOld != nil { - msg := "Manifest parameter init_containers is deprecated." + msg := "manifest parameter init_containers is deprecated." if spec.InitContainers == nil { c.logger.Warningf("%s Consider using initContainers instead.", msg) spec.InitContainers = spec.InitContainersOld @@ -1111,7 +1111,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // backward compatible check for PodPriorityClassName if spec.PodPriorityClassNameOld != "" { - msg := "Manifest parameter pod_priority_class_name is deprecated." + msg := "manifest parameter pod_priority_class_name is deprecated." if spec.PodPriorityClassName == "" { c.logger.Warningf("%s Consider using podPriorityClassName instead.", msg) spec.PodPriorityClassName = spec.PodPriorityClassNameOld @@ -1504,13 +1504,13 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec, mountPaths := map[string]acidv1.AdditionalVolume{} for i, additionalVolume := range additionalVolumes { if previousVolume, exist := mountPaths[additionalVolume.MountPath]; exist { - msg := "Volume %+v cannot be mounted to the same path as %+v" + msg := "volume %+v cannot be mounted to the same path as %+v" c.logger.Warningf(msg, additionalVolume, previousVolume) continue } if additionalVolume.MountPath == constants.PostgresDataMount { - msg := "Cannot mount volume on postgresql data directory, %+v" + msg := "cannot mount volume on postgresql data directory, %+v" c.logger.Warningf(msg, additionalVolume) continue } @@ -1523,7 +1523,7 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec, for _, target := range additionalVolume.TargetContainers { if target == "all" && len(additionalVolume.TargetContainers) != 1 { - msg := `Target containers could be either "all" or a list + msg := `target containers could be either "all" or a list of containers, mixing those is not allowed, %+v` c.logger.Warningf(msg, additionalVolume) continue @@ -1813,11 +1813,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) }) } else { // cloning with S3, find out the bucket to clone - msg := "Clone from S3 bucket" + msg := "clone from S3 bucket" c.logger.Info(msg, description.S3WalPath) if description.S3WalPath == "" { - msg := "Figure out which S3 bucket to use from env" + msg := "figure out which S3 bucket to use from env" c.logger.Info(msg, description.S3WalPath) if c.OpConfig.WALES3Bucket != "" { @@ -1861,7 +1861,7 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) result = append(result, envs...) } else { - msg := "Use custom parsed S3WalPath %s from the manifest" + msg := "use custom parsed S3WalPath %s from the manifest" c.logger.Warningf(msg, description.S3WalPath) result = append(result, v1.EnvVar{ @@ -1910,7 +1910,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript if description.S3WalPath != "" { // standby with S3, find out the bucket to setup standby - msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s " + msg := "standby from S3 bucket using custom parsed S3WalPath from the manifest %s " c.logger.Infof(msg, description.S3WalPath) result = append(result, v1.EnvVar{ @@ -1918,7 +1918,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript Value: description.S3WalPath, }) } else if description.GSWalPath != "" { - msg := "Standby from GS bucket using custom parsed GSWalPath from the manifest %s " + msg := "standby from GS bucket using custom parsed GSWalPath from the manifest %s " c.logger.Infof(msg, description.GSWalPath) envs := []v1.EnvVar{ diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index e470fb59d..4bc85df0f 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -226,7 +226,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur constants.ConnectionPoolerUserName) if result.ConnectionPooler.User == result.SuperUsername { - msg := "Connection pool user is not allowed to be the same as super user, username: %s" + msg := "connection pool user is not allowed to be the same as super user, username: %s" panic(fmt.Errorf(msg, result.ConnectionPooler.User)) } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index fc4b73074..0ed6c3c63 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -290,7 +290,7 @@ func validate(cfg *Config) (err error) { } if cfg.ConnectionPooler.User == cfg.SuperUsername { - msg := "Connection pool user is not allowed to be the same as super user, username: %s" + msg := "connection pool user is not allowed to be the same as super user, username: %s" err = fmt.Errorf(msg, cfg.ConnectionPooler.User) }