diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index a51c9871e..649ec5b87 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1031,16 +1031,21 @@ func (c *Cluster) processPodEvent(obj interface{}) error { if !ok { return fmt.Errorf("could not cast to PodEvent") } + podName := spec.NamespacedName(event.PodName) // can only take lock when (un)registerPodSubscriber is finshed c.podSubscribersMu.RLock() - subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] + subscriber, ok := c.podSubscribers[podName] if ok { - select { - case subscriber <- event: - default: - // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) - // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations + if event.EventType == PodEventEnd { + c.unregisterPodSubscriber(podName) + } else { + select { + case subscriber <- event: + default: + // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) + // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations + } } } // hold lock for the time of processing the event to avoid race condition @@ -1510,8 +1515,13 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) stopCh := make(chan struct{}) - ch := c.registerPodSubscriber(candidate) - defer c.unregisterPodSubscriber(candidate) + // create buffered channel so that go routine with processPodEvent does not block when + // consumer waitForPodLabel is already finished. Channel size should be big enough to + // receive an event for all pod label wait timeout + patroni API timeout seconds + chanSize := int(c.OpConfig.PodLabelWaitTimeout.Seconds() + c.OpConfig.PatroniAPICheckTimeout.Seconds()) + ch := c.registerBufferedPodSubscriber(candidate, chanSize) + // send special end event to trigger removal of PodEvent channel by processPodEvent + defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(candidate), EventType: PodEventEnd}) defer close(stopCh) if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 1e24565d8..e69a080e4 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -136,8 +136,9 @@ func (c *Cluster) deletePods() error { func (c *Cluster) deletePod(podName spec.NamespacedName) error { c.setProcessName("deleting pod %q", podName) - ch := c.registerPodSubscriber(podName) - defer c.unregisterPodSubscriber(podName) + ch := c.registerUnbufferedPodSubscriber(podName) + // send special end event to trigger removal of PodEvent channel by processPodEvent + defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(podName), EventType: PodEventEnd}) if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { return err @@ -155,17 +156,31 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { if !ok { panic("subscriber for pod '" + podName.String() + "' is not found") } - delete(c.podSubscribers, podName) close(ch) } -func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { +func (c *Cluster) registerUnbufferedPodSubscriber(podName spec.NamespacedName) chan PodEvent { + return c.registerPodSubscriber(podName, 0) +} + +func (c *Cluster) registerBufferedPodSubscriber(podName spec.NamespacedName, chanSize int) chan PodEvent { + return c.registerPodSubscriber(podName, chanSize) +} + +func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName, chanSize int) chan PodEvent { c.logger.Debugf("subscribing to pod %q", podName) + var ch chan PodEvent c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - ch := make(chan PodEvent) + // buffered or unbuffered PodEvent channel + if chanSize > 0 { + ch = make(chan PodEvent, chanSize) + } else { + ch = make(chan PodEvent) + } + if _, ok := c.podSubscribers[podName]; ok { panic("pod '" + podName.String() + "' is already subscribed") } @@ -401,8 +416,9 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { stopCh := make(chan struct{}) - ch := c.registerPodSubscriber(podName) - defer c.unregisterPodSubscriber(podName) + ch := c.registerUnbufferedPodSubscriber(podName) + // send special end event to trigger removal of PodEvent channel by processPodEvent + defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(podName), EventType: PodEventEnd}) defer close(stopCh) err := retryutil.Retry(1*time.Second, 5*time.Second, diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 67b4ee395..c53638bd7 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -32,6 +32,7 @@ const ( PodEventAdd PodEventType = "ADD" PodEventUpdate PodEventType = "UPDATE" PodEventDelete PodEventType = "DELETE" + PodEventEnd PodEventType = "END" ) // PodEvent describes the event for a single Pod