Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 15 additions & 33 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,12 +1032,20 @@ func (c *Cluster) processPodEvent(obj interface{}) error {
return fmt.Errorf("could not cast to PodEvent")
}

// can only take lock when (un)registerPodSubscriber is finshed
c.podSubscribersMu.RLock()
subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)]
c.podSubscribersMu.RUnlock()
if ok {
subscriber <- event
select {
case subscriber <- event:
default:
// ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
// avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations
}
}
// hold lock for the time of processing the event to avoid race condition
// with unregisterPodSubscriber closing the channel (see #1876)
c.podSubscribersMu.RUnlock()

return nil
}
Expand Down Expand Up @@ -1501,49 +1509,23 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)

var wg sync.WaitGroup

podLabelErr := make(chan error)
stopCh := make(chan struct{})

wg.Add(1)

go func() {
defer wg.Done()
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)

role := Master

select {
case <-stopCh:
case podLabelErr <- func() (err2 error) {
_, err2 = c.waitForPodLabel(ch, stopCh, &role)
return
}():
}
}()
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
defer close(stopCh)

if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
if err = <-podLabelErr; err != nil {
_, err = c.waitForPodLabel(ch, stopCh, nil)
if err != nil {
err = fmt.Errorf("could not get master pod label: %v", err)
}
} else {
err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
}

// signal the role label waiting goroutine to close the shop and go home
close(stopCh)
// wait until the goroutine terminates, since unregisterPodSubscriber
// must be called before the outer return; otherwise we risk subscribing to the same pod twice.
wg.Wait()
// close the label waiting channel no sooner than the waiting goroutine terminates.
close(podLabelErr)

return err
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/cluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
}

err = retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout,
err = retryutil.Retry(1*time.Second, 5*time.Second,
func() (bool, error) {
_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
context.TODO(),
Expand Down Expand Up @@ -151,12 +151,13 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
c.podSubscribersMu.Lock()
defer c.podSubscribersMu.Unlock()

if _, ok := c.podSubscribers[podName]; !ok {
ch, ok := c.podSubscribers[podName]
if !ok {
panic("subscriber for pod '" + podName.String() + "' is not found")
}

close(c.podSubscribers[podName])
delete(c.podSubscribers, podName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() before delete() looks a bit safer here. (what does Goalng do to ch when the entry that contains it is removed from c.podSubscribers ?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete only removes the channel from c.podSubscribers map. We can still safely close it afterwards. As the go routine processPodEvent first gets the channel, then writes to it, the idea was to remove the channel from the map first then close it - saving some nanoseconds in this race condition we faced.

close(ch)
}

func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent {
Expand Down Expand Up @@ -399,11 +400,12 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error)
}

func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
stopChan := make(chan struct{})
defer close(stopCh)

err := retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout,
err := retryutil.Retry(1*time.Second, 5*time.Second,
func() (bool, error) {
err2 := c.KubeClient.Pods(podName.Namespace).Delete(
context.TODO(),
Expand All @@ -421,7 +423,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
if err := c.waitForPodDeletion(ch); err != nil {
return nil, err
}
pod, err := c.waitForPodLabel(ch, stopChan, nil)
pod, err := c.waitForPodLabel(ch, stopCh, nil)
if err != nil {
return nil, err
}
Expand All @@ -446,7 +448,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
continue
}

podName := util.NameFromMeta(pod.ObjectMeta)
podName := util.NameFromMeta(pods[i].ObjectMeta)
newPod, err := c.recreatePod(podName)
if err != nil {
return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin
return nil
}

func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) {
func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) {
timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
for {
select {
Expand All @@ -332,7 +332,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{
}
case <-timeout:
return nil, fmt.Errorf("pod label wait timeout")
case <-stopChan:
case <-stopCh:
return nil, fmt.Errorf("pod label wait cancelled")
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
panic("could not acquire initial list of clusters")
}

wg.Add(5)
wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD))
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.clusterResync(stopCh, wg)
Expand Down
10 changes: 9 additions & 1 deletion pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,20 @@ func testNil(values ...*int32) bool {
return false
}

// Convert int to IntOrString type
// ToIntStr converts int to IntOrString type
func ToIntStr(val int) *intstr.IntOrString {
b := intstr.FromInt(val)
return &b
}

// Bool2Int converts bool to int
func Bool2Int(flag bool) int {
if flag {
return 1
}
return 0
}

// Get int from IntOrString and return max int if string
func IntFromIntStr(intOrStr intstr.IntOrString) int {
if intOrStr.Type == 1 {
Expand Down