diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index a51c9871e..3c857ede8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -77,7 +77,7 @@ type Cluster struct { pgUsers map[string]spec.PgUser pgUsersCache map[string]spec.PgUser systemUsers map[string]spec.PgUser - podSubscribers map[spec.NamespacedName]chan PodEvent + podSubscribers map[spec.NamespacedName]PodSubscriber podSubscribersMu sync.RWMutex pgDb *sql.DB mu sync.Mutex @@ -98,6 +98,11 @@ type Cluster struct { currentMajorVersion int } +type PodSubscriber struct { + podEvents chan PodEvent + stopEvent chan struct{} +} + type compareStatefulsetResult struct { match bool replace bool @@ -127,7 +132,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres Postgresql: pgSpec, pgUsers: make(map[string]spec.PgUser), systemUsers: make(map[string]spec.PgUser), - podSubscribers: make(map[spec.NamespacedName]chan PodEvent), + podSubscribers: make(map[spec.NamespacedName]PodSubscriber), kubeResources: kubeResources{ Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service), @@ -1032,15 +1037,17 @@ func (c *Cluster) processPodEvent(obj interface{}) error { return fmt.Errorf("could not cast to PodEvent") } + podName := spec.NamespacedName(event.PodName) + // can only take lock when (un)registerPodSubscriber is finshed c.podSubscribersMu.RLock() - subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] + subscriber, ok := c.podSubscribers[podName] if ok { select { - case subscriber <- event: + case <-subscriber.stopEvent: + c.unregisterPodSubscriber(podName) + case subscriber.podEvents <- event: default: - // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) - // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations } } // hold lock for the time of processing the event to avoid race condition @@ -1510,14 +1517,16 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) stopCh := make(chan struct{}) - ch := c.registerPodSubscriber(candidate) - defer c.unregisterPodSubscriber(candidate) + subscriber := c.registerPodSubscriber(candidate) + defer func() { + subscriber.stopEvent <- struct{}{} + }() defer close(stopCh) if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) - _, err = c.waitForPodLabel(ch, stopCh, nil) + _, err = c.waitForPodLabel(subscriber.podEvents, stopCh, nil) if err != nil { err = fmt.Errorf("could not get master pod label: %v", err) } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 1e24565d8..f83169935 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -136,14 +136,20 @@ func (c *Cluster) deletePods() error { func (c *Cluster) deletePod(podName spec.NamespacedName) error { c.setProcessName("deleting pod %q", podName) - ch := c.registerPodSubscriber(podName) - defer c.unregisterPodSubscriber(podName) + subscriber := c.registerPodSubscriber(podName) + defer func() { + subscriber.stopEvent <- struct{}{} + }() if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { return err } - return c.waitForPodDeletion(ch) + if err := c.waitForPodDeletion(subscriber.podEvents); err != nil { + return err + } + + return nil } func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { @@ -151,27 +157,31 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - ch, ok := c.podSubscribers[podName] + subscriber, ok := c.podSubscribers[podName] if !ok { panic("subscriber for pod '" + podName.String() + "' is not found") } delete(c.podSubscribers, podName) - close(ch) + close(subscriber.podEvents) + close(subscriber.stopEvent) } -func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { +func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) PodSubscriber { c.logger.Debugf("subscribing to pod %q", podName) c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - ch := make(chan PodEvent) + var subscriber PodSubscriber + subscriber.podEvents = make(chan PodEvent) + subscriber.stopEvent = make(chan struct{}) + if _, ok := c.podSubscribers[podName]; ok { panic("pod '" + podName.String() + "' is already subscribed") } - c.podSubscribers[podName] = ch + c.podSubscribers[podName] = subscriber - return ch + return subscriber } func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) { @@ -401,8 +411,10 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { stopCh := make(chan struct{}) - ch := c.registerPodSubscriber(podName) - defer c.unregisterPodSubscriber(podName) + subscriber := c.registerPodSubscriber(podName) + defer func() { + subscriber.stopEvent <- struct{}{} + }() defer close(stopCh) err := retryutil.Retry(1*time.Second, 5*time.Second, @@ -420,10 +432,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { return nil, fmt.Errorf("could not delete pod: %v", err) } - if err := c.waitForPodDeletion(ch); err != nil { + if err := c.waitForPodDeletion(subscriber.podEvents); err != nil { return nil, err } - pod, err := c.waitForPodLabel(ch, stopCh, nil) + pod, err := c.waitForPodLabel(subscriber.podEvents, stopCh, nil) if err != nil { return nil, err }