diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index cd3a751d1..8e1dcb22e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1006,9 +1006,9 @@ func (c *Cluster) initSystemUsers() { // Connection pooler user is an exception, if requested it's going to be // created by operator as a normal pgUser if needConnectionPooler(&c.Spec) { - // initialize empty connection pooler if not done yet - if c.Spec.ConnectionPooler == nil { - c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} + connectionPoolerSpec := c.Spec.ConnectionPooler + if connectionPoolerSpec == nil { + connectionPoolerSpec = &acidv1.ConnectionPooler{} } // Using superuser as pooler user is not a good idea. First of all it's @@ -1016,13 +1016,13 @@ func (c *Cluster) initSystemUsers() { // and second it's a bad practice. username := c.OpConfig.ConnectionPooler.User - isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername + isSuperUser := connectionPoolerSpec.User == c.OpConfig.SuperUsername isProtectedUser := c.shouldAvoidProtectedOrSystemRole( - c.Spec.ConnectionPooler.User, "connection pool role") + connectionPoolerSpec.User, "connection pool role") if !isSuperUser && !isProtectedUser { username = util.Coalesce( - c.Spec.ConnectionPooler.User, + connectionPoolerSpec.User, c.OpConfig.ConnectionPooler.User) } diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index 40bdd0e61..5bde71458 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "reflect" "strings" "github.com/r3labs/diff" @@ -60,7 +61,7 @@ func needMasterConnectionPooler(spec *acidv1.PostgresSpec) bool { } func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || + return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) } @@ -114,7 +115,7 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe c.setProcessName("creating connection pooler") //this is essentially sync with nil as oldSpec - if reason, err := c.syncConnectionPooler(nil, &c.Postgresql, LookupFunction); err != nil { + if reason, err := c.syncConnectionPooler(&acidv1.Postgresql{}, &c.Postgresql, LookupFunction); err != nil { return reason, err } return reason, nil @@ -140,11 +141,15 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe // RESERVE_SIZE is how many additional connections to allow for a pooler. func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { spec := &c.Spec + connectionPoolerSpec := spec.ConnectionPooler + if connectionPoolerSpec == nil { + connectionPoolerSpec = &acidv1.ConnectionPooler{} + } effectiveMode := util.Coalesce( - spec.ConnectionPooler.Mode, + connectionPoolerSpec.Mode, c.OpConfig.ConnectionPooler.Mode) - numberOfInstances := spec.ConnectionPooler.NumberOfInstances + numberOfInstances := connectionPoolerSpec.NumberOfInstances if numberOfInstances == nil { numberOfInstances = util.CoalesceInt32( c.OpConfig.ConnectionPooler.NumberOfInstances, @@ -152,7 +157,7 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { } effectiveMaxDBConn := util.CoalesceInt32( - spec.ConnectionPooler.MaxDBConnections, + connectionPoolerSpec.MaxDBConnections, c.OpConfig.ConnectionPooler.MaxDBConnections) if effectiveMaxDBConn == nil { @@ -201,17 +206,21 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar { func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( *v1.PodTemplateSpec, error) { spec := &c.Spec + connectionPoolerSpec := spec.ConnectionPooler + if connectionPoolerSpec == nil { + connectionPoolerSpec = &acidv1.ConnectionPooler{} + } gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) resources, err := generateResourceRequirements( - spec.ConnectionPooler.Resources, + connectionPoolerSpec.Resources, makeDefaultConnectionPoolerResources(&c.OpConfig)) effectiveDockerImage := util.Coalesce( - spec.ConnectionPooler.DockerImage, + connectionPoolerSpec.DockerImage, c.OpConfig.ConnectionPooler.Image) effectiveSchema := util.Coalesce( - spec.ConnectionPooler.Schema, + connectionPoolerSpec.Schema, c.OpConfig.ConnectionPooler.Schema) if err != nil { @@ -220,7 +229,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( secretSelector := func(key string) *v1.SecretKeySelector { effectiveUser := util.Coalesce( - spec.ConnectionPooler.User, + connectionPoolerSpec.User, c.OpConfig.ConnectionPooler.User) return &v1.SecretKeySelector{ @@ -321,12 +330,13 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio // default values, initialize it to an empty structure. It could be done // anywhere, but here is the earliest common entry point between sync and // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} + connectionPoolerSpec := spec.ConnectionPooler + if connectionPoolerSpec == nil { + connectionPoolerSpec = &acidv1.ConnectionPooler{} } podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role) - numberOfInstances := spec.ConnectionPooler.NumberOfInstances + numberOfInstances := connectionPoolerSpec.NumberOfInstances if numberOfInstances == nil { numberOfInstances = util.CoalesceInt32( c.OpConfig.ConnectionPooler.NumberOfInstances, @@ -371,16 +381,6 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service { spec := &c.Spec - // there are two ways to enable connection pooler, either to specify a - // connectionPooler section or enableConnectionPooler. In the second case - // spec.connectionPooler will be nil, so to make it easier to calculate - // default values, initialize it to an empty structure. It could be done - // anywhere, but here is the earliest common entry point between sync and - // create code, so init here. - if spec.ConnectionPooler == nil { - spec.ConnectionPooler = &acidv1.ConnectionPooler{} - } - serviceSpec := v1.ServiceSpec{ Ports: []v1.ServicePort{ { @@ -668,12 +668,14 @@ func makeDefaultConnectionPoolerResources(config *config.Config) acidv1.Resource func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) { var v []string - var input []*bool + + newMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&newSpec.Spec) if oldSpec == nil { - input = []*bool{nil, nil, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} + input = []*bool{nil, nil, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler} } else { - input = []*bool{oldSpec.Spec.EnableConnectionPooler, oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler} + oldMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&oldSpec.Spec) + input = []*bool{&oldMasterConnectionPoolerEnabled, oldSpec.Spec.EnableReplicaConnectionPooler, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler} } for _, b := range input { @@ -684,25 +686,16 @@ func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) } } - log.Debugf("syncing connection pooler from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3]) + log.Debugf("syncing connection pooler (master, replica) from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3]) } func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, LookupFunction InstallFunction) (SyncReason, error) { var reason SyncReason var err error - var newNeedConnectionPooler, oldNeedConnectionPooler bool - oldNeedConnectionPooler = false - - if oldSpec == nil { - oldSpec = &acidv1.Postgresql{ - Spec: acidv1.PostgresSpec{ - ConnectionPooler: &acidv1.ConnectionPooler{}, - }, - } - } + var connectionPoolerNeeded bool - needSync, _ := needSyncConnectionPoolerSpecs(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler, c.logger) + needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler) masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler) if err != nil { c.logger.Error("Error in getting diff of master connection pooler changes") @@ -712,15 +705,14 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look c.logger.Error("Error in getting diff of replica connection pooler changes") } - // skip pooler sync only - // 1. if there is no diff in spec, AND - // 2. if connection pooler is already there and is also required as per newSpec - // - // Handling the case when connectionPooler is not there but it is required + // skip pooler sync when theres no diff or it's deactivated + // but, handling the case when connectionPooler is not there but it is required // as per spec, hence do not skip syncing in that case, even though there // is no diff in specs if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) && - (c.ConnectionPooler != nil && (needConnectionPooler(&newSpec.Spec))) { + ((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) || + (c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) && + (c.ConnectionPooler[Master].LookupFunction || c.ConnectionPooler[Replica].LookupFunction))) { c.logger.Debugln("syncing pooler is not required") return nil, nil } @@ -731,15 +723,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look for _, role := range [2]PostgresRole{Master, Replica} { if role == Master { - newNeedConnectionPooler = needMasterConnectionPoolerWorker(&newSpec.Spec) - if oldSpec != nil { - oldNeedConnectionPooler = needMasterConnectionPoolerWorker(&oldSpec.Spec) - } + connectionPoolerNeeded = needMasterConnectionPoolerWorker(&newSpec.Spec) } else { - newNeedConnectionPooler = needReplicaConnectionPoolerWorker(&newSpec.Spec) - if oldSpec != nil { - oldNeedConnectionPooler = needReplicaConnectionPoolerWorker(&oldSpec.Spec) - } + connectionPoolerNeeded = needReplicaConnectionPoolerWorker(&newSpec.Spec) } // if the call is via createConnectionPooler, then it is required to initialize @@ -759,24 +745,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look } } - if newNeedConnectionPooler { + if connectionPoolerNeeded { // Try to sync in any case. If we didn't needed connection pooler before, // it means we want to create it. If it was already present, still sync // since it could happen that there is no difference in specs, and all // the resources are remembered, but the deployment was manually deleted // in between - // in this case also do not forget to install lookup function as for - // creating cluster - if !oldNeedConnectionPooler || !c.ConnectionPooler[role].LookupFunction { - newConnectionPooler := newSpec.Spec.ConnectionPooler - + // in this case also do not forget to install lookup function + if !c.ConnectionPooler[role].LookupFunction { + connectionPooler := c.Spec.ConnectionPooler specSchema := "" specUser := "" - if newConnectionPooler != nil { - specSchema = newConnectionPooler.Schema - specUser = newConnectionPooler.User + if connectionPooler != nil { + specSchema = connectionPooler.Schema + specUser = connectionPooler.User } schema := util.Coalesce( @@ -787,9 +771,10 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look specUser, c.OpConfig.ConnectionPooler.User) - if err = LookupFunction(schema, user, role); err != nil { + if err = LookupFunction(schema, user); err != nil { return NoSync, err } + c.ConnectionPooler[role].LookupFunction = true } if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil { @@ -808,8 +793,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look } } } - if !needMasterConnectionPoolerWorker(&newSpec.Spec) && - !needReplicaConnectionPoolerWorker(&newSpec.Spec) { + if (needMasterConnectionPoolerWorker(&oldSpec.Spec) || needReplicaConnectionPoolerWorker(&oldSpec.Spec)) && + !needMasterConnectionPoolerWorker(&newSpec.Spec) && !needReplicaConnectionPoolerWorker(&newSpec.Spec) { if err = c.deleteConnectionPoolerSecret(); err != nil { c.logger.Warningf("could not remove connection pooler secret: %v", err) } @@ -874,8 +859,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql newConnectionPooler = &acidv1.ConnectionPooler{} } - c.logger.Infof("old: %+v, new %+v", oldConnectionPooler, newConnectionPooler) - var specSync bool var specReason []string diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go index 280adb101..9b983c7b0 100644 --- a/pkg/cluster/connection_pooler_test.go +++ b/pkg/cluster/connection_pooler_test.go @@ -19,7 +19,7 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -func mockInstallLookupFunction(schema string, user string, role PostgresRole) error { +func mockInstallLookupFunction(schema string, user string) error { return nil } diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index ba4cf223a..aa3a5e3be 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -508,7 +508,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi // Creates a connection pool credentials lookup function in every database to // perform remote authentication. -func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { +func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { var stmtBytes bytes.Buffer c.logger.Info("Installing lookup function") @@ -604,8 +604,8 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role Po c.logger.Infof("pooler lookup function installed into %s", dbname) } - if len(failedDatabases) == 0 { - c.ConnectionPooler[role].LookupFunction = true + if len(failedDatabases) > 0 { + return fmt.Errorf("could not install pooler lookup function in every specified databases") } return nil diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4937a2034..5fa93bdd2 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -758,6 +758,15 @@ func (c *Cluster) syncDatabases() error { } } + if len(createDatabases) > 0 { + // trigger creation of pooler objects in new database in syncConnectionPooler + if c.ConnectionPooler != nil { + for _, role := range [2]PostgresRole{Master, Replica} { + c.ConnectionPooler[role].LookupFunction = false + } + } + } + // set default privileges for prepared database for _, preparedDatabase := range preparedDatabases { if err := c.initDbConnWithName(preparedDatabase); err != nil { diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 8aa519817..199914ccc 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -72,7 +72,7 @@ type ClusterStatus struct { type TemplateParams map[string]interface{} -type InstallFunction func(schema string, user string, role PostgresRole) error +type InstallFunction func(schema string, user string) error type SyncReason []string