Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,6 @@ def test_enable_disable_connection_pooler(self):
'spec': {
'enableConnectionPooler': True,
'enableReplicaConnectionPooler': True,
'enableMasterPoolerLoadBalancer': True,
'enableReplicaPoolerLoadBalancer': True,
}
})
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
Expand All @@ -490,6 +488,17 @@ def test_enable_disable_connection_pooler(self):
self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "No pooler replica pods found")
self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 2, "No pooler service found")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Pooler secret not created")

k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default',
'postgresqls', 'acid-minimal-cluster',
{
'spec': {
'enableMasterPoolerLoadBalancer': True,
'enableReplicaPoolerLoadBalancer': True,
}
})
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(lambda: k8s.get_service_type(master_pooler_label+","+pooler_label),
'LoadBalancer',
"Expected LoadBalancer service type for master pooler pod, found {}")
Expand Down
94 changes: 36 additions & 58 deletions pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cluster
import (
"context"
"fmt"
"reflect"
"strings"

"github.com/r3labs/diff"
Expand Down Expand Up @@ -344,7 +343,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
}

if *numberOfInstances < constants.ConnectionPoolerMinInstances {
msg := "Adjusted number of connection pooler instances from %d to %d"
msg := "adjusted number of connection pooler instances from %d to %d"
c.logger.Warningf(msg, *numberOfInstances, constants.ConnectionPoolerMinInstances)

*numberOfInstances = constants.ConnectionPoolerMinInstances
Expand Down Expand Up @@ -614,7 +613,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1.
*deployment.Spec.Replicas != *config.NumberOfInstances {

sync = true
msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)",
msg := fmt.Sprintf("numberOfInstances is different (having %d, required %d)",
*deployment.Spec.Replicas, *config.NumberOfInstances)
reasons = append(reasons, msg)
}
Expand All @@ -623,7 +622,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1.
poolerContainer.Image != config.Image {

sync = true
msg := fmt.Sprintf("DockerImage is different (having %s, required %s)",
msg := fmt.Sprintf("dockerImage is different (having %s, required %s)",
poolerContainer.Image, config.Image)
reasons = append(reasons, msg)
}
Expand All @@ -637,7 +636,7 @@ func (c *Cluster) needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1.
// updates for new resource values).
if err == nil && syncResources(&poolerContainer.Resources, expectedResources) {
sync = true
msg := fmt.Sprintf("Resources are different (having %+v, required %+v)",
msg := fmt.Sprintf("resources are different (having %+v, required %+v)",
poolerContainer.Resources, expectedResources)
reasons = append(reasons, msg)
}
Expand Down Expand Up @@ -722,29 +721,6 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
var err error
var connectionPoolerNeeded bool

needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler)
masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler)
if err != nil {
c.logger.Error("Error in getting diff of master connection pooler changes")
}
replicaChanges, err := diff.Diff(oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler)
if err != nil {
c.logger.Error("Error in getting diff of replica connection pooler changes")
}

// skip pooler sync when theres no diff or it's deactivated
// but, handling the case when connectionPooler is not there but it is required
// as per spec, hence do not skip syncing in that case, even though there
// is no diff in specs
if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) &&
((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) ||
(c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) &&
((c.ConnectionPooler[Master] != nil && c.ConnectionPooler[Master].LookupFunction) ||
(c.ConnectionPooler[Replica] != nil && c.ConnectionPooler[Replica].LookupFunction)))) {
c.logger.Debugln("syncing pooler is not required")
return nil, nil
}

logPoolerEssentials(c.logger, oldSpec, newSpec)

// Check and perform the sync requirements for each of the roles.
Expand Down Expand Up @@ -781,7 +757,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
// in between

// in this case also do not forget to install lookup function
if !c.ConnectionPooler[role].LookupFunction {
// skip installation in standby clusters, since they are read-only
if !c.ConnectionPooler[role].LookupFunction && c.Spec.StandbyCluster == nil {
connectionPooler := c.Spec.ConnectionPooler
specSchema := ""
specUser := ""
Expand Down Expand Up @@ -838,32 +815,37 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) (
SyncReason, error) {

var (
deployment *appsv1.Deployment
newDeployment *appsv1.Deployment
service *v1.Service
newService *v1.Service
err error
)

syncReason := make([]string, 0)
deployment, err := c.KubeClient.
deployment, err = c.KubeClient.
Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{})

if err != nil && k8sutil.ResourceNotFound(err) {
msg := "deployment %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connectionPoolerName(role))
c.logger.Warningf("deployment %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role))

deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
if err != nil {
msg = "could not generate deployment for connection pooler: %v"
return NoSync, fmt.Errorf(msg, err)
return NoSync, fmt.Errorf("could not generate deployment for connection pooler: %v", err)
}

deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
deployment, err = c.KubeClient.
Deployments(newDeployment.Namespace).
Create(context.TODO(), newDeployment, metav1.CreateOptions{})

if err != nil {
return NoSync, err
}
c.ConnectionPooler[role].Deployment = deployment
} else if err != nil {
msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err)
return NoSync, fmt.Errorf("could not get connection pooler deployment to sync: %v", err)
} else {
c.ConnectionPooler[role].Deployment = deployment
// actual synchronization
Expand Down Expand Up @@ -900,16 +882,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
syncReason = append(syncReason, defaultsReason...)

if specSync || defaultsSync {
c.logger.Infof("Update connection pooler deployment %s, reason: %+v",
c.logger.Infof("update connection pooler deployment %s, reason: %+v",
c.connectionPoolerName(role), syncReason)
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
newDeployment, err = c.generateConnectionPoolerDeployment(c.ConnectionPooler[role])
if err != nil {
msg := "could not generate deployment for connection pooler: %v"
return syncReason, fmt.Errorf(msg, err)
return syncReason, fmt.Errorf("could not generate deployment for connection pooler: %v", err)
}

deployment, err := updateConnectionPoolerDeployment(c.KubeClient,
newDeploymentSpec)
deployment, err = updateConnectionPoolerDeployment(c.KubeClient, newDeployment)

if err != nil {
return syncReason, err
Expand All @@ -927,40 +907,38 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
c.ConnectionPooler[role].Deployment = deployment
}

if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].Service = svc
if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].Service = service
desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
if match, reason := k8sutil.SameService(svc, desiredSvc); !match {
if match, reason := k8sutil.SameService(service, desiredSvc); !match {
syncReason = append(syncReason, reason)
c.logServiceChanges(role, svc, desiredSvc, false, reason)
updatedService, err := c.updateService(role, svc, desiredSvc)
c.logServiceChanges(role, service, desiredSvc, false, reason)
newService, err = c.updateService(role, service, desiredSvc)
if err != nil {
return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
}
c.ConnectionPooler[role].Service = updatedService
c.ConnectionPooler[role].Service = newService
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
}
return NoSync, nil
}

if !k8sutil.ResourceNotFound(err) {
msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
return NoSync, fmt.Errorf("could not get connection pooler service to sync: %v", err)
}

c.ConnectionPooler[role].Service = nil
msg := "Service %s for connection pooler synchronization is not found, create it"
c.logger.Warningf(msg, c.connectionPoolerName(role))
c.logger.Warningf("service %s for connection pooler synchronization is not found, create it", c.connectionPoolerName(role))

serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role])
service, err := c.KubeClient.
newService, err = c.KubeClient.
Services(serviceSpec.Namespace).
Create(context.TODO(), serviceSpec, metav1.CreateOptions{})

if err != nil {
return NoSync, err
}
c.ConnectionPooler[role].Service = service
c.ConnectionPooler[role].Service = newService

return NoSync, nil
}
20 changes: 10 additions & 10 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef

// backward compatible check for InitContainers
if spec.InitContainersOld != nil {
msg := "Manifest parameter init_containers is deprecated."
msg := "manifest parameter init_containers is deprecated."
if spec.InitContainers == nil {
c.logger.Warningf("%s Consider using initContainers instead.", msg)
spec.InitContainers = spec.InitContainersOld
Expand All @@ -1111,7 +1111,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef

// backward compatible check for PodPriorityClassName
if spec.PodPriorityClassNameOld != "" {
msg := "Manifest parameter pod_priority_class_name is deprecated."
msg := "manifest parameter pod_priority_class_name is deprecated."
if spec.PodPriorityClassName == "" {
c.logger.Warningf("%s Consider using podPriorityClassName instead.", msg)
spec.PodPriorityClassName = spec.PodPriorityClassNameOld
Expand Down Expand Up @@ -1504,13 +1504,13 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec,
mountPaths := map[string]acidv1.AdditionalVolume{}
for i, additionalVolume := range additionalVolumes {
if previousVolume, exist := mountPaths[additionalVolume.MountPath]; exist {
msg := "Volume %+v cannot be mounted to the same path as %+v"
msg := "volume %+v cannot be mounted to the same path as %+v"
c.logger.Warningf(msg, additionalVolume, previousVolume)
continue
}

if additionalVolume.MountPath == constants.PostgresDataMount {
msg := "Cannot mount volume on postgresql data directory, %+v"
msg := "cannot mount volume on postgresql data directory, %+v"
c.logger.Warningf(msg, additionalVolume)
continue
}
Expand All @@ -1523,7 +1523,7 @@ func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec,

for _, target := range additionalVolume.TargetContainers {
if target == "all" && len(additionalVolume.TargetContainers) != 1 {
msg := `Target containers could be either "all" or a list
msg := `target containers could be either "all" or a list
of containers, mixing those is not allowed, %+v`
c.logger.Warningf(msg, additionalVolume)
continue
Expand Down Expand Up @@ -1813,11 +1813,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
})
} else {
// cloning with S3, find out the bucket to clone
msg := "Clone from S3 bucket"
msg := "clone from S3 bucket"
c.logger.Info(msg, description.S3WalPath)

if description.S3WalPath == "" {
msg := "Figure out which S3 bucket to use from env"
msg := "figure out which S3 bucket to use from env"
c.logger.Info(msg, description.S3WalPath)

if c.OpConfig.WALES3Bucket != "" {
Expand Down Expand Up @@ -1861,7 +1861,7 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)

result = append(result, envs...)
} else {
msg := "Use custom parsed S3WalPath %s from the manifest"
msg := "use custom parsed S3WalPath %s from the manifest"
c.logger.Warningf(msg, description.S3WalPath)

result = append(result, v1.EnvVar{
Expand Down Expand Up @@ -1910,15 +1910,15 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript

if description.S3WalPath != "" {
// standby with S3, find out the bucket to setup standby
msg := "Standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
msg := "standby from S3 bucket using custom parsed S3WalPath from the manifest %s "
c.logger.Infof(msg, description.S3WalPath)

result = append(result, v1.EnvVar{
Name: "STANDBY_WALE_S3_PREFIX",
Value: description.S3WalPath,
})
} else if description.GSWalPath != "" {
msg := "Standby from GS bucket using custom parsed GSWalPath from the manifest %s "
msg := "standby from GS bucket using custom parsed GSWalPath from the manifest %s "
c.logger.Infof(msg, description.GSWalPath)

envs := []v1.EnvVar{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operator_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
constants.ConnectionPoolerUserName)

if result.ConnectionPooler.User == result.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
msg := "connection pool user is not allowed to be the same as super user, username: %s"
panic(fmt.Errorf(msg, result.ConnectionPooler.User))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func validate(cfg *Config) (err error) {
}

if cfg.ConnectionPooler.User == cfg.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
msg := "connection pool user is not allowed to be the same as super user, username: %s"
err = fmt.Errorf(msg, cfg.ConnectionPooler.User)
}

Expand Down