diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 85f60b601..a51c9871e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1032,12 +1032,20 @@ func (c *Cluster) processPodEvent(obj interface{}) error { return fmt.Errorf("could not cast to PodEvent") } + // can only take lock when (un)registerPodSubscriber is finshed c.podSubscribersMu.RLock() subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] - c.podSubscribersMu.RUnlock() if ok { - subscriber <- event + select { + case subscriber <- event: + default: + // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) + // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations + } } + // hold lock for the time of processing the event to avoid race condition + // with unregisterPodSubscriber closing the channel (see #1876) + c.podSubscribersMu.RUnlock() return nil } @@ -1501,34 +1509,16 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e var err error c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) - - var wg sync.WaitGroup - - podLabelErr := make(chan error) stopCh := make(chan struct{}) - - wg.Add(1) - - go func() { - defer wg.Done() - ch := c.registerPodSubscriber(candidate) - defer c.unregisterPodSubscriber(candidate) - - role := Master - - select { - case <-stopCh: - case podLabelErr <- func() (err2 error) { - _, err2 = c.waitForPodLabel(ch, stopCh, &role) - return - }(): - } - }() + ch := c.registerPodSubscriber(candidate) + defer c.unregisterPodSubscriber(candidate) + defer close(stopCh) if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) - if err = <-podLabelErr; err != nil { + _, err = c.waitForPodLabel(ch, stopCh, nil) + if err != nil { err = fmt.Errorf("could not get master pod label: %v", err) } } else { @@ -1536,14 +1526,6 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) } - // signal the role label waiting goroutine to close the shop and go home - close(stopCh) - // wait until the goroutine terminates, since unregisterPodSubscriber - // must be called before the outer return; otherwise we risk subscribing to the same pod twice. - wg.Wait() - // close the label waiting channel no sooner than the waiting goroutine terminates. - close(podLabelErr) - return err } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 26c4c332d..1e24565d8 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err) } - err = retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, + err = retryutil.Retry(1*time.Second, 5*time.Second, func() (bool, error) { _, err2 := c.KubeClient.Pods(pod.Namespace).Patch( context.TODO(), @@ -151,12 +151,13 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - if _, ok := c.podSubscribers[podName]; !ok { + ch, ok := c.podSubscribers[podName] + if !ok { panic("subscriber for pod '" + podName.String() + "' is not found") } - close(c.podSubscribers[podName]) delete(c.podSubscribers, podName) + close(ch) } func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { @@ -399,11 +400,12 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) } func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { + stopCh := make(chan struct{}) ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) - stopChan := make(chan struct{}) + defer close(stopCh) - err := retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, + err := retryutil.Retry(1*time.Second, 5*time.Second, func() (bool, error) { err2 := c.KubeClient.Pods(podName.Namespace).Delete( context.TODO(), @@ -421,7 +423,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { if err := c.waitForPodDeletion(ch); err != nil { return nil, err } - pod, err := c.waitForPodLabel(ch, stopChan, nil) + pod, err := c.waitForPodLabel(ch, stopCh, nil) if err != nil { return nil, err } @@ -446,7 +448,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp continue } - podName := util.NameFromMeta(pod.ObjectMeta) + podName := util.NameFromMeta(pods[i].ObjectMeta) newPod, err := c.recreatePod(podName) if err != nil { return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 0f71d2d64..0bfda78bf 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -316,7 +316,7 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin return nil } -func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) { +func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) { timeout := time.After(c.OpConfig.PodLabelWaitTimeout) for { select { @@ -332,7 +332,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{ } case <-timeout: return nil, fmt.Errorf("pod label wait timeout") - case <-stopChan: + case <-stopCh: return nil, fmt.Errorf("pod label wait cancelled") } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index de0dec69f..e46b9ee44 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -451,7 +451,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { panic("could not acquire initial list of clusters") } - wg.Add(5) + wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD)) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) go c.clusterResync(stopCh, wg) diff --git a/pkg/util/util.go b/pkg/util/util.go index 688153b89..8e27e4448 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -324,12 +324,20 @@ func testNil(values ...*int32) bool { return false } -// Convert int to IntOrString type +// ToIntStr converts int to IntOrString type func ToIntStr(val int) *intstr.IntOrString { b := intstr.FromInt(val) return &b } +// Bool2Int converts bool to int +func Bool2Int(flag bool) int { + if flag { + return 1 + } + return 0 +} + // Get int from IntOrString and return max int if string func IntFromIntStr(intOrStr intstr.IntOrString) int { if intOrStr.Type == 1 {