cherry pick pingcap#2705 to release-1.1
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
DanielZhangQD authored and ti-srebot committed Jun 18, 2020
1 parent a710b21 commit 61624b9
Showing 11 changed files with 114 additions and 4 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
@@ -194,10 +194,18 @@ func (tc *TidbCluster) PDUpgrading() bool {
return tc.Status.PD.Phase == UpgradePhase
}

func (tc *TidbCluster) PDScaling() bool {
return tc.Status.PD.Phase == ScalePhase
}

func (tc *TidbCluster) TiKVUpgrading() bool {
return tc.Status.TiKV.Phase == UpgradePhase
}

func (tc *TidbCluster) TiKVScaling() bool {
return tc.Status.TiKV.Phase == ScalePhase
}

func (tc *TidbCluster) TiDBUpgrading() bool {
return tc.Status.TiDB.Phase == UpgradePhase
}
2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
@@ -61,6 +61,8 @@ const (
NormalPhase MemberPhase = "Normal"
// UpgradePhase represents the upgrade state of TiDB cluster.
UpgradePhase MemberPhase = "Upgrade"
// ScalePhase represents the scaling state of TiDB cluster.
ScalePhase MemberPhase = "Scale"
)

// ConfigUpdateStrategy represents the strategy to update configuration
2 changes: 2 additions & 0 deletions pkg/manager/member/pd_member_manager.go
@@ -315,6 +315,8 @@ func (pmm *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set
}
if upgrading {
tc.Status.PD.Phase = v1alpha1.UpgradePhase
} else if tc.PDStsDesiredReplicas() != *set.Spec.Replicas {
tc.Status.PD.Phase = v1alpha1.ScalePhase
} else {
tc.Status.PD.Phase = v1alpha1.NormalPhase
}
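For readers skimming the hunk above: together with the new ScalePhase constant, the added branch amounts to a three-way phase selection. Below is a minimal standalone sketch of that ordering, using local stand-in types rather than the operator's real v1alpha1 API; PDStsDesiredReplicas() is assumed here to return the replica count the controller currently wants for the PD StatefulSet.

package main

import "fmt"

// Local stand-in for v1alpha1.MemberPhase (illustration only).
type MemberPhase string

const (
    NormalPhase  MemberPhase = "Normal"
    UpgradePhase MemberPhase = "Upgrade"
    ScalePhase   MemberPhase = "Scale"
)

// derivePDPhase mirrors the selection order in syncTidbClusterStatus:
// an in-flight upgrade wins, then a mismatch between the desired and the
// observed StatefulSet replicas marks PD as scaling, otherwise PD is Normal.
func derivePDPhase(upgrading bool, desiredReplicas, stsReplicas int32) MemberPhase {
    if upgrading {
        return UpgradePhase
    }
    if desiredReplicas != stsReplicas {
        return ScalePhase
    }
    return NormalPhase
}

func main() {
    fmt.Println(derivePDPhase(false, 4, 3)) // Scale (scaling out)
    fmt.Println(derivePDPhase(false, 2, 3)) // Scale (scaling in)
    fmt.Println(derivePDPhase(true, 4, 3))  // Upgrade takes precedence
    fmt.Println(derivePDPhase(false, 3, 3)) // Normal
}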
4 changes: 2 additions & 2 deletions pkg/manager/member/pd_member_manager_test.go
@@ -317,7 +317,7 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) {
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.ClusterID).To(Equal("1"))
- g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase))
+ g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.ScalePhase))
g.Expect(tc.Status.PD.StatefulSet.ObservedGeneration).To(Equal(int64(1)))
g.Expect(len(tc.Status.PD.Members)).To(Equal(3))
g.Expect(tc.Status.PD.Members["pd1"].Health).To(Equal(true))
@@ -727,7 +727,7 @@ func TestPDMemberManagerSyncPDSts(t *testing.T) {
g.Expect(*set.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
- g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase))
+ g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.ScalePhase))
},
},
}
10 changes: 10 additions & 0 deletions pkg/manager/member/pd_upgrader.go
@@ -52,6 +52,16 @@ func (pu *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Sta
if !tc.Status.PD.Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed,can not to be upgraded", ns, tcName)
}
if tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd is scaling, can not upgrade pd",
ns, tcName)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
}
newSet.Spec.Template.Spec = *podSpec
return nil
}

tc.Status.PD.Phase = v1alpha1.UpgradePhase
if !templateEqual(newSet, oldSet) {
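When PD is scaling, the upgrader above defers the rolling update by restoring the pod template from the StatefulSet's last-applied configuration. A rough, self-contained sketch of that idea follows; the annotation key and the trimmed-down types are assumptions for illustration, not the operator's actual GetLastAppliedConfig implementation.

package main

import (
    "encoding/json"
    "fmt"
)

// Hypothetical, trimmed-down stand-ins for the Kubernetes types involved.
type PodSpec struct {
    Containers []struct {
        Name  string `json:"name"`
        Image string `json:"image"`
    } `json:"containers"`
}

type StatefulSetSpec struct {
    Replicas int32 `json:"replicas"`
    Template struct {
        Spec PodSpec `json:"spec"`
    } `json:"template"`
}

// lastAppliedPodSpec illustrates what GetLastAppliedConfig is used for here:
// recover the pod spec that was last applied to the old StatefulSet (stored
// as a JSON annotation) so the new StatefulSet can keep running it while the
// member is scaling. The annotation key is an assumption for this sketch.
func lastAppliedPodSpec(annotations map[string]string) (*PodSpec, error) {
    raw, ok := annotations["pingcap.com/last-applied-configuration"]
    if !ok {
        return nil, fmt.Errorf("last-applied configuration annotation not found")
    }
    var spec StatefulSetSpec
    if err := json.Unmarshal([]byte(raw), &spec); err != nil {
        return nil, err
    }
    return &spec.Template.Spec, nil
}

func main() {
    anns := map[string]string{
        "pingcap.com/last-applied-configuration": `{"replicas":3,"template":{"spec":{"containers":[{"name":"pd","image":"pd-test-image:old"}]}}}`,
    }
    podSpec, err := lastAppliedPodSpec(anns)
    if err != nil {
        panic(err)
    }
    fmt.Println(podSpec.Containers[0].Image) // pd-test-image:old
}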
19 changes: 19 additions & 0 deletions pkg/manager/member/pd_upgrader_test.go
@@ -162,6 +162,25 @@ func TestPDUpgraderUpgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(controller.Int32Ptr(3)))
},
},
{
name: "pd scaling",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Synced = true
tc.Status.PD.Phase = v1alpha1.ScalePhase
},
changePods: nil,
changeOldSet: func(set *apps.StatefulSet) {
set.Spec.Template.Spec.Containers[0].Image = "pd-test-image:old"
},
transferLeaderErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.ScalePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(controller.Int32Ptr(3)))
},
},
{
name: "update revision equals current revision",
changeFn: func(tc *v1alpha1.TidbCluster) {
2 changes: 2 additions & 0 deletions pkg/manager/member/tikv_member_manager.go
@@ -633,6 +633,8 @@ func (tkmm *tikvMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, s
}
if upgrading && tc.Status.PD.Phase != v1alpha1.UpgradePhase {
tc.Status.TiKV.Phase = v1alpha1.UpgradePhase
} else if tc.TiKVStsDesiredReplicas() != *set.Spec.Replicas {
tc.Status.TiKV.Phase = v1alpha1.ScalePhase
} else {
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
}
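The scaling check above compares tc.TiKVStsDesiredReplicas() with the StatefulSet's current *set.Spec.Replicas. As a rough sketch of what such a helper typically computes (the requested replicas plus one pod per store being replaced by failover), with simplified types and field names that are assumptions rather than the real v1alpha1 API:

package main

import "fmt"

// Simplified stand-ins for the v1alpha1 types (illustration only).
type TiKVSpec struct {
    Replicas int32
}

type TiKVStatus struct {
    // Stores the operator is replacing via failover (assumed field).
    FailureStores map[string]struct{}
}

type TidbCluster struct {
    Spec   struct{ TiKV TiKVSpec }
    Status struct{ TiKV TiKVStatus }
}

// TiKVStsDesiredReplicas sketches the desired StatefulSet size: the replicas
// requested in the spec plus any extra pods created for failed stores. When
// this differs from *set.Spec.Replicas, the member manager now reports
// ScalePhase instead of NormalPhase.
func (tc *TidbCluster) TiKVStsDesiredReplicas() int32 {
    return tc.Spec.TiKV.Replicas + int32(len(tc.Status.TiKV.FailureStores))
}

func main() {
    tc := &TidbCluster{}
    tc.Spec.TiKV.Replicas = 4
    stsReplicas := int32(3)
    fmt.Println(tc.TiKVStsDesiredReplicas() != stsReplicas) // true: scaling out
}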
40 changes: 40 additions & 0 deletions pkg/manager/member/tikv_member_manager_test.go
@@ -799,11 +799,15 @@ func TestTiKVMemberManagerSyncTidbClusterStatus(t *testing.T) {
status := apps.StatefulSetStatus{
Replicas: int32(3),
}
spec := apps.StatefulSetSpec{
Replicas: pointer.Int32Ptr(3),
}
now := metav1.Time{Time: time.Now()}
testFn := func(test *testcase, t *testing.T) {
tc := newTidbClusterForPD()
tc.Status.PD.Phase = v1alpha1.NormalPhase
set := &apps.StatefulSet{
Spec: spec,
Status: status,
}
if test.updateTC != nil {
@@ -910,6 +914,42 @@
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.NormalPhase))
},
},
{
name: "statefulset is scaling out",
updateTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.TiKV.Replicas = 4
},
upgradingFn: func(lister corelisters.PodLister, controlInterface pdapi.PDControlInterface, set *apps.StatefulSet, cluster *v1alpha1.TidbCluster) (bool, error) {
return false, nil
},
errWhenGetStores: false,
storeInfo: nil,
errWhenGetTombstoneStores: false,
tombstoneStoreInfo: nil,
errExpectFn: nil,
tcExpectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.TiKV.StatefulSet.Replicas).To(Equal(int32(3)))
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.ScalePhase))
},
},
{
name: "statefulset is scaling in",
updateTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.TiKV.Replicas = 2
},
upgradingFn: func(lister corelisters.PodLister, controlInterface pdapi.PDControlInterface, set *apps.StatefulSet, cluster *v1alpha1.TidbCluster) (bool, error) {
return false, nil
},
errWhenGetStores: false,
storeInfo: nil,
errWhenGetTombstoneStores: false,
tombstoneStoreInfo: nil,
errExpectFn: nil,
tcExpectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.TiKV.StatefulSet.Replicas).To(Equal(int32(3)))
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.ScalePhase))
},
},
{
name: "get stores failed",
updateTC: nil,
4 changes: 3 additions & 1 deletion pkg/manager/member/tikv_scaler.go
@@ -58,6 +58,8 @@ func (tsd *tikvScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
_, ordinal, replicas, deleteSlots := scaleOne(oldSet, newSet)
resetReplicas(newSet, oldSet)
if tc.TiKVUpgrading() {
klog.Infof("TidbCluster: [%s/%s]'s tikv is upgrading, can not scale out until the upgrade completed",
tc.Namespace, tc.Name)
return nil
}

@@ -81,7 +83,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe

// tikv can not scale in when it is upgrading
if tc.TiKVUpgrading() {
klog.Infof("the TidbCluster: [%s/%s]'s tikv is upgrading,can not scale in until upgrade have completed",
klog.Infof("TidbCluster: [%s/%s]'s tikv is upgrading, can not scale in until upgrade completed",
ns, tcName)
return nil
}
5 changes: 4 additions & 1 deletion pkg/manager/member/tikv_upgrader.go
@@ -55,7 +55,10 @@ func NewTiKVUpgrader(pdControl pdapi.PDControlInterface,
func (tku *tikvUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
- if tc.Status.PD.Phase == v1alpha1.UpgradePhase {

+ if tc.Status.PD.Phase == v1alpha1.UpgradePhase || tc.TiKVScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, tikv status is %v, can not upgrade tikv",
ns, tcName, tc.Status.PD.Phase, tc.Status.TiKV.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
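Combined with the scaler guards earlier in this commit, the change makes TiKV scaling and TiKV upgrades mutually exclusive, and keeps TiKV upgrades behind PD upgrades. A compact standalone sketch of the resulting decision logic (local phase type only, not the operator's API):

package main

import "fmt"

// Local stand-in for v1alpha1.MemberPhase (illustration only).
type MemberPhase string

const (
    NormalPhase  MemberPhase = "Normal"
    UpgradePhase MemberPhase = "Upgrade"
    ScalePhase   MemberPhase = "Scale"
)

// canUpgradeTiKV captures the guard added in tikv_upgrader.go: the rolling
// upgrade is skipped while PD is upgrading or TiKV itself is scaling.
func canUpgradeTiKV(pdPhase, tikvPhase MemberPhase) bool {
    return pdPhase != UpgradePhase && tikvPhase != ScalePhase
}

// canScaleTiKV captures the guard in tikv_scaler.go: scale out and scale in
// are skipped while TiKV is upgrading.
func canScaleTiKV(tikvPhase MemberPhase) bool {
    return tikvPhase != UpgradePhase
}

func main() {
    fmt.Println(canUpgradeTiKV(NormalPhase, ScalePhase))   // false: wait for scaling to finish
    fmt.Println(canUpgradeTiKV(UpgradePhase, NormalPhase)) // false: wait for the PD upgrade
    fmt.Println(canScaleTiKV(UpgradePhase))                // false: wait for the TiKV upgrade
    fmt.Println(canUpgradeTiKV(NormalPhase, NormalPhase))  // true
}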
22 changes: 22 additions & 0 deletions pkg/manager/member/tikv_upgrader_test.go
@@ -290,6 +290,28 @@ func TestTiKVUpgraderUpgrade(t *testing.T) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "tikv can not upgrade when it is scaling",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.ScalePhase
tc.Status.TiKV.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
changePods: nil,
beginEvictLeaderErr: false,
endEvictLeaderErr: false,
updatePodErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.ScalePhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "get last apply config error",
changeFn: func(tc *v1alpha1.TidbCluster) {
