Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,23 +1006,23 @@ func (c *Cluster) initSystemUsers() {
// Connection pooler user is an exception, if requested it's going to be
// created by operator as a normal pgUser
if needConnectionPooler(&c.Spec) {
// initialize empty connection pooler if not done yet
if c.Spec.ConnectionPooler == nil {
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
connectionPoolerSpec := c.Spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}

// Using superuser as pooler user is not a good idea. First of all it's
// not going to be synced correctly with the current implementation,
// and second it's a bad practice.
username := c.OpConfig.ConnectionPooler.User

isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername
isSuperUser := connectionPoolerSpec.User == c.OpConfig.SuperUsername
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
c.Spec.ConnectionPooler.User, "connection pool role")
connectionPoolerSpec.User, "connection pool role")

if !isSuperUser && !isProtectedUser {
username = util.Coalesce(
c.Spec.ConnectionPooler.User,
connectionPoolerSpec.User,
c.OpConfig.ConnectionPooler.User)
}

Expand Down
113 changes: 48 additions & 65 deletions pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"context"
"fmt"
"reflect"
"strings"

"github.com/r3labs/diff"
Expand Down Expand Up @@ -60,7 +61,7 @@ func needMasterConnectionPooler(spec *acidv1.PostgresSpec) bool {
}

func needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) ||
return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) ||
(spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil)
}

Expand Down Expand Up @@ -114,7 +115,7 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
c.setProcessName("creating connection pooler")

//this is essentially sync with nil as oldSpec
if reason, err := c.syncConnectionPooler(nil, &c.Postgresql, LookupFunction); err != nil {
if reason, err := c.syncConnectionPooler(&acidv1.Postgresql{}, &c.Postgresql, LookupFunction); err != nil {
return reason, err
}
return reason, nil
Expand All @@ -140,19 +141,23 @@ func (c *Cluster) createConnectionPooler(LookupFunction InstallFunction) (SyncRe
// RESERVE_SIZE is how many additional connections to allow for a pooler.
func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
spec := &c.Spec
connectionPoolerSpec := spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}
effectiveMode := util.Coalesce(
spec.ConnectionPooler.Mode,
connectionPoolerSpec.Mode,
c.OpConfig.ConnectionPooler.Mode)

numberOfInstances := spec.ConnectionPooler.NumberOfInstances
numberOfInstances := connectionPoolerSpec.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPooler.NumberOfInstances,
k8sutil.Int32ToPointer(1))
}

effectiveMaxDBConn := util.CoalesceInt32(
spec.ConnectionPooler.MaxDBConnections,
connectionPoolerSpec.MaxDBConnections,
c.OpConfig.ConnectionPooler.MaxDBConnections)

if effectiveMaxDBConn == nil {
Expand Down Expand Up @@ -201,17 +206,21 @@ func (c *Cluster) getConnectionPoolerEnvVars() []v1.EnvVar {
func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (
*v1.PodTemplateSpec, error) {
spec := &c.Spec
connectionPoolerSpec := spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements(
spec.ConnectionPooler.Resources,
connectionPoolerSpec.Resources,
makeDefaultConnectionPoolerResources(&c.OpConfig))

effectiveDockerImage := util.Coalesce(
spec.ConnectionPooler.DockerImage,
connectionPoolerSpec.DockerImage,
c.OpConfig.ConnectionPooler.Image)

effectiveSchema := util.Coalesce(
spec.ConnectionPooler.Schema,
connectionPoolerSpec.Schema,
c.OpConfig.ConnectionPooler.Schema)

if err != nil {
Expand All @@ -220,7 +229,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) (

secretSelector := func(key string) *v1.SecretKeySelector {
effectiveUser := util.Coalesce(
spec.ConnectionPooler.User,
connectionPoolerSpec.User,
c.OpConfig.ConnectionPooler.User)

return &v1.SecretKeySelector{
Expand Down Expand Up @@ -321,12 +330,13 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
// default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and
// create code, so init here.
if spec.ConnectionPooler == nil {
spec.ConnectionPooler = &acidv1.ConnectionPooler{}
connectionPoolerSpec := spec.ConnectionPooler
if connectionPoolerSpec == nil {
connectionPoolerSpec = &acidv1.ConnectionPooler{}
}
podTemplate, err := c.generateConnectionPoolerPodTemplate(connectionPooler.Role)

numberOfInstances := spec.ConnectionPooler.NumberOfInstances
numberOfInstances := connectionPoolerSpec.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = util.CoalesceInt32(
c.OpConfig.ConnectionPooler.NumberOfInstances,
Expand Down Expand Up @@ -371,16 +381,6 @@ func (c *Cluster) generateConnectionPoolerDeployment(connectionPooler *Connectio
func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPoolerObjects) *v1.Service {

spec := &c.Spec
// there are two ways to enable connection pooler, either to specify a
// connectionPooler section or enableConnectionPooler. In the second case
// spec.connectionPooler will be nil, so to make it easier to calculate
// default values, initialize it to an empty structure. It could be done
// anywhere, but here is the earliest common entry point between sync and
// create code, so init here.
if spec.ConnectionPooler == nil {
spec.ConnectionPooler = &acidv1.ConnectionPooler{}
}

serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Expand Down Expand Up @@ -668,12 +668,14 @@ func makeDefaultConnectionPoolerResources(config *config.Config) acidv1.Resource

func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql) {
var v []string

var input []*bool

newMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&newSpec.Spec)
if oldSpec == nil {
input = []*bool{nil, nil, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler}
input = []*bool{nil, nil, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler}
} else {
input = []*bool{oldSpec.Spec.EnableConnectionPooler, oldSpec.Spec.EnableReplicaConnectionPooler, newSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableReplicaConnectionPooler}
oldMasterConnectionPoolerEnabled := needMasterConnectionPoolerWorker(&oldSpec.Spec)
input = []*bool{&oldMasterConnectionPoolerEnabled, oldSpec.Spec.EnableReplicaConnectionPooler, &newMasterConnectionPoolerEnabled, newSpec.Spec.EnableReplicaConnectionPooler}
}

for _, b := range input {
Expand All @@ -684,25 +686,16 @@ func logPoolerEssentials(log *logrus.Entry, oldSpec, newSpec *acidv1.Postgresql)
}
}

log.Debugf("syncing connection pooler from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3])
log.Debugf("syncing connection pooler (master, replica) from (%v, %v) to (%v, %v)", v[0], v[1], v[2], v[3])
}

func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, LookupFunction InstallFunction) (SyncReason, error) {

var reason SyncReason
var err error
var newNeedConnectionPooler, oldNeedConnectionPooler bool
oldNeedConnectionPooler = false

if oldSpec == nil {
oldSpec = &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPooler: &acidv1.ConnectionPooler{},
},
}
}
var connectionPoolerNeeded bool

needSync, _ := needSyncConnectionPoolerSpecs(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler, c.logger)
needSync := !reflect.DeepEqual(oldSpec.Spec.ConnectionPooler, newSpec.Spec.ConnectionPooler)
masterChanges, err := diff.Diff(oldSpec.Spec.EnableConnectionPooler, newSpec.Spec.EnableConnectionPooler)
if err != nil {
c.logger.Error("Error in getting diff of master connection pooler changes")
Expand All @@ -712,15 +705,14 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
c.logger.Error("Error in getting diff of replica connection pooler changes")
}

// skip pooler sync only
// 1. if there is no diff in spec, AND
// 2. if connection pooler is already there and is also required as per newSpec
//
// Handling the case when connectionPooler is not there but it is required
// skip pooler sync when theres no diff or it's deactivated
// but, handling the case when connectionPooler is not there but it is required
// as per spec, hence do not skip syncing in that case, even though there
// is no diff in specs
if (!needSync && len(masterChanges) <= 0 && len(replicaChanges) <= 0) &&
(c.ConnectionPooler != nil && (needConnectionPooler(&newSpec.Spec))) {
((!needConnectionPooler(&newSpec.Spec) && (c.ConnectionPooler == nil || !needConnectionPooler(&oldSpec.Spec))) ||
(c.ConnectionPooler != nil && needConnectionPooler(&newSpec.Spec) &&
(c.ConnectionPooler[Master].LookupFunction || c.ConnectionPooler[Replica].LookupFunction))) {
c.logger.Debugln("syncing pooler is not required")
return nil, nil
}
Expand All @@ -731,15 +723,9 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
for _, role := range [2]PostgresRole{Master, Replica} {

if role == Master {
newNeedConnectionPooler = needMasterConnectionPoolerWorker(&newSpec.Spec)
if oldSpec != nil {
oldNeedConnectionPooler = needMasterConnectionPoolerWorker(&oldSpec.Spec)
}
connectionPoolerNeeded = needMasterConnectionPoolerWorker(&newSpec.Spec)
} else {
newNeedConnectionPooler = needReplicaConnectionPoolerWorker(&newSpec.Spec)
if oldSpec != nil {
oldNeedConnectionPooler = needReplicaConnectionPoolerWorker(&oldSpec.Spec)
}
connectionPoolerNeeded = needReplicaConnectionPoolerWorker(&newSpec.Spec)
}

// if the call is via createConnectionPooler, then it is required to initialize
Expand All @@ -759,24 +745,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
}
}

if newNeedConnectionPooler {
if connectionPoolerNeeded {
// Try to sync in any case. If we didn't needed connection pooler before,
// it means we want to create it. If it was already present, still sync
// since it could happen that there is no difference in specs, and all
// the resources are remembered, but the deployment was manually deleted
// in between

// in this case also do not forget to install lookup function as for
// creating cluster
if !oldNeedConnectionPooler || !c.ConnectionPooler[role].LookupFunction {
newConnectionPooler := newSpec.Spec.ConnectionPooler

// in this case also do not forget to install lookup function
if !c.ConnectionPooler[role].LookupFunction {
connectionPooler := c.Spec.ConnectionPooler
specSchema := ""
specUser := ""

if newConnectionPooler != nil {
specSchema = newConnectionPooler.Schema
specUser = newConnectionPooler.User
if connectionPooler != nil {
specSchema = connectionPooler.Schema
specUser = connectionPooler.User
}

schema := util.Coalesce(
Expand All @@ -787,9 +771,10 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
specUser,
c.OpConfig.ConnectionPooler.User)

if err = LookupFunction(schema, user, role); err != nil {
if err = LookupFunction(schema, user); err != nil {
return NoSync, err
}
c.ConnectionPooler[role].LookupFunction = true
}

if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec, role); err != nil {
Expand All @@ -808,8 +793,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look
}
}
}
if !needMasterConnectionPoolerWorker(&newSpec.Spec) &&
!needReplicaConnectionPoolerWorker(&newSpec.Spec) {
if (needMasterConnectionPoolerWorker(&oldSpec.Spec) || needReplicaConnectionPoolerWorker(&oldSpec.Spec)) &&
!needMasterConnectionPoolerWorker(&newSpec.Spec) && !needReplicaConnectionPoolerWorker(&newSpec.Spec) {
if err = c.deleteConnectionPoolerSecret(); err != nil {
c.logger.Warningf("could not remove connection pooler secret: %v", err)
}
Expand Down Expand Up @@ -874,8 +859,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newConnectionPooler = &acidv1.ConnectionPooler{}
}

c.logger.Infof("old: %+v, new %+v", oldConnectionPooler, newConnectionPooler)

var specSync bool
var specReason []string

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/connection_pooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
)

func mockInstallLookupFunction(schema string, user string, role PostgresRole) error {
func mockInstallLookupFunction(schema string, user string) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi

// Creates a connection pool credentials lookup function in every database to
// perform remote authentication.
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error {
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {
var stmtBytes bytes.Buffer

c.logger.Info("Installing lookup function")
Expand Down Expand Up @@ -604,8 +604,8 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role Po
c.logger.Infof("pooler lookup function installed into %s", dbname)
}

if len(failedDatabases) == 0 {
c.ConnectionPooler[role].LookupFunction = true
if len(failedDatabases) > 0 {
return fmt.Errorf("could not install pooler lookup function in every specified databases")
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,15 @@ func (c *Cluster) syncDatabases() error {
}
}

if len(createDatabases) > 0 {
// trigger creation of pooler objects in new database in syncConnectionPooler
if c.ConnectionPooler != nil {
for _, role := range [2]PostgresRole{Master, Replica} {
c.ConnectionPooler[role].LookupFunction = false
}
}
}

// set default privileges for prepared database
for _, preparedDatabase := range preparedDatabases {
if err := c.initDbConnWithName(preparedDatabase); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ClusterStatus struct {

type TemplateParams map[string]interface{}

type InstallFunction func(schema string, user string, role PostgresRole) error
type InstallFunction func(schema string, user string) error

type SyncReason []string

Expand Down