Merged
Changes from all commits (42 commits)
c663b87
First commit
Oct 5, 2020
d6b4f82
Make connection_pooler a separate package
Oct 6, 2020
49f6285
Add interface to use common functions
Oct 6, 2020
289b209
place config in a common package
Oct 7, 2020
403bbb6
Add pooler for replica
Aug 28, 2020
75ea994
Add new pooler service for replica
Aug 28, 2020
c1e42a8
Enable connection pooler for replica
Sep 2, 2020
dd18c18
Fix labels for the replica pods
Sep 2, 2020
4702340
fix for labels selector
Sep 3, 2020
99abcdc
Assorted changes
Sep 3, 2020
0ce9b50
Fix sync
Sep 4, 2020
6d4bb18
Adding test cases and other changes
Sep 4, 2020
6fda51c
Improvements in tests
Sep 8, 2020
a03397a
Resolve review comments
Sep 11, 2020
c692ca2
Minor fix
Sep 11, 2020
175f0c5
Refactor needConnectionPooler
Sep 21, 2020
fea359f
Update tests
Sep 21, 2020
6272377
Add sync test
Sep 23, 2020
3ba73fb
Add and update tests
Sep 23, 2020
bb51aba
Cleanup deleteConnectionPooler
Oct 5, 2020
84fbfe3
Add labels
Oct 5, 2020
c63446c
First commit
Oct 5, 2020
ae3f5ef
Make connection_pooler a separate package
Oct 6, 2020
4700fc7
Add interface to use common functions
Oct 6, 2020
4fc7e8d
place config in a common package
Oct 7, 2020
11cd11f
Merge branch 'pooler-refac' of https://github.com/zalando/postgres-op…
Oct 7, 2020
f7098b8
Modify packages
Oct 19, 2020
20f2fb7
Update connection_pooler funcs accordingly
Oct 19, 2020
60460c3
fix test cases
Oct 20, 2020
5828e2f
avoid passing spec unnecessarily
Oct 20, 2020
c891798
Code utilization of syncConnectionPooler
Oct 20, 2020
7112995
Fix sync/create issues
Oct 21, 2020
cbdc346
fix podSpec test
Oct 21, 2020
72e2af8
isolate tests and other updates
Oct 21, 2020
3d3bc14
Refactor more methods
Oct 21, 2020
e88fa06
minor fix in test
Oct 27, 2020
4354b02
another test case fix
Oct 27, 2020
2ed0e05
Nitpicks and other changes
Oct 27, 2020
07bb270
cleanup and avoid using cluster in more functions
Oct 27, 2020
4c70c55
resolve conflicts
Oct 29, 2020
49613bd
fix
Oct 29, 2020
7b4e850
minor fix
Oct 29, 2020
170 changes: 19 additions & 151 deletions pkg/cluster/cluster.go
@@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/r3labs/diff"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@@ -25,6 +24,7 @@ import (
"k8s.io/client-go/tools/reference"

acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"

"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
@@ -53,26 +53,11 @@ type Config struct {
PodServiceAccountRoleBinding *rbacv1.RoleBinding
}

// K8s objects that belong to a connection pooler
type ConnectionPoolerObjects struct {
Deployment map[PostgresRole]*appsv1.Deployment
Service map[PostgresRole]*v1.Service

// It could happen that a connection pooler was enabled, but the operator
// was not able to properly process the corresponding event or was restarted.
// In this case we would miss that a lookup function is required and it
// would not be installed. To avoid re-synchronizing it all the time, we
// remember the result in memory, at least until the next restart.
LookupFunction bool
}
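A minimal sketch (type and function names assumed, not from this PR) of how such an in-memory flag can gate repeated lookup-function installs during sync:

// Hypothetical sketch: cache a successful lookup-function install in
// memory so that sync does not repeat it until the operator restarts.
type poolerState struct {
	lookupInstalled bool
}

func (p *poolerState) ensureLookup(install func() error) error {
	if p.lookupInstalled {
		return nil // remembered from an earlier sync, skip the work
	}
	if err := install(); err != nil {
		return err // not installed; the next sync will retry
	}
	p.lookupInstalled = true
	return nil
}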

type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
ConnectionPooler *ConnectionPoolerObjects
PodDisruptionBudget *policybeta1.PodDisruptionBudget
//Pods are treated separately
//PVCs are treated separately
@@ -102,9 +87,8 @@ type Cluster struct {
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex

ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
}

type compareStatefulsetResult struct {
match bool
replace bool
@@ -346,20 +330,7 @@ func (c *Cluster) Create() error {
//
// Do not consider the connection pooler a strict requirement; if
// something fails, report a warning
roles := c.RolesConnectionPooler()
for _, r := range roles {
if c.ConnectionPooler != nil {
c.logger.Warning("Connection pooler already exists in the cluster")
return nil
}
connectionPooler, err := c.createConnectionPooler(c.installLookupFunction)
if err != nil {
c.logger.Warningf("could not create connection pooler: %v", err)
return nil
}
c.logger.Infof("connection pooler %q has been successfully created",
util.NameFromMeta(connectionPooler.Deployment[r].ObjectMeta))
}
c.createConnectionPooler(c.installLookupFunction)

return nil
}
@@ -649,7 +620,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// initUsers. Check if it needs to be called.
sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) &&
reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases)
needConnectionPooler := c.needMasterConnectionPoolerWorker(&newSpec.Spec)
needConnectionPooler := needMasterConnectionPoolerWorker(&newSpec.Spec)
if !sameUsers || needConnectionPooler {
c.logger.Debugf("syncing secrets")
if err := c.initUsers(); err != nil {
@@ -786,7 +757,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// need to process. In the future we may want to do this more carefully and
// check which databases we need to process, but even repeating the whole
// installation process should be good enough.
c.ConnectionPooler.LookupFunction = false

if _, err := c.syncConnectionPooler(oldSpec, newSpec,
c.installLookupFunction); err != nil {
@@ -797,6 +767,20 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
return nil
}

func syncResources(a, b *v1.ResourceRequirements) bool {
Review comment (Member): I understand this function was named like that in the original code, but judging from what it does, shouldSyncResources or needToSyncResources is a better name for it from a readability perspective (see the rename sketch after this function).

for _, res := range []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
} {
if !a.Limits[res].Equal(b.Limits[res]) ||
!a.Requests[res].Equal(b.Requests[res]) {
return true
}
}

return false
}
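Following the review comment above, a sketch of the suggested rename; this is the reviewer's proposal, not code from this PR, and the behavior is unchanged:

// Reviewer-suggested name; shown here as a thin wrapper for clarity. In
// an actual rename, the body of syncResources above would simply move here.
func shouldSyncResources(a, b *v1.ResourceRequirements) bool {
	return syncResources(a, b)
}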

// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
@@ -920,7 +904,7 @@ func (c *Cluster) initSystemUsers() {

// The connection pooler user is an exception: if requested, it is going
// to be created by the operator as a normal pgUser
if c.needConnectionPooler() {
if needConnectionPooler(&c.Spec) {
// initialize empty connection pooler if not done yet
if c.Spec.ConnectionPooler == nil {
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
@@ -1394,119 +1378,3 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {

return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
}

// Test if two connection pooler configurations need to be synced. For simplicity,
// compare not the actual K8s objects but the configuration itself, and request
// a sync if there is any difference.
func (c *Cluster) needSyncConnectionPoolerSpecs(oldSpec, newSpec *acidv1.ConnectionPooler) (sync bool, reasons []string) {
reasons = []string{}
sync = false

changelog, err := diff.Diff(oldSpec, newSpec)
if err != nil {
c.logger.Infof("Cannot get diff, do not do anything, %+v", err)
return false, reasons
}

if len(changelog) > 0 {
sync = true
}

for _, change := range changelog {
msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'",
change.Type, change.Path, change.From, change.To)
reasons = append(reasons, msg)
}

return sync, reasons
}
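For context, a small standalone example of what github.com/r3labs/diff (used above) returns for two differing specs; the struct and values here are illustrative, not the operator's types:

package main

import (
	"fmt"

	"github.com/r3labs/diff"
)

type poolerSpec struct {
	NumberOfInstances int32
	DockerImage       string
}

func main() {
	oldSpec := poolerSpec{NumberOfInstances: 2, DockerImage: "pooler:1.0"}
	newSpec := poolerSpec{NumberOfInstances: 3, DockerImage: "pooler:1.0"}

	changelog, err := diff.Diff(oldSpec, newSpec)
	if err != nil {
		panic(err)
	}
	// One entry per changed field, e.g.
	// "update [NumberOfInstances] from '2' to '3'"
	for _, change := range changelog {
		fmt.Printf("%s %+v from '%+v' to '%+v'\n",
			change.Type, change.Path, change.From, change.To)
	}
}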

func syncResources(a, b *v1.ResourceRequirements) bool {
for _, res := range []v1.ResourceName{
v1.ResourceCPU,
v1.ResourceMemory,
} {
if !a.Limits[res].Equal(b.Limits[res]) ||
!a.Requests[res].Equal(b.Requests[res]) {
return true
}
}

return false
}
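Quantity.Equal compares semantic values rather than strings, which is why the comparison above is robust to equivalent notations; a quick runnable illustration using the standard apimachinery resource package:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	a := resource.MustParse("100m") // 100 millicores of CPU
	b := resource.MustParse("0.1")  // the same amount in decimal notation
	fmt.Println(a.Equal(b))         // prints "true": quantities compare by value
}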

// Check if we need to synchronize connection pooler deployment due to new
// defaults, that are different from what we see in the DeploymentSpec
func (c *Cluster) needSyncConnectionPoolerDefaults(
spec *acidv1.ConnectionPooler,
deployment *appsv1.Deployment) (sync bool, reasons []string) {

reasons = []string{}
sync = false

config := c.OpConfig.ConnectionPooler
podTemplate := deployment.Spec.Template
poolerContainer := podTemplate.Spec.Containers[constants.ConnectionPoolerContainer]

if spec == nil {
spec = &acidv1.ConnectionPooler{}
}

if spec.NumberOfInstances == nil &&
*deployment.Spec.Replicas != *config.NumberOfInstances {

sync = true
msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)",
*deployment.Spec.Replicas, *config.NumberOfInstances)
reasons = append(reasons, msg)
}

if spec.DockerImage == "" &&
poolerContainer.Image != config.Image {

sync = true
msg := fmt.Sprintf("DockerImage is different (having %s, required %s)",
poolerContainer.Image, config.Image)
reasons = append(reasons, msg)
}

expectedResources, err := generateResourceRequirements(spec.Resources,
c.makeDefaultConnectionPoolerResources())

// An error generating the expected resources means something is not quite
// right, but for the sake of robustness do not panic here; just report it
// and skip the resources comparison (in the worst case there will be no
// updates for new resource values).
if err == nil && syncResources(&poolerContainer.Resources, expectedResources) {
sync = true
msg := fmt.Sprintf("Resources are different (having %+v, required %+v)",
poolerContainer.Resources, expectedResources)
reasons = append(reasons, msg)
}

if err != nil {
c.logger.Warningf("Cannot generate expected resources, %v", err)
}

for _, env := range poolerContainer.Env {
if spec.User == "" && env.Name == "PGUSER" {
ref := env.ValueFrom.SecretKeyRef.LocalObjectReference

if ref.Name != c.credentialSecretName(config.User) {
sync = true
msg := fmt.Sprintf("pooler user is different (having %s, required %s)",
ref.Name, config.User)
reasons = append(reasons, msg)
}
}

if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema {
sync = true
msg := fmt.Sprintf("pooler schema is different (having %s, required %s)",
env.Value, config.Schema)
reasons = append(reasons, msg)
}
}

return sync, reasons
}
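For reference, the PGUSER entry inspected in the loop above is a standard secret-backed environment variable; building one looks roughly like this (the secret name is illustrative, not taken from this PR):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	pgUserEnv := v1.EnvVar{
		Name: "PGUSER",
		ValueFrom: &v1.EnvVarSource{
			SecretKeyRef: &v1.SecretKeySelector{
				LocalObjectReference: v1.LocalObjectReference{
					Name: "pooler.example-cluster.credentials", // assumed secret name
				},
				Key: "username",
			},
		},
	}
	// ref.Name in the removed code above corresponds to this field:
	fmt.Println(pgUserEnv.ValueFrom.SecretKeyRef.Name)
}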