diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 0722834f9..6c1505b3c 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/r3labs/diff" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -25,6 +24,7 @@ import ( "k8s.io/client-go/tools/reference" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" @@ -53,26 +53,11 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1.RoleBinding } -// K8S objects that are belongs to a connection pooler -type ConnectionPoolerObjects struct { - Deployment map[PostgresRole]*appsv1.Deployment - Service map[PostgresRole]*v1.Service - - // It could happen that a connection pooler was enabled, but the operator - // was not able to properly process a corresponding event or was restarted. - // In this case we will miss missing/require situation and a lookup function - // will not be installed. To avoid synchronizing it all the time to prevent - // this, we can remember the result in memory at least until the next - // restart. - LookupFunction bool -} - type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet - ConnectionPooler *ConnectionPoolerObjects PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -102,9 +87,8 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex - + ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects } - type compareStatefulsetResult struct { match bool replace bool @@ -346,20 +330,7 @@ func (c *Cluster) Create() error { // // Do not consider connection pooler as a strict requirement, and if // something fails, report warning - roles := c.RolesConnectionPooler() - for _, r := range roles { - if c.ConnectionPooler != nil { - c.logger.Warning("Connection pooler already exists in the cluster") - return nil - } - connectionPooler, err := c.createConnectionPooler(c.installLookupFunction) - if err != nil { - c.logger.Warningf("could not create connection pooler: %v", err) - return nil - } - c.logger.Infof("connection pooler %q has been successfully created", - util.NameFromMeta(connectionPooler.Deployment[r].ObjectMeta)) - } + c.createConnectionPooler(c.installLookupFunction) return nil } @@ -649,7 +620,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // initUsers. Check if it needs to be called. sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) && reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) - needConnectionPooler := c.needMasterConnectionPoolerWorker(&newSpec.Spec) + needConnectionPooler := needMasterConnectionPoolerWorker(&newSpec.Spec) if !sameUsers || needConnectionPooler { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { @@ -786,7 +757,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // need to process. In the future we may want to do this more careful and // check which databases we need to process, but even repeating the whole // installation process should be good enough. - c.ConnectionPooler.LookupFunction = false if _, err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil { @@ -797,6 +767,20 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { return nil } +func syncResources(a, b *v1.ResourceRequirements) bool { + for _, res := range []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } { + if !a.Limits[res].Equal(b.Limits[res]) || + !a.Requests[res].Equal(b.Requests[res]) { + return true + } + } + + return false +} + // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). // The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes // DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint @@ -920,7 +904,7 @@ func (c *Cluster) initSystemUsers() { // Connection pooler user is an exception, if requested it's going to be // created by operator as a normal pgUser - if c.needConnectionPooler() { + if needConnectionPooler(&c.Spec) { // initialize empty connection pooler if not done yet if c.Spec.ConnectionPooler == nil { c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} @@ -1394,119 +1378,3 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } - -// Test if two connection pooler configuration needs to be synced. For simplicity -// compare not the actual K8S objects, but the configuration itself and request -// sync if there is any difference. -func (c *Cluster) needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) { - reasons = []string{} - sync = false - - changelog, err := diff.Diff(oldSpec, newSpec) - if err != nil { - c.logger.Infof("Cannot get diff, do not do anything, %+v", err) - return false, reasons - } - - if len(changelog) > 0 { - sync = true - } - - for _, change := range changelog { - msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", - change.Type, change.Path, change.From, change.To) - reasons = append(reasons, msg) - } - - return sync, reasons -} - -func syncResources(a, b *v1.ResourceRequirements) bool { - for _, res := range []v1.ResourceName{ - v1.ResourceCPU, - v1.ResourceMemory, - } { - if !a.Limits[res].Equal(b.Limits[res]) || - !a.Requests[res].Equal(b.Requests[res]) { - return true - } - } - - return false -} - -// Check if we need to synchronize connection pooler deployment due to new -// defaults, that are different from what we see in the DeploymentSpec -func (c *Cluster) needSyncConnectionPoolerDefaults( - spec *acidv1.ConnectionPooler, - deployment *appsv1.Deployment) (sync bool, reasons []string) { - - reasons = []string{} - sync = false - - config := c.OpConfig.ConnectionPooler - podTemplate := deployment.Spec.Template - poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer] - - if spec == nil { - spec = &acidv1.ConnectionPooler{} - } - - if spec.NumberOfInstances == nil && - *deployment.Spec.Replicas != *config.NumberOfInstances { - - sync = true - msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", - *deployment.Spec.Replicas, *config.NumberOfInstances) - reasons = append(reasons, msg) - } - - if spec.DockerImage == "" && - poolerContainer.Image != config.Image { - - sync = true - msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", - poolerContainer.Image, config.Image) - reasons = append(reasons, msg) - } - - expectedResources, err := generateResourceRequirements(spec.Resources, - c.makeDefaultConnectionPoolerResources()) - - // An error to generate expected resources means something is not quite - // right, but for the purpose of robustness do not panic here, just report - // and ignore resources comparison (in the worst case there will be no - // updates for new resource values). - if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { - sync = true - msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", - poolerContainer.Resources, expectedResources) - reasons = append(reasons, msg) - } - - if err != nil { - c.logger.Warningf("Cannot generate expected resources, %v", err) - } - - for _, env := range poolerContainer.Env { - if spec.User == "" && env.Name == "PGUSER" { - ref := env.ValueFrom.SecretKeyRef.LocalObjectReference - - if ref.Name != c.credentialSecretName(config.User) { - sync = true - msg := fmt.Sprintf("pooler user is different (having %s, required %s)", - ref.Name, config.User) - reasons = append(reasons, msg) - } - } - - if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { - sync = true - msg := fmt.Sprintf("pooler schema is different (having %s, required %s)", - env.Value, config.Schema) - reasons = append(reasons, msg) - } - } - - return sync, reasons -} diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go new file mode 100644 index 000000000..8fedf48ed --- /dev/null +++ b/pkg/cluster/connection_pooler.go @@ -0,0 +1,871 @@ +package cluster + +import ( + "context" + "fmt" + "strings" + + "github.com/r3labs/diff" + acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/zalando/postgres-operator/pkg/util" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" +) + +// K8S objects that are belong to connection pooler +type ConnectionPoolerObjects struct { + Deployment *appsv1.Deployment + Service *v1.Service + Name string + ClusterName string + Namespace string + Role PostgresRole + // It could happen that a connection pooler was enabled, but the operator + // was not able to properly process a corresponding event or was restarted. + // In this case we will miss missing/require situation and a lookup function + // will not be installed. To avoid synchronizing it all the time to prevent + // this, we can remember the result in memory at least until the next + // restart. + LookupFunction bool + // Careful with referencing cluster.spec this object pointer changes + // during runtime and lifetime of cluster +} + +func (c *Cluster) connectionPoolerName(role PostgresRole) string { + name := c.Name + "-pooler" + if role == Replica { + name = name + "-repl" + } + return name +} + +// isConnectionPoolerEnabled +func needConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needMasterConnectionPoolerWorker(spec) || + needReplicaConnectionPoolerWorker(spec) +} + +func needMasterConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needMasterConnectionPoolerWorker(spec) +} + +func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || + (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) +} + +func needReplicaConnectionPooler(spec *acidv1.PostgresSpec) bool { + return needReplicaConnectionPoolerWorker(spec) +} + +func needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { + return spec.EnableReplicaConnectionPooler != nil && + *spec.EnableReplicaConnectionPooler +} + +// Return connection pooler labels selector, which should from one point of view +// inherit most of the labels from the cluster itself, but at the same time +// have e.g. different `application` label, so that recreatePod operation will +// not interfere with it (it lists all the pods via labels, and if there would +// be no difference, it will recreate also pooler pods). +func (c *Cluster) connectionPoolerLabelsSelector(name string, role PostgresRole) *metav1.LabelSelector { + connectionPoolerLabels := labels.Set(map[string]string{}) + + extraLabels := labels.Set(map[string]string{ + "connection-pooler-name": name, + "application": "db-connection-pooler", + "role": string(role), + "cluster-name": c.Name, + "Namespace": c.Namespace, + }) + + connectionPoolerLabels = labels.Merge(connectionPoolerLabels, c.labelsSet(false)) + connectionPoolerLabels = labels.Merge(connectionPoolerLabels, extraLabels) + + return &metav1.LabelSelector{ + MatchLabels: connectionPoolerLabels, + MatchExpressions: nil, + } +} + +// Prepare the database for connection pooler to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pooler user exists. +// +// After that create all the objects for connection pooler, namely a deployment +// with a chosen pooler and a service to expose it. + +// have connectionpooler name in the cp object to have it immutable name +// add these cp related functions to a new cp file +// opConfig, cluster, and database name +func (c *Cluster) createConnectionPooler(lookup InstallFunction) (SyncReason, error) { + var reason SyncReason + c.setProcessName("creating connection pooler") + + //this is essentially sync with nil as oldSpec + if reason, err := c.syncConnectionPooler(nil, &c.Postgresql, lookup); err != nil { + return reason, err + } + return reason, nil +} + +// +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). In case if we want to spin up more connection pooler +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pooler. +func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { + spec := &c.Spec + effectiveMode := util.Coalesce( + spec.ConnectionPooler.Mode, + c.OpConfig.ConnectionPooler.Mode) + + numberOfInstances := spec.ConnectionPooler.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPooler.MaxDBConnections, + c.OpConfig.ConnectionPooler.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnectionPoolerMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOLER_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOLER_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOLER_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOLER_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOLER_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOLER_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnectionPoolerMaxClientConnections), + }, + { + Name: "CONNECTION_POOLER_MAX_DB_CONN", + Value: fmt.Sprint(maxDBConn), + }, + } +} + +func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( + *v1.PodTemplateSpec, error) { + spec := &c.Spec + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPooler.Resources, + makeDefaultConnectionPoolerResources(&c.OpConfig)) + + effectiveDockerImage := util.Coalesce( + spec.ConnectionPooler.DockerImage, + c.OpConfig.ConnectionPooler.Image) + + effectiveSchema := util.Coalesce( + spec.ConnectionPooler.Schema, + c.OpConfig.ConnectionPooler.Schema) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + effectiveUser := util.Coalesce( + spec.ConnectionPooler.User, + c.OpConfig.ConnectionPooler.User) + + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(effectiveUser), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(role), + }, + { + Name: "PGPORT", + Value: c.servicePort(role), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pooler username + { + Name: "PGSCHEMA", + Value: effectiveSchema, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + } + envVars = append(envVars, c.getConnectionPoolerEnvVars()...) + + poolerContainer := v1.Container{ + Name: connectionPoolerContainer, + Image: effectiveDockerImage, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + ReadinessProbe: &v1.Probe{ + Handler: v1.Handler{ + TCPSocket: &v1.TCPSocketAction{ + Port: intstr.IntOrString{IntVal: pgPort}, + }, + }, + }, + } + + podTemplate := &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connectionPoolerLabelsSelector(c.connectionPoolerName(role), role).MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, + } + + return podTemplate, nil +} + +func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *ConnectionPoolerObjects) ( + *appsv1.Deployment, error) { + spec := &c.Spec + + // there are two ways to enable connection pooler, either to specify a + // connectionPooler section or enableConnectionPooler. In the second case + // spec.connectionPooler will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPooler == nil { + spec.ConnectionPooler = &acidv1.ConnectionPooler{} + } + podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role) + + numberOfInstances := spec.ConnectionPooler.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + if *numberOfInstances < constants.ConnectionPoolerMinInstances { + msg := "Adjusted number of connection pooler instances from %d to %d" + c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances) + + *numberOfInstances = constants.ConnectionPoolerMinInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: connectionPooler.Name, + Namespace: connectionPooler.Namespace, + Labels: c.connectionPoolerLabelsSelector(connectionPooler.Name, connectionPooler.Role).MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this deployment, but there is a hope that this object + // will be garbage collected if something went wrong and operator + // didn't deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connectionPoolerLabelsSelector(connectionPooler.Name, connectionPooler.Role), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service { + + spec := &c.Spec + // there are two ways to enable connection pooler, either to specify a + // connectionPooler section or enableConnectionPooler. In the second case + // spec.connectionPooler will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPooler == nil { + spec.ConnectionPooler = &acidv1.ConnectionPooler{} + } + + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: connectionPooler.Name, + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(connectionPooler.Role)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pooler": connectionPooler.Name, + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: connectionPooler.Name, + Namespace: connectionPooler.Namespace, + Labels: c.connectionPoolerLabelsSelector(connectionPooler.Name, connectionPooler.Role).MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: serviceSpec, + } + + return service +} + +//delete connection pooler +func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { + c.logger.Debugln("deleting connection pooler") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.ConnectionPooler[role] == nil || role == "" { + c.logger.Infof("No connection pooler to delete") + return nil + } + + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + var deployment *appsv1.Deployment + deployment = c.ConnectionPooler[role].Deployment + + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + + if deployment != nil { + + // set delete propagation policy to foreground, so that replica set will be + // also deleted. + + err = c.KubeClient. + Deployments(c.Namespace). + Delete(context.TODO(), deployment.Name, options) + + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pooler deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pooler deployment %q has been deleted for role %s", deployment.Name, role) + } + + // Repeat the same for the service object + var service *v1.Service + service = c.ConnectionPooler[role].Service + if service == nil { + c.logger.Infof("nil service to be deleted") + } + if service != nil { + + err = c.KubeClient. + Services(c.Namespace). + Delete(context.TODO(), service.Name, options) + + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pooler service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pooler service %q has been deleted for role %s", service.Name, role) + } + // Repeat the same for the secret object + secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User) + + secret, err := c.KubeClient. + Secrets(c.Namespace). + Get(context.TODO(), secretName, metav1.GetOptions{}) + + if err != nil { + c.logger.Debugf("could not get connection pooler secret %q: %v", secretName, err) + } else { + if err = c.deleteSecret(secret.UID, *secret); err != nil { + return fmt.Errorf("could not delete pooler secret: %v", err) + } + } + + c.ConnectionPooler[role] = nil + return nil +} + +// Perform actual patching of a connection pooler deployment, assuming that all +// the check were already done before. +func updateConnectionPoolerDeployment(KubeClient k8sutil.KubernetesClient, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { + if newDeployment == nil { + return nil, fmt.Errorf("there is no connection pooler in the cluster") + } + + patchData, err := specPatch(newDeployment.Spec) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment: %v", err) + } + + // An update probably requires RetryOnConflict, but since only one operator + // worker at one time will try to update it chances of conflicts are + // minimal. + deployment, err := KubeClient. + Deployments(newDeployment.Namespace).Patch( + context.TODO(), + newDeployment.Name, + types.MergePatchType, + patchData, + metav1.PatchOptions{}, + "") + if err != nil { + return nil, fmt.Errorf("could not patch deployment: %v", err) + } + + return deployment, nil +} + +//updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment +func updateConnectionPoolerAnnotations(KubeClient k8sutil.KubernetesClient, deployment *appsv1.Deployment, annotations map[string]string) (*appsv1.Deployment, error) { + patchData, err := metaAnnotationsPatch(annotations) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err) + } + result, err := KubeClient.Deployments(deployment.Namespace).Patch( + context.TODO(), + deployment.Name, + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "") + if err != nil { + return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err) + } + return result, nil + +} + +// Test if two connection pooler configuration needs to be synced. For simplicity +// compare not the actual K8S objects, but the configuration itself and request +// sync if there is any difference. +func needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + //c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } + + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + + return sync, reasons +} + +// Check if we need to synchronize connection pooler deployment due to new +// defaults, that are different from what we see in the DeploymentSpec +func needSyncConnectionPoolerDefaults(Config *Config, spec *acidv1.ConnectionPooler, deployment *appsv1.Deployment) (sync bool, reasons []string) { + + reasons = []string{} + sync = false + + config := Config.OpConfig.ConnectionPooler + podTemplate := deployment.Spec.Template + poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer] + + if spec == nil { + spec = &acidv1.ConnectionPooler{} + } + if spec.NumberOfInstances == nil && + *deployment.Spec.Replicas != *config.NumberOfInstances { + + sync = true + msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + *deployment.Spec.Replicas, *config.NumberOfInstances) + reasons = append(reasons, msg) + } + + if spec.DockerImage == "" && + poolerContainer.Image != config.Image { + + sync = true + msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + poolerContainer.Image, config.Image) + reasons = append(reasons, msg) + } + + expectedResources, err := generateResourceRequirements(spec.Resources, + makeDefaultConnectionPoolerResources(&Config.OpConfig)) + + // An error to generate expected resources means something is not quite + // right, but for the purpose of robustness do not panic here, just report + // and ignore resources comparison (in the worst case there will be no + // updates for new resource values). + if err == nil && syncResources(&poolerContainer.Resources, expectedResources) { + sync = true + msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + poolerContainer.Resources, expectedResources) + reasons = append(reasons, msg) + } + + if err != nil { + return false, reasons + } + + for _, env := range poolerContainer.Env { + if spec.User == "" && env.Name == "PGUSER" { + ref := env.ValueFrom.SecretKeyRef.LocalObjectReference + secretName := Config.OpConfig.SecretNameTemplate.Format( + "username", strings.Replace(config.User, "_", "-", -1), + "cluster", deployment.ClusterName, + "tprkind", acidv1.PostgresCRDResourceKind, + "tprgroup", acidzalando.GroupName) + + if ref.Name != secretName { + sync = true + msg := fmt.Sprintf("pooler user is different (having %s, required %s)", + ref.Name, config.User) + reasons = append(reasons, msg) + } + } + + if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { + sync = true + msg := fmt.Sprintf("pooler schema is different (having %s, required %s)", + env.Value, config.Schema) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} + +// Generate default resource section for connection pooler deployment, to be +// used if nothing custom is specified in the manifest +func makeDefaultConnectionPoolerResources(config *config.Config) acidv1.Resources { + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPURequest, + Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPULimit, + Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, installLookupFunction InstallFunction) (SyncReason, error) { + + var reason SyncReason + var err error + var newNeedConnectionPooler, oldNeedConnectionPooler bool + oldNeedConnectionPooler = false + + // Check and perform the sync requirements for each of the roles. + for _, role := range [2]PostgresRole{Master, Replica} { + + if role == Master { + newNeedConnectionPooler = needMasterConnectionPoolerWorker(&newSpec.Spec) + if oldSpec != nil { + oldNeedConnectionPooler = needMasterConnectionPoolerWorker(&oldSpec.Spec) + } + } else { + newNeedConnectionPooler = needReplicaConnectionPoolerWorker(&newSpec.Spec) + if oldSpec != nil { + oldNeedConnectionPooler = needReplicaConnectionPoolerWorker(&oldSpec.Spec) + } + } + + // if the call is via createConnectionPooler, then it is required to initialize + // the structure + if c.ConnectionPooler == nil { + c.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{} + } + if c.ConnectionPooler[role] == nil { + c.ConnectionPooler[role] = &ConnectionPoolerObjects{ + Deployment: nil, + Service: nil, + Name: c.connectionPoolerName(role), + ClusterName: c.ClusterName, + Namespace: c.Namespace, + LookupFunction: false, + Role: role, + } + } + if newNeedConnectionPooler { + // Try to sync in any case. If we didn't needed connection pooler before, + // it means we want to create it. If it was already present, still sync + // since it could happen that there is no difference in specs, and all + // the resources are remembered, but the deployment was manually deleted + // in between + + // in this case also do not forget to install lookup function as for + // creating cluster + if !oldNeedConnectionPooler || !c.ConnectionPooler[role].LookupFunction { + newConnectionPooler := newSpec.Spec.ConnectionPooler + + specSchema := "" + specUser := "" + + if newConnectionPooler != nil { + specSchema = newConnectionPooler.Schema + specUser = newConnectionPooler.User + } + + schema := util.Coalesce( + specSchema, + c.OpConfig.ConnectionPooler.Schema) + + user := util.Coalesce( + specUser, + c.OpConfig.ConnectionPooler.User) + + if err = installLookupFunction(schema, user, role); err != nil { + return NoSync, err + } + } + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { + c.logger.Errorf("could not sync connection pooler: %v", err) + return reason, err + } + } else { + // delete and cleanup resources + if c.ConnectionPooler[role] != nil && + (c.ConnectionPooler[role].Deployment != nil || + c.ConnectionPooler[role].Service != nil) { + + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) + } + } + } + } + + return reason, nil +} + +// Synchronize connection pooler resources. Effectively we're interested only in +// synchronizing the corresponding deployment, but in case of deployment or +// service is missing, create it. After checking, also remember an object for +// the future references. +func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( + SyncReason, error) { + + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + deploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return NoSync, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + c.ConnectionPooler[role].Deployment = deployment + } else if err != nil { + msg := "could not get connection pooler deployment to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + c.ConnectionPooler[role].Deployment = deployment + // actual synchronization + + var oldConnectionPooler *acidv1.ConnectionPooler + + if oldSpec != nil { + oldConnectionPooler = oldSpec.Spec.ConnectionPooler + } + + newConnectionPooler := newSpec.Spec.ConnectionPooler + // sync implementation below assumes that both old and new specs are + // not nil, but it can happen. To avoid any confusion like updating a + // deployment because the specification changed from nil to an empty + // struct (that was initialized somewhere before) replace any nil with + // an empty spec. + if oldConnectionPooler == nil { + oldConnectionPooler = &acidv1.ConnectionPooler{} + } + + if newConnectionPooler == nil { + newConnectionPooler = &acidv1.ConnectionPooler{} + } + + c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) + + var specSync bool + var specReason []string + + if oldSpec != nil { + specSync, specReason = needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) + } + + defaultsSync, defaultsReason := needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) + reason := append(specReason, defaultsReason...) + + if specSync || defaultsSync { + c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.connectionPoolerName(role), reason) + newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) + if err != nil { + msg := "could not generate deployment for connection pooler: %v" + return reason, fmt.Errorf(msg, err) + } + + //oldDeploymentSpec := c.ConnectionPooler[role].Deployment + + deployment, err := updateConnectionPoolerDeployment(c.KubeClient, + newDeploymentSpec) + + if err != nil { + return reason, err + } + c.ConnectionPooler[role].Deployment = deployment + } + } + + newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler[role].Deployment.Annotations) + if newAnnotations != nil { + deployment, err = updateConnectionPoolerAnnotations(c.KubeClient, c.ConnectionPooler[role].Deployment, newAnnotations) + if err != nil { + return nil, err + } + c.ConnectionPooler[role].Deployment = deployment + } + + service, err := c.KubeClient. + Services(c.Namespace). + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Service %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + c.ConnectionPooler[role].Service = service + + } else if err != nil { + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPooler[role].Service = service + } + + return NoSync, nil +} diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go new file mode 100644 index 000000000..4f3f27176 --- /dev/null +++ b/pkg/cluster/connection_pooler_test.go @@ -0,0 +1,959 @@ +package cluster + +import ( + "errors" + "fmt" + "strings" + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string, role PostgresRole) error { + return nil +} + +func boolToPointer(value bool) *bool { + return &value +} + +func int32ToPointer(value int32) *int32 { + return &value +} + +func TestConnectionPoolerCreationAndDeletion(t *testing.T) { + testName := "Test connection pooler creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + + reason, err := cluster.createConnectionPooler(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pooler, %s, %+v", + testName, err, reason) + } + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil { + if cluster.ConnectionPooler[role].Deployment == nil { + t.Errorf("%s: Connection pooler deployment is empty for role %s", testName, role) + } + + if cluster.ConnectionPooler[role].Service == nil { + t.Errorf("%s: Connection pooler service is empty for role %s", testName, role) + } + + err = cluster.deleteConnectionPooler(role) + if err != nil { + t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) + } + } + } +} + +func TestNeedConnectionPooler(t *testing.T) { + testName := "Test how connection pooler can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needMasterConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Connection pooler is not enabled with flag and full", + testName) + } + + // Test for replica connection pooler + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + } + + if !needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + } + + if !needReplicaConnectionPooler(&cluster.Spec) { + t.Errorf("%s: Replica Connection pooler is not enabled with flag and full", + testName) + } +} + +func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil && cluster.ConnectionPooler[role].Deployment != nil && + (cluster.ConnectionPooler[role].Deployment.Spec.Replicas == nil || + *cluster.ConnectionPooler[role].Deployment.Spec.Replicas != 2) { + return fmt.Errorf("Wrong number of instances") + } + } + return nil +} + +func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + for _, role := range []PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role].Deployment == nil { + return fmt.Errorf("Deployment was not saved %s", role) + } + + if cluster.ConnectionPooler[role].Service == nil { + return fmt.Errorf("Service was not saved %s", role) + } + } + + return nil +} + +func MasterobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + if cluster.ConnectionPooler[Master].Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPooler[Master].Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func ReplicaobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { + if cluster.ConnectionPooler == nil { + return fmt.Errorf("Connection pooler resources are empty") + } + + if cluster.ConnectionPooler[Replica].Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPooler[Replica].Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { + for _, role := range [2]PostgresRole{Master, Replica} { + if cluster.ConnectionPooler[role] != nil && + (cluster.ConnectionPooler[role].Deployment != nil || cluster.ConnectionPooler[role].Service != nil) { + return fmt.Errorf("Connection pooler was not deleted for role %v", role) + } + } + + return nil +} + +func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error { + + if cluster.ConnectionPooler[Master] != nil && + (cluster.ConnectionPooler[Master].Deployment != nil || cluster.ConnectionPooler[Master].Service != nil) { + return fmt.Errorf("Connection pooler master was not deleted") + } + return nil +} + +func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error { + + if cluster.ConnectionPooler[Replica] != nil && + (cluster.ConnectionPooler[Replica].Deployment != nil || cluster.ConnectionPooler[Replica].Service != nil) { + return fmt.Errorf("Connection pooler replica was not deleted") + } + return nil +} + +func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { + for _, msg := range reason { + if strings.HasPrefix(msg, "update [] from '' to '") { + return fmt.Errorf("There is an empty reason, %s", msg) + } + } + + return nil +} + +func TestConnectionPoolerSynchronization(t *testing.T) { + testName := "Test connection pooler synchronization" + newCluster := func() *Cluster { + return New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + } + cluster := newCluster() + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + clusterMissingObjects := newCluster() + clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() + + clusterMock := newCluster() + clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() + + clusterDirtyMock := newCluster() + clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() + clusterDirtyMock.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{ + Master: { + Deployment: nil, + Service: nil, + LookupFunction: false, + }, + Replica: { + Deployment: nil, + Service: nil, + LookupFunction: false, + }, + } + + clusterDirtyMock.ConnectionPooler[Master].Deployment = &appsv1.Deployment{} + clusterDirtyMock.ConnectionPooler[Master].Service = &v1.Service{} + + clusterDirtyMock.ConnectionPooler[Replica].Deployment = &appsv1.Deployment{} + clusterDirtyMock.ConnectionPooler[Replica].Service = &v1.Service{} + + clusterNewDefaultsMock := newCluster() + clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + defaultImage string + defaultInstances int32 + check func(cluster *Cluster, err error, reason SyncReason) error + }{ + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + }, + }, + cluster: clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create replica if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + cluster: clusterDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: ReplicaobjectsAreSaved, + }, + { + subTest: "create no replica with flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(false), + }, + }, + cluster: clusterDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "create from scratch", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: MasterobjectsAreSaved, + }, + { + subTest: "create both master and replica", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + EnableConnectionPooler: boolToPointer(true), + }, + }, + cluster: clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, + }, + { + subTest: "delete if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "delete only master if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableConnectionPooler: boolToPointer(true), + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyMasterDeleted, + }, + { + subTest: "delete only replica if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: OnlyReplicaDeleted, + }, + { + subTest: "cleanup if still there", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: clusterDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: deploymentUpdated, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: deploymentUpdated, + }, + { + subTest: "update image from changed defaults", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterNewDefaultsMock, + defaultImage: "pooler:2.0", + defaultInstances: 2, + check: deploymentUpdated, + }, + { + subTest: "there is no sync from nil to an empty spec", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: nil, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: noEmptySync, + }, + } + for _, tt := range tests { + tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage + tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = + int32ToPointer(tt.defaultInstances) + + reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec, + tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err, reason); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} + +func TestConnectionPoolerPodSpec(t *testing.T) { + testName := "Test connection pooler pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + MaxDBConnections: int32ToPointer(60), + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + clusterNoDefaultRes.Spec = acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + } + + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { return nil } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testLabels, + }, + { + subTest: "required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + } + for _, role := range [2]PostgresRole{Master, Replica} { + for _, tt := range tests { + podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(role) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, podSpec, role) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, tt.subTest, err) + } + } + } +} + +func TestConnectionPoolerDeploymentSpec(t *testing.T) { + testName := "Test connection pooler deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{ + Master: { + Deployment: nil, + Service: nil, + LookupFunction: false, + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnectionPoolerDeployment(cluster.ConnectionPooler[Master]) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest) + } + + memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit) + } + + return nil +} + +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { + poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"] + + if poolerLabels != cluster.connectionPoolerLabelsSelector(cluster.connectionPoolerName(role), role).MatchLabels["connection-pooler"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabelsSelector(cluster.connectionPoolerName(role), role).MatchLabels) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connectionPoolerLabelsSelector(cluster.connectionPoolerName(Master), Master).MatchLabels + + if labels["connection-pooler"] != expected["connection-pooler"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service, role PostgresRole) error { + selector := service.Spec.Selector + + if selector["connection-pooler"] != cluster.connectionPoolerName(role) { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pooler"], cluster.connectionPoolerName(role)) + } + + return nil +} + +func TestConnectionPoolerServiceSpec(t *testing.T) { + testName := "Test connection pooler service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{ + Master: { + Deployment: nil, + Service: nil, + LookupFunction: false, + Name: cluster.connectionPoolerName(Master), + ClusterName: cluster.ClusterName, + Namespace: cluster.Namespace, + Role: Master, + }, + Replica: { + Deployment: nil, + Service: nil, + LookupFunction: false, + Name: cluster.connectionPoolerName(Replica), + ClusterName: cluster.ClusterName, + Namespace: cluster.Namespace, + Role: Replica, + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableReplicaConnectionPooler: boolToPointer(true), + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, role := range [2]PostgresRole{Master, Replica} { + for _, tt := range tests { + service := tt.cluster.generateConnectionPoolerService(tt.cluster.ConnectionPooler[role]) + + if err := tt.check(cluster, service, role); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } + } + } +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index f85455544..ec76e9174 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -474,7 +474,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi // Creates a connection pool credentials lookup function in every database to // perform remote authentication. -func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { +func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { var stmtBytes bytes.Buffer c.logger.Info("Installing lookup function") @@ -571,9 +571,6 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { c.logger.Infof("pooler lookup function installed into %s", dbname) } - if len(failedDatabases) == 0 { - c.ConnectionPooler.LookupFunction = true - } - + c.ConnectionPooler[role].LookupFunction = true return nil } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 1ef876bbe..1bed789e0 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -75,14 +75,6 @@ func (c *Cluster) statefulSetName() string { return c.Name } -func (c *Cluster) connectionPoolerName(role PostgresRole) string { - name := c.Name + "-pooler" - if role == Replica { - name = name + "-repl" - } - return name -} - func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -146,26 +138,6 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { } } -// Generate default resource section for connection pooler deployment, to be -// used if nothing custom is specified in the manifest -func (c *Cluster) makeDefaultConnectionPoolerResources() acidv1.Resources { - config := c.OpConfig - - defaultRequests := acidv1.ResourceDescription{ - CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPURequest, - Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest, - } - defaultLimits := acidv1.ResourceDescription{ - CPU: config.ConnectionPooler.ConnectionPoolerDefaultCPULimit, - Memory: config.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit, - } - - return acidv1.Resources{ - ResourceRequests: defaultRequests, - ResourceLimits: defaultLimits, - } -} - func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { var err error @@ -2068,186 +2040,6 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } -// Generate pool size related environment variables. -// -// MAX_DB_CONN would specify the global maximum for connections to a target -// database. -// -// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. -// -// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when -// most of the queries coming through a connection pooler are from the same -// user to the same db). In case if we want to spin up more connection pooler -// instances, take this into account and maintain the same number of -// connections. -// -// MIN_SIZE is a pool's minimal size, to prevent situation when sudden workload -// have to wait for spinning up a new connections. -// -// RESERVE_SIZE is how many additional connections to allow for a pooler. -func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { - effectiveMode := util.Coalesce( - spec.ConnectionPooler.Mode, - c.OpConfig.ConnectionPooler.Mode) - - numberOfInstances := spec.ConnectionPooler.NumberOfInstances - if numberOfInstances == nil { - numberOfInstances = util.CoalesceInt32( - c.OpConfig.ConnectionPooler.NumberOfInstances, - k8sutil.Int32ToPointer(1)) - } - - effectiveMaxDBConn := util.CoalesceInt32( - spec.ConnectionPooler.MaxDBConnections, - c.OpConfig.ConnectionPooler.MaxDBConnections) - - if effectiveMaxDBConn == nil { - effectiveMaxDBConn = k8sutil.Int32ToPointer( - constants.ConnectionPoolerMaxDBConnections) - } - - maxDBConn := *effectiveMaxDBConn / *numberOfInstances - - defaultSize := maxDBConn / 2 - minSize := defaultSize / 2 - reserveSize := minSize - - return []v1.EnvVar{ - { - Name: "CONNECTION_POOLER_PORT", - Value: fmt.Sprint(pgPort), - }, - { - Name: "CONNECTION_POOLER_MODE", - Value: effectiveMode, - }, - { - Name: "CONNECTION_POOLER_DEFAULT_SIZE", - Value: fmt.Sprint(defaultSize), - }, - { - Name: "CONNECTION_POOLER_MIN_SIZE", - Value: fmt.Sprint(minSize), - }, - { - Name: "CONNECTION_POOLER_RESERVE_SIZE", - Value: fmt.Sprint(reserveSize), - }, - { - Name: "CONNECTION_POOLER_MAX_CLIENT_CONN", - Value: fmt.Sprint(constants.ConnectionPoolerMaxClientConnections), - }, - { - Name: "CONNECTION_POOLER_MAX_DB_CONN", - Value: fmt.Sprint(maxDBConn), - }, - } -} - -func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec, role PostgresRole) ( - *v1.PodTemplateSpec, error) { - - gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) - resources, err := generateResourceRequirements( - spec.ConnectionPooler.Resources, - c.makeDefaultConnectionPoolerResources()) - - effectiveDockerImage := util.Coalesce( - spec.ConnectionPooler.DockerImage, - c.OpConfig.ConnectionPooler.Image) - - effectiveSchema := util.Coalesce( - spec.ConnectionPooler.Schema, - c.OpConfig.ConnectionPooler.Schema) - - if err != nil { - return nil, fmt.Errorf("could not generate resource requirements: %v", err) - } - - secretSelector := func(key string) *v1.SecretKeySelector { - effectiveUser := util.Coalesce( - spec.ConnectionPooler.User, - c.OpConfig.ConnectionPooler.User) - - return &v1.SecretKeySelector{ - LocalObjectReference: v1.LocalObjectReference{ - Name: c.credentialSecretName(effectiveUser), - }, - Key: key, - } - } - - envVars := []v1.EnvVar{ - { - Name: "PGHOST", - Value: c.serviceAddress(role), - }, - { - Name: "PGPORT", - Value: c.servicePort(role), - }, - { - Name: "PGUSER", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("username"), - }, - }, - // the convention is to use the same schema name as - // connection pooler username - { - Name: "PGSCHEMA", - Value: effectiveSchema, - }, - { - Name: "PGPASSWORD", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("password"), - }, - }, - } - - envVars = append(envVars, c.getConnectionPoolerEnvVars(spec)...) - - poolerContainer := v1.Container{ - Name: connectionPoolerContainer, - Image: effectiveDockerImage, - ImagePullPolicy: v1.PullIfNotPresent, - Resources: *resources, - Ports: []v1.ContainerPort{ - { - ContainerPort: pgPort, - Protocol: v1.ProtocolTCP, - }, - }, - Env: envVars, - ReadinessProbe: &v1.Probe{ - Handler: v1.Handler{ - TCPSocket: &v1.TCPSocketAction{ - Port: intstr.IntOrString{IntVal: pgPort}, - }, - }, - }, - } - - podTemplate := &v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: c.connectionPoolerLabelsSelector(role).MatchLabels, - Namespace: c.Namespace, - Annotations: c.generatePodAnnotations(spec), - }, - Spec: v1.PodSpec{ - ServiceAccountName: c.OpConfig.PodServiceAccountName, - TerminationGracePeriodSeconds: &gracePeriod, - Containers: []v1.Container{poolerContainer}, - // TODO: add tolerations to scheduler pooler on the same node - // as database - //Tolerations: *tolerationsSpec, - }, - } - - return podTemplate, nil -} - // Return an array of ownerReferences to make an arbitraty object dependent on // the StatefulSet. Dependency is made on StatefulSet instead of PostgreSQL CRD // while the former is represent the actual state, and only it's deletion means @@ -2273,108 +2065,6 @@ func (c *Cluster) ownerReferences() []metav1.OwnerReference { } } -func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec, role PostgresRole) ( - *appsv1.Deployment, error) { - - // there are two ways to enable connection pooler, either to specify a - // connectionPooler section or enableConnectionPooler. In the second case - // spec.connectionPooler will be nil, so to make it easier to calculate - // default values, initialize it to an empty structure. It could be done - // anywhere, but here is the earliest common entry point between sync and - // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} - } - - podTemplate, err := c.generateConnectionPoolerPodTemplate(spec, role) - numberOfInstances := spec.ConnectionPooler.NumberOfInstances - if numberOfInstances == nil { - numberOfInstances = util.CoalesceInt32( - c.OpConfig.ConnectionPooler.NumberOfInstances, - k8sutil.Int32ToPointer(1)) - } - - if *numberOfInstances < constants.ConnectionPoolerMinInstances { - msg := "Adjusted number of connection pooler instances from %d to %d" - c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances) - - *numberOfInstances = constants.ConnectionPoolerMinInstances - } - - if err != nil { - return nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(role), - Namespace: c.Namespace, - Labels: c.connectionPoolerLabelsSelector(role).MatchLabels, - Annotations: map[string]string{}, - // make StatefulSet object its owner to represent the dependency. - // By itself StatefulSet is being deleted with "Orphaned" - // propagation policy, which means that it's deletion will not - // clean up this deployment, but there is a hope that this object - // will be garbage collected if something went wrong and operator - // didn't deleted it. - OwnerReferences: c.ownerReferences(), - }, - Spec: appsv1.DeploymentSpec{ - Replicas: numberOfInstances, - Selector: c.connectionPoolerLabelsSelector(role), - Template: *podTemplate, - }, - } - - return deployment, nil -} - -func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec, role PostgresRole) *v1.Service { - - // there are two ways to enable connection pooler, either to specify a - // connectionPooler section or enableConnectionPooler. In the second case - // spec.connectionPooler will be nil, so to make it easier to calculate - // default values, initialize it to an empty structure. It could be done - // anywhere, but here is the earliest common entry point between sync and - // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} - } - - serviceSpec := v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: c.connectionPoolerName(role), - Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(role)}, - }, - }, - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler": c.connectionPoolerName(role), - }, - } - - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(role), - Namespace: c.Namespace, - Labels: c.connectionPoolerLabelsSelector(role).MatchLabels, - Annotations: map[string]string{}, - // make StatefulSet object its owner to represent the dependency. - // By itself StatefulSet is being deleted with "Orphaned" - // propagation policy, which means that it's deletion will not - // clean up this service, but there is a hope that this object will - // be garbage collected if something went wrong and operator didn't - // deleted it. - OwnerReferences: c.ownerReferences(), - }, - Spec: serviceSpec, - } - - return service -} - func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index da3a56d24..fa4443e06 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2,7 +2,6 @@ package cluster import ( "context" - "errors" "fmt" "reflect" "sort" @@ -929,45 +928,6 @@ func TestPodEnvironmentSecretVariables(t *testing.T) { } -func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { - cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] - if cpuReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest { - return fmt.Errorf("CPU request doesn't match, got %s, expected %s", - cpuReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPURequest) - } - - memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] - if memReq.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest { - return fmt.Errorf("Memory request doesn't match, got %s, expected %s", - memReq.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryRequest) - } - - cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] - if cpuLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit { - return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", - cpuLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultCPULimit) - } - - memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] - if memLim.String() != cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit { - return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", - memLim.String(), cluster.OpConfig.ConnectionPooler.ConnectionPoolerDefaultMemoryLimit) - } - - return nil -} - -func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { - poolerLabels := podSpec.ObjectMeta.Labels["connection-pooler"] - - if poolerLabels != cluster.connectionPoolerLabelsSelector(role).MatchLabels["connection-pooler"] { - return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", - podSpec.ObjectMeta.Labels, cluster.connectionPoolerLabelsSelector(role).MatchLabels) - } - - return nil -} - func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { required := map[string]bool{ "PGHOST": false, @@ -1002,113 +962,6 @@ func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error return nil } -func TestConnectionPoolerPodSpec(t *testing.T) { - testName := "Test connection pooler pod template generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - MaxDBConnections: int32ToPointer(60), - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - - var clusterNoDefaultRes = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{}, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - - noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error { return nil } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - expected error - cluster *Cluster - check func(cluster *Cluster, podSpec *v1.PodTemplateSpec, role PostgresRole) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: noCheck, - }, - { - subTest: "no default resources", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), - cluster: clusterNoDefaultRes, - check: noCheck, - }, - { - subTest: "default resources are set", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testResources, - }, - { - subTest: "labels for service", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - expected: nil, - cluster: cluster, - check: testLabels, - }, - { - subTest: "required envs", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - expected: nil, - cluster: cluster, - check: testEnvs, - }, - } - for _, role := range [2]PostgresRole{Master, Replica} { - for _, tt := range tests { - podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec, role) - - if err != tt.expected && err.Error() != tt.expected.Error() { - t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", - testName, tt.subTest, err, tt.expected) - } - - err = tt.check(cluster, podSpec, role) - if err != nil { - t.Errorf("%s [%s]: Pod spec is incorrect, %+v", - testName, tt.subTest, err) - } - } - } - -} - func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { owner := deployment.ObjectMeta.OwnerReferences[0] @@ -1120,101 +973,6 @@ func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployme return nil } -func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { - labels := deployment.Spec.Selector.MatchLabels - expected := cluster.connectionPoolerLabelsSelector(Master).MatchLabels - - if labels["connection-pooler"] != expected["connection-pooler"] { - return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", - labels, expected) - } - - return nil -} - -func TestConnectionPoolerDeploymentSpec(t *testing.T) { - testName := "Test connection pooler deployment spec generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { - return nil - } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - expected error - cluster *Cluster - check func(cluster *Cluster, deployment *appsv1.Deployment) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - expected: nil, - cluster: cluster, - check: noCheck, - }, - { - subTest: "owner reference", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - expected: nil, - cluster: cluster, - check: testDeploymentOwnwerReference, - }, - { - subTest: "selector", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - expected: nil, - cluster: cluster, - check: testSelector, - }, - } - for _, tt := range tests { - deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec, Master) - - if err != tt.expected && err.Error() != tt.expected.Error() { - t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", - testName, tt.subTest, err, tt.expected) - } - - err = tt.check(cluster, deployment) - if err != nil { - t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", - testName, tt.subTest, err) - } - } - -} - func testServiceOwnwerReference(cluster *Cluster, service *v1.Service, role PostgresRole) error { owner := service.ObjectMeta.OwnerReferences[0] @@ -1226,89 +984,6 @@ func testServiceOwnwerReference(cluster *Cluster, service *v1.Service, role Post return nil } -func testServiceSelector(cluster *Cluster, service *v1.Service, role PostgresRole) error { - selector := service.Spec.Selector - - if selector["connection-pooler"] != cluster.connectionPoolerName(role) { - return fmt.Errorf("Selector is incorrect, got %s, expected %s", - selector["connection-pooler"], cluster.connectionPoolerName(role)) - } - - return nil -} - -func TestConnectionPoolerServiceSpec(t *testing.T) { - testName := "Test connection pooler service spec generation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - noCheck := func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error { - return nil - } - - tests := []struct { - subTest string - spec *acidv1.PostgresSpec - cluster *Cluster - check func(cluster *Cluster, deployment *v1.Service, role PostgresRole) error - }{ - { - subTest: "default configuration", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - cluster: cluster, - check: noCheck, - }, - { - subTest: "owner reference", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - cluster: cluster, - check: testServiceOwnwerReference, - }, - { - subTest: "selector", - spec: &acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - cluster: cluster, - check: testServiceSelector, - }, - } - for _, role := range [2]PostgresRole{Master, Replica} { - for _, tt := range tests { - service := tt.cluster.generateConnectionPoolerService(tt.spec, role) - - if err := tt.check(cluster, service, role); err != nil { - t.Errorf("%s [%s]: Service spec is incorrect, %+v", - testName, tt.subTest, err) - } - } - } -} - func TestTLS(t *testing.T) { var err error var spec acidv1.PostgresSpec diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 717a7f45f..fc06bad08 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -94,153 +94,6 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } -// Prepare the database for connection pooler to be used, i.e. install lookup -// function (do it first, because it should be fast and if it didn't succeed, -// it doesn't makes sense to create more K8S objects. At this moment we assume -// that necessary connection pooler user exists. -// -// After that create all the objects for connection pooler, namely a deployment -// with a chosen pooler and a service to expose it. -func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoolerObjects, error) { - var msg string - c.setProcessName("creating connection pooler") - - if c.ConnectionPooler == nil { - c.ConnectionPooler = &ConnectionPoolerObjects{} - c.ConnectionPooler.Deployment = make(map[PostgresRole]*appsv1.Deployment) - c.ConnectionPooler.Service = make(map[PostgresRole]*v1.Service) - } - - schema := c.Spec.ConnectionPooler.Schema - - if schema == "" { - schema = c.OpConfig.ConnectionPooler.Schema - } - - user := c.Spec.ConnectionPooler.User - if user == "" { - user = c.OpConfig.ConnectionPooler.User - } - - err := lookup(schema, user) - - if err != nil { - msg = "could not prepare database for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - if c.needConnectionPooler() { - roles := c.RolesConnectionPooler() - for _, r := range roles { - deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, r) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - // client-go does retry 10 times (with NoBackoff by default) when the API - // believe a request can be retried and returns Retry-After header. This - // should be good enough to not think about it here. - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - serviceSpec := c.generateConnectionPoolerService(&c.Spec, r) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - c.ConnectionPooler.Deployment[r] = deployment - c.ConnectionPooler.Service[r] = service - - c.logger.Debugf("created new connection pooler %q, uid: %q", - util.NameFromMeta(deployment.ObjectMeta), deployment.UID) - } - } - - return c.ConnectionPooler, nil -} - -func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { - c.setProcessName("deleting connection pooler") - c.logger.Debugln("deleting connection pooler") - - // Lack of connection pooler objects is not a fatal error, just log it if - // it was present before in the manifest - if c.ConnectionPooler == nil { - c.logger.Infof("No connection pooler to delete") - return nil - } - - // Clean up the deployment object. If deployment resource we've remembered - // is somehow empty, try to delete based on what would we generate - var deployment *appsv1.Deployment - deployment = c.ConnectionPooler.Deployment[role] - - policy := metav1.DeletePropagationForeground - options := metav1.DeleteOptions{PropagationPolicy: &policy} - - if deployment != nil { - - // set delete propagation policy to foreground, so that replica set will be - // also deleted. - - err = c.KubeClient. - Deployments(c.Namespace). - Delete(context.TODO(), c.connectionPoolerName(role), options) - - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("Connection pooler deployment was already deleted") - } else if err != nil { - return fmt.Errorf("could not delete deployment: %v", err) - } - - c.logger.Infof("Connection pooler deployment %q has been deleted", c.connectionPoolerName(role)) - } - - // Repeat the same for the service object - var service *v1.Service - service = c.ConnectionPooler.Service[role] - - if service != nil { - - err = c.KubeClient. - Services(c.Namespace). - Delete(context.TODO(), c.connectionPoolerName(role), options) - - if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("Connection pooler service was already deleted") - } else if err != nil { - return fmt.Errorf("could not delete service: %v", err) - } - - c.logger.Infof("Connection pooler service %q has been deleted", c.connectionPoolerName(role)) - } - // Repeat the same for the secret object - secretName := c.credentialSecretName(c.OpConfig.ConnectionPooler.User) - - secret, err := c.KubeClient. - Secrets(c.Namespace). - Get(context.TODO(), secretName, metav1.GetOptions{}) - - if err != nil { - c.logger.Debugf("could not get connection pooler secret %q: %v", secretName, err) - } else { - if err = c.deleteSecret(secret.UID, *secret); err != nil { - return fmt.Errorf("could not delete pooler secret: %v", err) - } - } - - c.ConnectionPooler = nil - return nil -} - func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { @@ -856,57 +709,3 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } - -// Perform actual patching of a connection pooler deployment, assuming that all -// the check were already done before. -func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment, role PostgresRole) (*appsv1.Deployment, error) { - c.setProcessName("updating connection pooler") - if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment[role] == nil { - return nil, fmt.Errorf("there is no connection pooler in the cluster") - } - - patchData, err := specPatch(newDeployment.Spec) - if err != nil { - return nil, fmt.Errorf("could not form patch for the deployment: %v", err) - } - - // An update probably requires RetryOnConflict, but since only one operator - // worker at one time will try to update it chances of conflicts are - // minimal. - deployment, err := c.KubeClient. - Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch( - context.TODO(), - c.ConnectionPooler.Deployment[role].Name, - types.MergePatchType, - patchData, - metav1.PatchOptions{}, - "") - if err != nil { - return nil, fmt.Errorf("could not patch deployment: %v", err) - } - - c.ConnectionPooler.Deployment[role] = deployment - - return deployment, nil -} - -//updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment -func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string, role PostgresRole) (*appsv1.Deployment, error) { - c.logger.Debugf("updating connection pooler annotations") - patchData, err := metaAnnotationsPatch(annotations) - if err != nil { - return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err) - } - result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch( - context.TODO(), - c.ConnectionPooler.Deployment[role].Name, - types.MergePatchType, - []byte(patchData), - metav1.PatchOptions{}, - "") - if err != nil { - return nil, fmt.Errorf("could not patch connection pooler annotations %q: %v", patchData, err) - } - return result, nil - -} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go deleted file mode 100644 index 810119c89..000000000 --- a/pkg/cluster/resources_test.go +++ /dev/null @@ -1,169 +0,0 @@ -package cluster - -import ( - "testing" - - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - "github.com/zalando/postgres-operator/pkg/util/config" - "github.com/zalando/postgres-operator/pkg/util/k8sutil" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func mockInstallLookupFunction(schema string, user string) error { - return nil -} - -func boolToPointer(value bool) *bool { - return &value -} - -func TestConnectionPoolerCreationAndDeletion(t *testing.T) { - testName := "Test connection pooler creation" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) - - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - cluster.Spec = acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - } - poolerResources, err := cluster.createConnectionPooler(mockInstallLookupFunction) - - if err != nil { - t.Errorf("%s: Cannot create connection pooler, %s, %+v", - testName, err, poolerResources) - } - - for _, role := range cluster.RolesConnectionPooler() { - if poolerResources.Deployment[role] == nil { - t.Errorf("%s: Connection pooler deployment is empty for role %s", testName, role) - } - - if poolerResources.Service[role] == nil { - t.Errorf("%s: Connection pooler service is empty for role %s", testName, role) - } - - err = cluster.deleteConnectionPooler(role) - if err != nil { - t.Errorf("%s: Cannot delete connection pooler, %s", testName, err) - } - } -} - -func TestNeedConnectionPooler(t *testing.T) { - testName := "Test how connection pooler can be enabled" - var cluster = New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - }, - }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) - - cluster.Spec = acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if !cluster.needMasterConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with full definition", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - } - - if !cluster.needMasterConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with flag", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(false), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if cluster.needMasterConnectionPooler() { - t.Errorf("%s: Connection pooler is still enabled with flag being false", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if !cluster.needMasterConnectionPooler() { - t.Errorf("%s: Connection pooler is not enabled with flag and full", - testName) - } - - // Test for replica connection pooler - cluster.Spec = acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if cluster.needReplicaConnectionPooler() { - t.Errorf("%s: Replica Connection pooler is not enabled with full definition", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(true), - } - - if !cluster.needReplicaConnectionPooler() { - t.Errorf("%s: Replica Connection pooler is not enabled with flag", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(false), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if cluster.needReplicaConnectionPooler() { - t.Errorf("%s: Replica Connection pooler is still enabled with flag being false", - testName) - } - - cluster.Spec = acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - } - - if !cluster.needReplicaConnectionPooler() { - t.Errorf("%s: Replica Connection pooler is not enabled with flag and full", - testName) - } -} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 030fe2932..8961fa80d 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -560,7 +560,7 @@ func (c *Cluster) syncRoles() (err error) { userNames = append(userNames, u.Name) } - if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { + if needMasterConnectionPooler(&c.Spec) || needReplicaConnectionPooler(&c.Spec) { connectionPoolerUser := c.systemUsers[constants.ConnectionPoolerUserKeyName] userNames = append(userNames, connectionPoolerUser.Name) @@ -838,237 +838,3 @@ func (c *Cluster) syncLogicalBackupJob() error { return nil } - -func (c *Cluster) syncConnectionPooler(oldSpec, - newSpec *acidv1.Postgresql, - lookup InstallFunction) (SyncReason, error) { - - var reason SyncReason - var err error - var newNeedConnectionPooler, oldNeedConnectionPooler bool - - // Check and perform the sync requirements for each of the roles. - for _, role := range [2]PostgresRole{Master, Replica} { - if role == Master { - newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec) - oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec) - } else { - newNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&newSpec.Spec) - oldNeedConnectionPooler = c.needReplicaConnectionPoolerWorker(&oldSpec.Spec) - } - if c.ConnectionPooler == nil { - c.ConnectionPooler = &ConnectionPoolerObjects{} - c.ConnectionPooler.Deployment = make(map[PostgresRole]*appsv1.Deployment) - c.ConnectionPooler.Service = make(map[PostgresRole]*v1.Service) - } - - if newNeedConnectionPooler { - // Try to sync in any case. If we didn't needed connection pooler before, - // it means we want to create it. If it was already present, still sync - // since it could happen that there is no difference in specs, and all - // the resources are remembered, but the deployment was manually deleted - // in between - c.logger.Debug("syncing connection pooler") - - // in this case also do not forget to install lookup function as for - // creating cluster - if !oldNeedConnectionPooler || !c.ConnectionPooler.LookupFunction { - newConnectionPooler := newSpec.Spec.ConnectionPooler - - specSchema := "" - specUser := "" - - if newConnectionPooler != nil { - specSchema = newConnectionPooler.Schema - specUser = newConnectionPooler.User - } - - schema := util.Coalesce( - specSchema, - c.OpConfig.ConnectionPooler.Schema) - - user := util.Coalesce( - specUser, - c.OpConfig.ConnectionPooler.User) - - if err = lookup(schema, user); err != nil { - return NoSync, err - } - } - - if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { - c.logger.Errorf("could not sync connection pooler: %v", err) - return reason, err - } - } else { - // Lookup function installation seems to be a fragile point, so - // let's log for debugging if we skip it - msg := "Skip lookup function installation, old: %d, already installed %d" - c.logger.Debug(msg, oldNeedConnectionPooler, c.ConnectionPooler.LookupFunction) - } - - if oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources - otherRole := role - if len(c.RolesConnectionPooler()) == 2 { - if role == Master { - otherRole = Replica - } else { - otherRole = Master - } - } - if c.ConnectionPooler != nil && - (c.ConnectionPooler.Deployment[role] != nil || - c.ConnectionPooler.Service[role] != nil) { - - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - } - if c.ConnectionPooler != nil && c.ConnectionPooler.Deployment[otherRole] == nil && c.ConnectionPooler.Service[otherRole] == nil { - c.ConnectionPooler = nil - } - } - - if !oldNeedConnectionPooler && !newNeedConnectionPooler { - // delete and cleanup resources if not empty - otherRole := role - if len(c.RolesConnectionPooler()) == 2 { - if role == Master { - otherRole = Replica - } else { - otherRole = Master - } - } - if c.ConnectionPooler != nil && - (c.ConnectionPooler.Deployment[role] != nil || - c.ConnectionPooler.Service[role] != nil) { - - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - } else if c.ConnectionPooler.Deployment[otherRole] == nil && c.ConnectionPooler.Service[otherRole] == nil { - c.ConnectionPooler = nil - } - } - } - - return reason, nil -} - -// Synchronize connection pooler resources. Effectively we're interested only in -// synchronizing the corresponding deployment, but in case of deployment or -// service is missing, create it. After checking, also remember an object for -// the future references. -func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( - SyncReason, error) { - - deployment, err := c.KubeClient. - Deployments(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) - } - - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err - } - c.ConnectionPooler.Deployment[role] = deployment - } else if err != nil { - msg := "could not get connection pooler deployment to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - c.ConnectionPooler.Deployment[role] = deployment - - // actual synchronization - oldConnectionPooler := oldSpec.Spec.ConnectionPooler - newConnectionPooler := newSpec.Spec.ConnectionPooler - - // sync implementation below assumes that both old and new specs are - // not nil, but it can happen. To avoid any confusion like updating a - // deployment because the specification changed from nil to an empty - // struct (that was initialized somewhere before) replace any nil with - // an empty spec. - if oldConnectionPooler == nil { - oldConnectionPooler = &acidv1.ConnectionPooler{} - } - - if newConnectionPooler == nil { - newConnectionPooler = &acidv1.ConnectionPooler{} - } - - c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) - - specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) - defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) - reason := append(specReason, defaultsReason...) - - if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(role), reason) - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) - if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return reason, fmt.Errorf(msg, err) - } - - oldDeploymentSpec := c.ConnectionPooler.Deployment[role] - - deployment, err := c.updateConnectionPoolerDeployment( - oldDeploymentSpec, - newDeploymentSpec, - role) - - if err != nil { - return reason, err - } - c.ConnectionPooler.Deployment[role] = deployment - - return reason, nil - } - } - - newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment[role].Annotations) - if newAnnotations != nil { - c.updateConnectionPoolerAnnotations(newAnnotations, role) - } - - service, err := c.KubeClient. - Services(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) - - serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec, role) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err - } - c.ConnectionPooler.Service[role] = service - - } else if err != nil { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler.Service[role] = service - } - - return NoSync, nil -} diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go deleted file mode 100644 index 491329265..000000000 --- a/pkg/cluster/sync_test.go +++ /dev/null @@ -1,413 +0,0 @@ -package cluster - -import ( - "fmt" - "strings" - "testing" - - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - "github.com/zalando/postgres-operator/pkg/util/config" - "github.com/zalando/postgres-operator/pkg/util/k8sutil" - - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func int32ToPointer(value int32) *int32 { - return &value -} - -func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { - for _, role := range cluster.RolesConnectionPooler() { - if cluster.ConnectionPooler.Deployment[role] != nil && - (cluster.ConnectionPooler.Deployment[role].Spec.Replicas == nil || - *cluster.ConnectionPooler.Deployment[role].Spec.Replicas != 2) { - return fmt.Errorf("Wrong number of instances") - } - } - return nil -} - -func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler == nil { - return fmt.Errorf("Connection pooler resources are empty") - } - - for _, role := range []PostgresRole{Master, Replica} { - if cluster.ConnectionPooler.Deployment[role] == nil { - return fmt.Errorf("Deployment was not saved %s", role) - } - - if cluster.ConnectionPooler.Service[role] == nil { - return fmt.Errorf("Service was not saved %s", role) - } - } - - return nil -} - -func MasterobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler == nil { - return fmt.Errorf("Connection pooler resources are empty") - } - - if cluster.ConnectionPooler.Deployment[Master] == nil { - return fmt.Errorf("Deployment was not saved") - } - - if cluster.ConnectionPooler.Service[Master] == nil { - return fmt.Errorf("Service was not saved") - } - - return nil -} - -func ReplicaobjectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler == nil { - return fmt.Errorf("Connection pooler resources are empty") - } - - if cluster.ConnectionPooler.Deployment[Replica] == nil { - return fmt.Errorf("Deployment was not saved") - } - - if cluster.ConnectionPooler.Service[Replica] == nil { - return fmt.Errorf("Service was not saved") - } - - return nil -} - -func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { - if cluster.ConnectionPooler != nil { - return fmt.Errorf("Connection pooler was not deleted") - } - - return nil -} - -func OnlyMasterDeleted(cluster *Cluster, err error, reason SyncReason) error { - - if cluster.ConnectionPooler != nil && - (cluster.ConnectionPooler.Deployment[Master] != nil || cluster.ConnectionPooler.Service[Master] != nil) { - return fmt.Errorf("Connection pooler master was not deleted") - } - return nil -} - -func OnlyReplicaDeleted(cluster *Cluster, err error, reason SyncReason) error { - - if cluster.ConnectionPooler != nil && - (cluster.ConnectionPooler.Deployment[Replica] != nil || cluster.ConnectionPooler.Service[Replica] != nil) { - return fmt.Errorf("Connection pooler replica was not deleted") - } - return nil -} - -func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { - for _, msg := range reason { - if strings.HasPrefix(msg, "update [] from '' to '") { - return fmt.Errorf("There is an empty reason, %s", msg) - } - } - - return nil -} - -func TestConnectionPoolerSynchronization(t *testing.T) { - testName := "Test connection pooler synchronization" - newCluster := func() *Cluster { - return New( - Config{ - OpConfig: config.Config{ - ProtectedRoles: []string{"admin"}, - Auth: config.Auth{ - SuperUsername: superUserName, - ReplicationUsername: replicationUserName, - }, - ConnectionPooler: config.ConnectionPooler{ - ConnectionPoolerDefaultCPURequest: "100m", - ConnectionPoolerDefaultCPULimit: "100m", - ConnectionPoolerDefaultMemoryRequest: "100Mi", - ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), - }, - }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) - } - cluster := newCluster() - - cluster.Statefulset = &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-sts", - }, - } - - clusterMissingObjects := newCluster() - clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() - - clusterMock := newCluster() - clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() - - clusterDirtyMock := newCluster() - clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() - clusterDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: make(map[PostgresRole]*appsv1.Deployment), - Service: make(map[PostgresRole]*v1.Service), - } - clusterDirtyMock.ConnectionPooler.Deployment[Master] = &appsv1.Deployment{} - clusterDirtyMock.ConnectionPooler.Service[Master] = &v1.Service{} - clusterReplicaDirtyMock := newCluster() - clusterReplicaDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() - clusterReplicaDirtyMock.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: make(map[PostgresRole]*appsv1.Deployment), - Service: make(map[PostgresRole]*v1.Service), - } - - clusterDirtyMock.ConnectionPooler.Deployment[Replica] = &appsv1.Deployment{} - clusterDirtyMock.ConnectionPooler.Service[Replica] = &v1.Service{} - - clusterNewDefaultsMock := newCluster() - clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() - - tests := []struct { - subTest string - oldSpec *acidv1.Postgresql - newSpec *acidv1.Postgresql - cluster *Cluster - defaultImage string - defaultInstances int32 - check func(cluster *Cluster, err error, reason SyncReason) error - }{ - { - subTest: "create if doesn't exist", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: MasterobjectsAreSaved, - }, - { - subTest: "create if doesn't exist with a flag", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: MasterobjectsAreSaved, - }, - { - subTest: "create replica if doesn't exist with a flag", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - }, - cluster: clusterDirtyMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: ReplicaobjectsAreSaved, - }, - { - subTest: "create no replica with flag", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(false), - }, - }, - cluster: clusterDirtyMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreDeleted, - }, - { - subTest: "create from scratch", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: MasterobjectsAreSaved, - }, - { - subTest: "create both master and replica", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - EnableConnectionPooler: boolToPointer(true), - }, - }, - cluster: clusterMissingObjects, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreSaved, - }, - { - subTest: "delete if not needed", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreDeleted, - }, - { - subTest: "delete only master if not needed", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableConnectionPooler: boolToPointer(true), - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableReplicaConnectionPooler: boolToPointer(true), - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: OnlyMasterDeleted, - }, - { - subTest: "delete only replica if not needed", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - EnableReplicaConnectionPooler: boolToPointer(true), - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: OnlyReplicaDeleted, - }, - { - subTest: "cleanup if still there", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{}, - }, - cluster: clusterDirtyMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: objectsAreDeleted, - }, - { - subTest: "update deployment", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{ - NumberOfInstances: int32ToPointer(1), - }, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{ - NumberOfInstances: int32ToPointer(2), - }, - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: deploymentUpdated, - }, - { - subTest: "update image from changed defaults", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterNewDefaultsMock, - defaultImage: "pooler:2.0", - defaultInstances: 2, - check: deploymentUpdated, - }, - { - subTest: "there is no sync from nil to an empty spec", - oldSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: nil, - }, - }, - newSpec: &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - }, - cluster: clusterMock, - defaultImage: "pooler:1.0", - defaultInstances: 1, - check: noEmptySync, - }, - } - for _, tt := range tests { - tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage - tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = - int32ToPointer(tt.defaultInstances) - - reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec, - tt.newSpec, mockInstallLookupFunction) - - if err := tt.check(tt.cluster, err, reason); err != nil { - t.Errorf("%s [%s]: Could not synchronize, %+v", - testName, tt.subTest, err) - } - } -} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 199914ccc..8aa519817 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -72,7 +72,7 @@ type ClusterStatus struct { type TemplateParams map[string]interface{} -type InstallFunction func(schema string, user string) error +type InstallFunction func(schema string, user string, role PostgresRole) error type SyncReason []string diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index f00086e50..d5b9bfb67 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -415,30 +415,6 @@ func (c *Cluster) labelsSelector() *metav1.LabelSelector { } } -// Return connection pooler labels selector, which should from one point of view -// inherit most of the labels from the cluster itself, but at the same time -// have e.g. different `application` label, so that recreatePod operation will -// not interfere with it (it lists all the pods via labels, and if there would -// be no difference, it will recreate also pooler pods). -func (c *Cluster) connectionPoolerLabelsSelector(role PostgresRole) *metav1.LabelSelector { - connectionPoolerLabels := labels.Set(map[string]string{}) - - extraLabels := labels.Set(map[string]string{ - "connection-pooler-name": c.connectionPoolerName(role), - "application": "db-connection-pooler", - "role": string(role), - "cluster-name": c.ClusterName, - }) - - connectionPoolerLabels = labels.Merge(connectionPoolerLabels, c.labelsSet(false)) - connectionPoolerLabels = labels.Merge(connectionPoolerLabels, extraLabels) - - return &metav1.LabelSelector{ - MatchLabels: connectionPoolerLabels, - MatchExpressions: nil, - } -} - func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { lbls := c.labelsSet(shouldAddExtraLabels) lbls[c.OpConfig.PodRoleLabel] = string(role) @@ -521,40 +497,6 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { return c.OpConfig.KubernetesUseConfigMaps } -// isConnectionPoolerEnabled -func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) -} - -func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return spec.EnableReplicaConnectionPooler != nil && *spec.EnableReplicaConnectionPooler -} - -func (c *Cluster) needMasterConnectionPooler() bool { - return c.needMasterConnectionPoolerWorker(&c.Spec) -} - -func (c *Cluster) needConnectionPooler() bool { - return c.needMasterConnectionPoolerWorker(&c.Spec) || c.needReplicaConnectionPoolerWorker(&c.Spec) -} - -// RolesConnectionPooler gives the list of roles which need connection pooler -func (c *Cluster) RolesConnectionPooler() []PostgresRole { - roles := make([]PostgresRole, 2) - - if c.needMasterConnectionPoolerWorker(&c.Spec) { - roles = append(roles, Master) - } - if c.needMasterConnectionPoolerWorker(&c.Spec) { - roles = append(roles, Replica) - } - return roles -} - -func (c *Cluster) needReplicaConnectionPooler() bool { - return c.needReplicaConnectionPoolerWorker(&c.Spec) -} - // Earlier arguments take priority func mergeContainers(containers ...[]v1.Container) ([]v1.Container, []string) { containerNameTaken := map[string]bool{}