Skip to content

Commit

Permalink
Merge branch 'master' into kube-cluster-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
mkabilov committed Oct 16, 2017
2 parents 3e64d1e + 6c4cb4e commit 20817d7
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
5 changes: 1 addition & 4 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
}
if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {

c.logger.Warningf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
return &compareStatefulsetResult{}
}
Expand Down Expand Up @@ -446,10 +445,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
defer c.mu.Unlock()

c.setStatus(spec.ClusterStatusUpdating)
c.logger.Debugf("cluster update from version %q to %q",
c.ResourceVersion, newSpec.ResourceVersion)

/* Make sure we update when this function exists */
/* Make sure we update when this function exits */
defer func() {
c.Postgresql = *newSpec
}()
Expand Down
58 changes: 58 additions & 0 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cluster

import (
"fmt"
"strconv"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -131,13 +133,69 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
return statefulSet, nil
}

func getPodIndex(podName string) (int32, error) {
parts := strings.Split(podName, "-")
if len(parts) == 0 {
return 0, fmt.Errorf("pod has no index part")
}

postfix := parts[len(parts)-1]
res, err := strconv.ParseInt(postfix, 10, 32)
if err != nil {
return 0, fmt.Errorf("could not parse pod index: %v", err)
}

return int32(res), nil
}

func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
masterPod, err := c.getRolePods(Master)
if err != nil {
return fmt.Errorf("could not get master pod: %v", err)
}

podNum, err := getPodIndex(masterPod[0].Name)
if err != nil {
return fmt.Errorf("could not get pod number: %v", err)
}

//Check if scale down affects current master pod
if *newStatefulSet.Spec.Replicas >= podNum+1 {
return nil
}

podName := fmt.Sprintf("%s-0", c.Statefulset.Name)
masterCandidatePod, err := c.KubeClient.Pods(c.OpConfig.Namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get master candidate pod: %v", err)
}

// some sanity check
if !util.MapContains(masterCandidatePod.Labels, c.OpConfig.ClusterLabels) ||
!util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel: c.Name}) {
return fmt.Errorf("pod %q does not belong to cluster", podName)
}

if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil {
return fmt.Errorf("could not failover: %v", err)
}

return nil
}

func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
c.setProcessName("updating statefulset")
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")
}
statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta)

//scale down
if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas {
if err := c.preScaleDown(newStatefulSet); err != nil {
c.logger.Warningf("could not scale down: %v", err)
}
}
c.logger.Debugf("updating statefulset")

patchData, err := specPatch(newStatefulSet.Spec)
Expand Down
4 changes: 0 additions & 4 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spe
cluster, ok := c.clusters[clusterName]
c.clustersMu.RUnlock()
if ok {
c.logger.WithField("cluster-name", clusterName).
Debugf("sending %q event of pod %q to the cluster channel",
event.EventType,
event.PodName)
cluster.ReceivePodEvent(event)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(logger *logrus.Entry) *Patroni {
}
}

func (p *Patroni) apiURL(masterPod *v1.Pod) string {
func apiURL(masterPod *v1.Pod) string {
return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort)
}

Expand All @@ -54,7 +54,7 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
return fmt.Errorf("could not encode json: %v", err)
}

request, err := http.NewRequest(http.MethodPost, p.apiURL(master)+failoverPath, buf)
request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
if err != nil {
return fmt.Errorf("could not create request: %v", err)
}
Expand All @@ -77,4 +77,4 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
}

return nil
}
}

0 comments on commit 20817d7

Please sign in to comment.