Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,16 +1031,21 @@ func (c *Cluster) processPodEvent(obj interface{}) error {
if !ok {
return fmt.Errorf("could not cast to PodEvent")
}
podName := spec.NamespacedName(event.PodName)

// can only take lock when (un)registerPodSubscriber is finished
c.podSubscribersMu.RLock()
subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)]
subscriber, ok := c.podSubscribers[podName]
if ok {
select {
case subscriber <- event:
default:
// ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
// avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations
if event.EventType == PodEventEnd {
c.unregisterPodSubscriber(podName)
} else {
select {
case subscriber <- event:
default:
// ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
// avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations
}
}
}
// hold lock for the time of processing the event to avoid race condition
Expand Down Expand Up @@ -1510,8 +1515,13 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
// create a buffered channel so that the goroutine running processPodEvent does not block when
// the consumer waitForPodLabel has already finished. The channel size should be big enough to
// hold one event per second of the pod label wait timeout plus the Patroni API check timeout
chanSize := int(c.OpConfig.PodLabelWaitTimeout.Seconds() + c.OpConfig.PatroniAPICheckTimeout.Seconds())
ch := c.registerBufferedPodSubscriber(candidate, chanSize)
// send special end event to trigger removal of PodEvent channel by processPodEvent
defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(candidate), EventType: PodEventEnd})
defer close(stopCh)

if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
Expand Down
30 changes: 23 additions & 7 deletions pkg/cluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ func (c *Cluster) deletePods() error {

func (c *Cluster) deletePod(podName spec.NamespacedName) error {
c.setProcessName("deleting pod %q", podName)
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
ch := c.registerUnbufferedPodSubscriber(podName)
// send special end event to trigger removal of PodEvent channel by processPodEvent
defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(podName), EventType: PodEventEnd})

if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
return err
Expand All @@ -155,17 +156,31 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
if !ok {
panic("subscriber for pod '" + podName.String() + "' is not found")
}

delete(c.podSubscribers, podName)
close(ch)
}

func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent {
// registerUnbufferedPodSubscriber subscribes to events for podName and returns
// the event channel. It delegates to registerPodSubscriber with a channel size
// of 0, for which registerPodSubscriber creates an unbuffered channel.
func (c *Cluster) registerUnbufferedPodSubscriber(podName spec.NamespacedName) chan PodEvent {
return c.registerPodSubscriber(podName, 0)
}

// registerBufferedPodSubscriber subscribes to events for podName and returns
// the event channel. It delegates to registerPodSubscriber, passing chanSize
// through as the channel capacity. Note that registerPodSubscriber only
// creates a buffered channel when chanSize > 0; otherwise the returned
// channel is unbuffered.
func (c *Cluster) registerBufferedPodSubscriber(podName spec.NamespacedName, chanSize int) chan PodEvent {
return c.registerPodSubscriber(podName, chanSize)
}

func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName, chanSize int) chan PodEvent {
c.logger.Debugf("subscribing to pod %q", podName)
var ch chan PodEvent
c.podSubscribersMu.Lock()
defer c.podSubscribersMu.Unlock()

ch := make(chan PodEvent)
// buffered or unbuffered PodEvent channel
if chanSize > 0 {
ch = make(chan PodEvent, chanSize)
} else {
ch = make(chan PodEvent)
}

if _, ok := c.podSubscribers[podName]; ok {
panic("pod '" + podName.String() + "' is already subscribed")
}
Expand Down Expand Up @@ -401,8 +416,9 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error)

func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
ch := c.registerUnbufferedPodSubscriber(podName)
// send special end event to trigger removal of PodEvent channel by processPodEvent
defer c.ReceivePodEvent(PodEvent{PodName: types.NamespacedName(podName), EventType: PodEventEnd})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FxKu the function name is a bit confusing here, as it sends an event rather than receiving one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also stumbled over the name. But maybe I can get around sending artificial podEvents after all; see #1891.

defer close(stopCh)

err := retryutil.Retry(1*time.Second, 5*time.Second,
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
PodEventAdd PodEventType = "ADD"
PodEventUpdate PodEventType = "UPDATE"
PodEventDelete PodEventType = "DELETE"
PodEventEnd PodEventType = "END"
)

// PodEvent describes the event for a single Pod
Expand Down