1 change: 1 addition & 0 deletions docker/logical-backup/Dockerfile
@@ -23,6 +23,7 @@ RUN apt-get update \
&& curl --silent https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - \
&& apt-get update \
&& apt-get install --no-install-recommends -y \
postgresql-client-14 \
postgresql-client-13 \
postgresql-client-12 \
postgresql-client-11 \
2 changes: 1 addition & 1 deletion e2e/Dockerfile
@@ -16,7 +16,7 @@ RUN apt-get update \
curl \
vim \
&& pip3 install --no-cache-dir -r requirements.txt \
&& curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl \
&& curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.22.0/bin/linux/amd64/kubectl \
&& chmod +x ./kubectl \
&& mv ./kubectl /usr/local/bin/kubectl \
&& apt-get clean \
2 changes: 1 addition & 1 deletion e2e/run.sh
@@ -8,7 +8,7 @@ IFS=$'\n\t'

readonly cluster_name="postgres-operator-e2e-tests"
readonly kubeconfig_path="/tmp/kind-config-${cluster_name}"
readonly spilo_image="registry.opensource.zalan.do/acid/spilo-13-e2e:0.3"
readonly spilo_image="registry.opensource.zalan.do/acid/spilo-14-e2e:0.1"
readonly e2e_test_runner_image="registry.opensource.zalan.do/acid/postgres-operator-e2e-tests-runner:0.3"

export GOPATH=${GOPATH-~/go}
79 changes: 63 additions & 16 deletions e2e/tests/test_e2e.py
@@ -11,8 +11,8 @@
from tests.k8s_api import K8s
from kubernetes.client.rest import ApiException

SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-13-e2e:0.3"
SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-13-e2e:0.4"
SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-14-e2e:0.1"
SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-14-e2e:0.2"


def to_selector(labels):
@@ -85,6 +85,7 @@ def setUpClass(cls):

# set a single K8s wrapper for all tests
k8s = cls.k8s = K8s()
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'

# remove existing local storage class and create hostpath class
try:
@@ -150,8 +151,8 @@ def setUpClass(cls):
result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml")
print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr))
try:
k8s.wait_for_pod_start('spilo-role=master')
k8s.wait_for_pod_start('spilo-role=replica')
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise
@@ -1064,12 +1065,13 @@ def test_patroni_config_update(self):
via restarting cluster through Patroni's rest api
'''
k8s = self.k8s
masterPod = k8s.get_cluster_leader_pod()
labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master'
creationTimestamp = masterPod.metadata.creation_timestamp
leader = k8s.get_cluster_leader_pod()
replica = k8s.get_cluster_replica_pod()
masterCreationTimestamp = leader.metadata.creation_timestamp
replicaCreationTimestamp = replica.metadata.creation_timestamp
new_max_connections_value = "50"

# adjust max_connection
# adjust Postgres config
pg_patch_config = {
"spec": {
"postgresql": {
@@ -1098,7 +1100,7 @@
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

def compare_config():
effective_config = k8s.patroni_rest(masterPod.metadata.name, "config")
effective_config = k8s.patroni_rest(leader.metadata.name, "config")
desired_config = pg_patch_config["spec"]["patroni"]
desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
effective_parameters = effective_config["postgresql"]["parameters"]
@@ -1115,19 +1117,63 @@ def compare_config():
"synchronous_mode not updated")
return True

# check if Patroni config has been updated
self.eventuallyTrue(compare_config, "Postgres config not applied")

# make sure that pods were not recreated
leader = k8s.get_cluster_leader_pod()
replica = k8s.get_cluster_replica_pod()
self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
"Master pod creation timestamp is updated")
self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
"Master pod creation timestamp is updated")

# query max_connections setting
setting_query = """
SELECT setting
FROM pg_settings
WHERE name = 'max_connections';
"""
self.eventuallyEqual(lambda: self.query_database(masterPod.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
"New max_connections setting not applied", 10, 5)
self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
"New max_connections setting not applied on master", 10, 5)
self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
"Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)

# make sure that pod wasn't recreated
self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp,
"Master pod creation timestamp is updated")
# the next sync should restart the replica because it has pending_restart flag set
# force next sync by deleting the operator pod
k8s.delete_operator_pod()
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
"New max_connections setting not applied on replica", 10, 5)

# decrease max_connections again
# this time the restart order is already correct and the new value should appear on both instances
lower_max_connections_value = "30"
pg_patch_max_connections = {
"spec": {
"postgresql": {
"parameters": {
"max_connections": lower_max_connections_value
}
}
}
}

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)

self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# check Patroni config again
pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
self.eventuallyTrue(compare_config, "Postgres config not applied")

# and query max_connections setting again
self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
"Previous max_connections setting not applied on master", 10, 5)
self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
"Previous max_connections setting not applied on replica", 10, 5)

except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
@@ -1554,6 +1600,7 @@ def assert_distributed_pods(self, master_node, replica_nodes, cluster_label):
Toggle pod anti affinity to distribute pods across nodes (replica in particular).
'''
k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
failover_targets = self.get_failover_targets(master_node, replica_nodes)

# enable pod anti affinity in config map which should trigger movement of replica
Expand All @@ -1572,8 +1619,8 @@ def assert_distributed_pods(self, master_node, replica_nodes, cluster_label):
}
}
k8s.update_config(patch_disable_antiaffinity, "disable antiaffinity")
k8s.wait_for_pod_start('spilo-role=master')
k8s.wait_for_pod_start('spilo-role=replica')
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
return True

def list_databases(self, pod_name):
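For context on what the updated test relies on: k8s.patroni_rest(pod, "config") reads Patroni's REST API, which serves the effective dynamic configuration under /config and per-member status, including the pending_restart flag, under /patroni (port 8008 by default). Below is a minimal standalone sketch of those two reads in Go, assuming a kubectl port-forward to the Spilo pod; the struct fields shown are only the ones the test cares about and are illustrative, not part of this PR.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// getJSON decodes a JSON response from Patroni's REST API into target.
func getJSON(url string, target interface{}) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return json.NewDecoder(resp.Body).Decode(target)
}

func main() {
	// assumes: kubectl port-forward pod/acid-minimal-cluster-0 8008:8008
	base := "http://localhost:8008"

	// effective dynamic configuration, the same data compare_config() inspects
	var config struct {
		PostgreSQL struct {
			Parameters map[string]interface{} `json:"parameters"`
		} `json:"postgresql"`
	}
	if err := getJSON(base+"/config", &config); err != nil {
		panic(err)
	}
	fmt.Println("max_connections:", config.PostgreSQL.Parameters["max_connections"])

	// member status; pending_restart indicates a restart is still outstanding
	var member struct {
		PendingRestart bool `json:"pending_restart"`
	}
	if err := getJSON(base+"/patroni", &member); err != nil {
		panic(err)
	}
	fmt.Println("pending_restart:", member.PendingRestart)
}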
126 changes: 81 additions & 45 deletions pkg/cluster/sync.go
@@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"time"

@@ -20,6 +21,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var requireMasterRestartWhenDecreased = []string{
"max_connections",
"max_prepared_transactions",
"max_locks_per_transaction",
"max_worker_processes",
"max_wal_senders",
}

// Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
// Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
@@ -264,11 +273,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {

func (c *Cluster) syncStatefulSet() error {
var (
masterPod *v1.Pod
postgresConfig map[string]interface{}
instanceRestartRequired bool
restartWait uint32
restartMasterFirst bool
)

podsToRecreate := make([]v1.Pod, 0)
switchoverCandidates := make([]spec.NamespacedName, 0)

@@ -402,38 +409,41 @@ func (c *Cluster) syncStatefulSet() error {
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
restartWait = patroniConfig.LoopWait

// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
// do not attempt a restart
if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
}
// it could take up to LoopWait to apply the config
time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2)
break
}
}

// if the config update requires a restart, call Patroni restart for replicas first, then master
if instanceRestartRequired {
c.logger.Debug("restarting Postgres server within pods")
ttl, ok := postgresConfig["ttl"].(int32)
if !ok {
ttl = 30
}
for i, pod := range pods {
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
if role == Master {
masterPod = &pods[i]
continue
}
c.restartInstance(&pod)
time.Sleep(time.Duration(ttl) * time.Second)
// restart instances if required
remainingPods := make([]*v1.Pod, 0)
skipRole := Master
if restartMasterFirst {
skipRole = Replica
}
for i, pod := range pods {
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
if role == skipRole {
remainingPods = append(remainingPods, &pods[i])
continue
}
c.restartInstance(&pod, restartWait)
}

if masterPod != nil {
c.restartInstance(masterPod)
// in most cases only the master should be left to restart
if len(remainingPods) > 0 {
for _, remainingPod := range remainingPods {
c.restartInstance(remainingPod, restartWait)
}
}

@@ -450,19 +460,27 @@ func (c *Cluster) syncStatefulSet() error {
return nil
}

func (c *Cluster) restartInstance(pod *v1.Pod) {
func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
podName := util.NameFromMeta(pod.ObjectMeta)
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])

c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))

if err := c.patroni.Restart(pod); err != nil {
c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
// if the config update requires a restart, call Patroni restart
memberData, err := c.patroni.GetMemberData(pod)
if err != nil {
c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
return
}

c.logger.Debugf("Postgres server successfuly restarted in %s pod %s", role, podName)
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
// do restart only when it is pending
if memberData.PendingRestart {
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
if err := c.patroni.Restart(pod); err != nil {
c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
return
}
time.Sleep(time.Duration(restartWait) * time.Second)
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
}
}

// AnnotationsToPropagate get the annotations to update if required
Expand Down Expand Up @@ -502,21 +520,10 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) {
configToSet := make(map[string]interface{})
parametersToSet := make(map[string]string)
restartMaster := make([]bool, 0)
requiresMasterRestart := false

// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
desiredPgParameters := c.Spec.Parameters
for desiredOption, desiredValue := range desiredPgParameters {
effectiveValue := effectivePgParameters[desiredOption]
if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
parametersToSet[desiredOption] = desiredValue
}
}

if len(parametersToSet) > 0 {
configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet}
}

// compare other options from config with c.Spec.Patroni from manifest
// compare options from config with c.Spec.Patroni from manifest
desiredPatroniConfig := c.Spec.Patroni
if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait {
configToSet["loop_wait"] = desiredPatroniConfig.LoopWait
@@ -554,6 +561,35 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
configToSet["slots"] = slotsToSet
}

// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
desiredPgParameters := c.Spec.Parameters
for desiredOption, desiredValue := range desiredPgParameters {
effectiveValue := effectivePgParameters[desiredOption]
if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
parametersToSet[desiredOption] = desiredValue
if util.SliceContains(requireMasterRestartWhenDecreased, desiredOption) {
effectiveValueNum, errConv := strconv.Atoi(effectiveValue)
desiredValueNum, errConv2 := strconv.Atoi(desiredValue)
if errConv != nil || errConv2 != nil {
continue
}
if effectiveValueNum > desiredValueNum {
restartMaster = append(restartMaster, true)
continue
}
}
restartMaster = append(restartMaster, false)
}
}

if !util.SliceContains(restartMaster, false) && len(configToSet) == 0 {
requiresMasterRestart = true
}

if len(parametersToSet) > 0 {
configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet}
}

if len(configToSet) == 0 {
return false, nil
}
@@ -569,10 +605,10 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, configToSetJson)
if err = c.patroni.SetConfig(pod, configToSet); err != nil {
return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
}

return true, nil
return requiresMasterRestart, nil
}

func (c *Cluster) syncSecrets() error {
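The restart ordering introduced in sync.go follows one rule: Postgres requires max_connections, max_prepared_transactions, max_locks_per_transaction, max_worker_processes and max_wal_senders on a hot standby to be at least as large as on the primary, so when such a parameter is decreased the primary must be restarted first; every other change keeps the usual replicas-first order. Below is a compact Go sketch of that decision, reduced to a single parameter and using a hypothetical helper name rather than the PR's checkAndSetGlobalPostgreSQLConfiguration.

package main

import (
	"fmt"
	"strconv"
)

// settings that, when decreased, require restarting the primary before the replicas
var requireMasterRestartWhenDecreased = []string{
	"max_connections",
	"max_prepared_transactions",
	"max_locks_per_transaction",
	"max_worker_processes",
	"max_wal_senders",
}

// restartMasterFirst reports whether the primary must be restarted first for a
// single parameter change (hypothetical helper, simplified from the PR's logic).
func restartMasterFirst(param, effective, desired string) bool {
	for _, p := range requireMasterRestartWhenDecreased {
		if p != param {
			continue
		}
		oldVal, err1 := strconv.Atoi(effective)
		newVal, err2 := strconv.Atoi(desired)
		if err1 != nil || err2 != nil {
			return false // non-numeric values: keep the replicas-first default
		}
		return newVal < oldVal // only a decrease forces the primary to go first
	}
	return false
}

func main() {
	fmt.Println(restartMasterFirst("max_connections", "100", "50")) // true: decreased
	fmt.Println(restartMasterFirst("max_connections", "50", "100")) // false: increased
	fmt.Println(restartMasterFirst("shared_buffers", "1GB", "2GB")) // false: not in the list
}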