Skip to content

Commit

Permalink
add check if current master is affected
Browse files Browse the repository at this point in the history
  • Loading branch information
mkabilov committed Oct 16, 2017
1 parent f609c41 commit b4cbb12
Showing 1 changed file with 45 additions and 14 deletions.
59 changes: 45 additions & 14 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cluster

import (
"fmt"
"strconv"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -119,26 +121,54 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
return statefulSet, nil
}

func (c *Cluster) scaleDown(newStatefulSet *v1beta1.StatefulSet) error {
podName := fmt.Sprintf("%s-0", c.Statefulset.Name) //TODO:
func getPodNumb(podName string) (int32, error) {
parts := strings.Split(podName, "-")
if len(parts) == 0 {
return 0, fmt.Errorf("no postfix")
}

postfix := parts[len(parts)-1]
res, err := strconv.ParseInt(postfix, 10, 64)
if err != nil {
return 0, fmt.Errorf("couldn not parse postfix: %v", err)
}

return int32(res), nil
}

func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
masterPod, err := c.getRolePods(Master)
if err != nil {
return fmt.Errorf("could not get master pod: %v", err)
}

podNum, err := getPodNumb(masterPod[0].Name)
if err != nil {
return fmt.Errorf("could not get pod number: %v", err)
}

//Check if scale down affects current master pod
if (*c.Statefulset.Spec.Replicas < podNum+1) ||
(*newStatefulSet.Spec.Replicas > podNum+1) {
return nil
}

podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
masterCandidatePod, err := c.KubeClient.Pods(c.OpConfig.Namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get master candidate pod: %v", err)
}

//TODO: check if the pod is master pod
if c.podSpiloRole(masterCandidatePod) == string(Master) {
return nil
}

// some sanity check
if !util.MapContains(masterCandidatePod.Labels, c.OpConfig.ClusterLabels) ||
!util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel:c.Name}) {
!util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel: c.Name}) {
return fmt.Errorf("pod %q does not belong to cluster", podName)
}

masterPod, err := c.getRolePods(Master)
if err != nil {
return fmt.Errorf("could not get master pod: %v", err)
}

if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil {
return fmt.Errorf("could not failover: %v", err)
}
Expand All @@ -153,18 +183,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
}
statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)

//scale down
if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas {
if err := c.preScaleDown(newStatefulSet); err != nil {
return fmt.Errorf("could not scale down: %v", err)
}
}
c.logger.Debugf("updating statefulset")

patchData, err := specPatch(newStatefulSet.Spec)
if err != nil {
return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err)
}

//Scale down
if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas {
c.scaleDown(newStatefulSet)
}

statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
c.Statefulset.Name,
types.MergePatchType,
Expand Down

0 comments on commit b4cbb12

Please sign in to comment.