Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ _testmain.go
/vendor/
/build/
/docker/build/
/github.com/
.idea

scm-source.json
Expand Down
9 changes: 5 additions & 4 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ def test_node_readiness_label(self):
'''
k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
labels = 'spilo-role=master,' + cluster_label
readiness_label = 'lifecycle-status'
readiness_value = 'ready'

Expand Down Expand Up @@ -709,14 +708,16 @@ def wait_for_logical_backup_job_deletion(self):
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)

def update_config(self, config_map_patch):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)

def delete_operator_pod(self):
operator_pod = self.api.core_v1.list_namespaced_pod(
'default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf
self.wait_for_operator_pod_start()

def update_config(self, config_map_patch):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod()

def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "create", "-f", path],
Expand Down
25 changes: 25 additions & 0 deletions pkg/cluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
return pod, nil
}

func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {

/*
Operator should not re-create pods if there is at least one replica being bootstrapped
because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).

XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
after this check succeeds but before a pod is re-created
*/

for _, pod := range pods.Items {
state, err := c.patroni.GetPatroniMemberState(&pod)
if err != nil || state == "creating replica" {
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
return false
}

}
return true
}

func (c *Cluster) recreatePods() error {
c.setProcessName("starting to recreate pods")
ls := c.labelsSet(false)
Expand All @@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
}
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))

if !c.isSafeToRecreatePods(pods) {
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized")
}

var (
masterPod, newMasterPod, newPod *v1.Pod
)
Expand Down
37 changes: 36 additions & 1 deletion pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package patroni
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
Expand All @@ -11,7 +12,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -25,6 +26,7 @@ const (
type Interface interface {
Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
GetPatroniMemberState(pod *v1.Pod) (string, error)
}

// Patroni API client
Expand Down Expand Up @@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
}
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
}

//GetPatroniMemberState returns a state of member of a Patroni cluster
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {

apiURLString, err := apiURL(server)
if err != nil {
return "", err
}
response, err := p.httpClient.Get(apiURLString)
if err != nil {
return "", fmt.Errorf("could not perform Get request: %v", err)
}
defer response.Body.Close()

body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("could not read response: %v", err)
}

data := make(map[string]interface{})
err = json.Unmarshal(body, &data)
if err != nil {
return "", err
}

state, ok := data["state"].(string)
if !ok {
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
}

return state, nil

}