From 52e1f7f459f0609ac1369b9d62ca47e015e56072 Mon Sep 17 00:00:00 2001 From: Yilong Li Date: Wed, 10 Mar 2021 17:46:55 +0800 Subject: [PATCH] Fix support for multiple pvc for pd (#3820) --- docs/api-references/docs.md | 12 ++ pkg/apis/pingcap/v1alpha1/tidbcluster.go | 6 +- pkg/apis/pingcap/v1alpha1/types.go | 21 +-- .../pingcap/v1alpha1/zz_generated.deepcopy.go | 8 ++ .../tidbcluster/tidb_cluster_control.go | 1 + .../member/dm_master_member_manager.go | 2 +- pkg/manager/member/dm_master_scaler.go | 2 +- .../member/dm_worker_member_manager.go | 2 +- pkg/manager/member/dm_worker_scaler.go | 2 +- pkg/manager/member/failover.go | 3 + pkg/manager/member/pd_failover.go | 120 +++++++++++------- pkg/manager/member/pd_failover_test.go | 86 ++++++++++--- pkg/manager/member/pd_member_manager.go | 29 +++-- pkg/manager/member/pd_scaler.go | 33 ++--- pkg/manager/member/pd_scaler_test.go | 48 +++++-- pkg/manager/member/scaler.go | 27 +--- pkg/manager/member/scaler_test.go | 6 +- pkg/manager/member/tidb_member_manager.go | 2 +- pkg/manager/member/tiflash_member_manager.go | 2 +- pkg/manager/member/tiflash_scaler.go | 2 +- pkg/manager/member/tikv_member_manager.go | 2 +- pkg/manager/member/tikv_scaler.go | 6 +- pkg/manager/member/tikv_scaler_test.go | 6 +- pkg/manager/member/utils.go | 26 +++- 24 files changed, 295 insertions(+), 159 deletions(-) diff --git a/docs/api-references/docs.md b/docs/api-references/docs.md index f0978dd9f5..314b841d3b 100644 --- a/docs/api-references/docs.md +++ b/docs/api-references/docs.md @@ -7799,6 +7799,16 @@ k8s.io/apimachinery/pkg/types.UID +pvcUIDSet
+ +map[k8s.io/apimachinery/pkg/types.UID]struct{} + + + + + + + memberDeleted
bool @@ -9073,6 +9083,7 @@ map[string]github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.PDMember +

Members contains PDs in current TidbCluster

@@ -9085,6 +9096,7 @@ map[string]github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.PDMember +

PeerMembers contains PDs NOT in current TidbCluster

diff --git a/pkg/apis/pingcap/v1alpha1/tidbcluster.go b/pkg/apis/pingcap/v1alpha1/tidbcluster.go
index 0884d68a97..dd6b0d693a 100644
--- a/pkg/apis/pingcap/v1alpha1/tidbcluster.go
+++ b/pkg/apis/pingcap/v1alpha1/tidbcluster.go
@@ -334,13 +334,13 @@ func (tc *TidbCluster) PDAutoFailovering() bool {
 }
 
 func (tc *TidbCluster) GetPDDeletedFailureReplicas() int32 {
-	var failureReplicas int32 = 0
+	var deletedReplicas int32 = 0
 	for _, failureMember := range tc.Status.PD.FailureMembers {
 		if failureMember.MemberDeleted {
-			failureReplicas++
+			deletedReplicas++
 		}
 	}
-	return failureReplicas
+	return deletedReplicas
 }
 
 func (tc *TidbCluster) PDStsDesiredReplicas() int32 {
diff --git a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go
index 4722735fcc..5c088f7575 100644
--- a/pkg/apis/pingcap/v1alpha1/types.go
+++ b/pkg/apis/pingcap/v1alpha1/types.go
@@ -940,10 +940,12 @@ type Service struct {
 
 // PDStatus is PD status
 type PDStatus struct {
-	Synced      bool                    `json:"synced,omitempty"`
-	Phase       MemberPhase             `json:"phase,omitempty"`
-	StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
-	Members     map[string]PDMember     `json:"members,omitempty"`
+	Synced      bool                    `json:"synced,omitempty"`
+	Phase       MemberPhase             `json:"phase,omitempty"`
+	StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
+	// Members contains PDs in current TidbCluster
+	Members map[string]PDMember `json:"members,omitempty"`
+	// PeerMembers contains PDs NOT in current TidbCluster
 	PeerMembers    map[string]PDMember        `json:"peerMembers,omitempty"`
 	Leader         PDMember                   `json:"leader,omitempty"`
 	FailureMembers map[string]PDFailureMember `json:"failureMembers,omitempty"`
@@ -965,11 +967,12 @@ type PDMember struct {
 
 // PDFailureMember is the pd failure member information
 type PDFailureMember struct {
-	PodName       string      `json:"podName,omitempty"`
-	MemberID      string      `json:"memberID,omitempty"`
-	PVCUID        types.UID   `json:"pvcUID,omitempty"`
-	MemberDeleted bool        `json:"memberDeleted,omitempty"`
-	CreatedAt     metav1.Time `json:"createdAt,omitempty"`
+	PodName       string                 `json:"podName,omitempty"`
+	MemberID      string                 `json:"memberID,omitempty"`
+	PVCUID        types.UID              `json:"pvcUID,omitempty"`
+	PVCUIDSet     map[types.UID]struct{} `json:"pvcUIDSet,omitempty"`
+	MemberDeleted bool                   `json:"memberDeleted,omitempty"`
+	CreatedAt     metav1.Time            `json:"createdAt,omitempty"`
 }
 
 // UnjoinedMember is the pd unjoin cluster member information
diff --git a/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
index 130bee5f3d..d4698beaf2 100644
--- a/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go
@@ -26,6 +26,7 @@ import (
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
 	v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	runtime "k8s.io/apimachinery/pkg/runtime"
+	types "k8s.io/apimachinery/pkg/types"
 )
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *PDFailureMember) DeepCopyInto(out *PDFailureMember) { *out = *in + if in.PVCUIDSet != nil { + in, out := &in.PVCUIDSet, &out.PVCUIDSet + *out = make(map[types.UID]struct{}, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } in.CreatedAt.DeepCopyInto(&out.CreatedAt) return } diff --git a/pkg/controller/tidbcluster/tidb_cluster_control.go b/pkg/controller/tidbcluster/tidb_cluster_control.go index 5c08905213..644c4674ae 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_control.go +++ b/pkg/controller/tidbcluster/tidb_cluster_control.go @@ -143,6 +143,7 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster) } // cleaning all orphan pods(pd, tikv or tiflash which don't have a related PVC) managed by operator + // this could be useful when failover run into an undesired situation as described in PD failover function skipReasons, err := c.orphanPodsCleaner.Clean(tc) if err != nil { return err diff --git a/pkg/manager/member/dm_master_member_manager.go b/pkg/manager/member/dm_master_member_manager.go index 1c9889bd72..8e5f9a3848 100644 --- a/pkg/manager/member/dm_master_member_manager.go +++ b/pkg/manager/member/dm_master_member_manager.go @@ -351,7 +351,7 @@ func (m *masterMemberManager) syncDMClusterStatus(dc *v1alpha1.DMCluster, set *a dc.Status.Master.Members = masterStatus dc.Status.Master.Leader = dc.Status.Master.Members[leader.Name] dc.Status.Master.Image = "" - c := filterContainer(set, "dm-master") + c := findContainerByName(set, "dm-master") if c != nil { dc.Status.Master.Image = c.Image } diff --git a/pkg/manager/member/dm_master_scaler.go b/pkg/manager/member/dm_master_scaler.go index b12d9d76e8..cbf9fefd05 100644 --- a/pkg/manager/member/dm_master_scaler.go +++ b/pkg/manager/member/dm_master_scaler.go @@ -61,7 +61,7 @@ func (s *masterScaler) ScaleOut(meta metav1.Object, oldSet *apps.StatefulSet, ne dcName := dc.GetName() klog.Infof("scaling out dm-master statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List()) - _, err := s.deleteDeferDeletingPVC(dc, oldSet.GetName(), v1alpha1.DMMasterMemberType, ordinal) + _, err := s.deleteDeferDeletingPVC(dc, v1alpha1.DMMasterMemberType, ordinal) if err != nil { return err } diff --git a/pkg/manager/member/dm_worker_member_manager.go b/pkg/manager/member/dm_worker_member_manager.go index ad135f74d4..6f06b5d504 100644 --- a/pkg/manager/member/dm_worker_member_manager.go +++ b/pkg/manager/member/dm_worker_member_manager.go @@ -280,7 +280,7 @@ func (m *workerMemberManager) syncDMClusterStatus(dc *v1alpha1.DMCluster, set *a dc.Status.Worker.Synced = true dc.Status.Worker.Members = workerStatus dc.Status.Worker.Image = "" - c := filterContainer(set, "dm-worker") + c := findContainerByName(set, "dm-worker") if c != nil { dc.Status.Worker.Image = c.Image } diff --git a/pkg/manager/member/dm_worker_scaler.go b/pkg/manager/member/dm_worker_scaler.go index 7d5ed6b04e..7d58884b9f 100644 --- a/pkg/manager/member/dm_worker_scaler.go +++ b/pkg/manager/member/dm_worker_scaler.go @@ -61,7 +61,7 @@ func (s *workerScaler) ScaleOut(meta metav1.Object, oldSet *apps.StatefulSet, ne dcName := dc.GetName() klog.Infof("scaling out dm-worker statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List()) - _, err := s.deleteDeferDeletingPVC(dc, oldSet.GetName(), v1alpha1.DMWorkerMemberType, ordinal) + _, err := s.deleteDeferDeletingPVC(dc, 
v1alpha1.DMWorkerMemberType, ordinal) if err != nil { return err } diff --git a/pkg/manager/member/failover.go b/pkg/manager/member/failover.go index 482c729641..87dcac4bb6 100644 --- a/pkg/manager/member/failover.go +++ b/pkg/manager/member/failover.go @@ -15,6 +15,9 @@ package member import "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" +// TODO: move this to a centralized place +// Since the "Unhealthy" is a very universal event reason string, which could apply to all the TiDB/DM cluster components, +// we should make a global event module, and put event related constants there. const ( unHealthEventReason = "Unhealthy" unHealthEventMsgPattern = "%s pod[%s] is unhealthy, msg:%s" diff --git a/pkg/manager/member/pd_failover.go b/pkg/manager/member/pd_failover.go index 4f790f3883..eeb01c2388 100644 --- a/pkg/manager/member/pd_failover.go +++ b/pkg/manager/member/pd_failover.go @@ -25,6 +25,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog" ) @@ -43,14 +44,15 @@ func NewPDFailover(deps *controller.Dependencies) Failover { // If there are 3 PD members in a tidb cluster with 1 broken member pd-0, pdFailover will do failover in 3 rounds: // 1. mark pd-0 as a failure Member with non-deleted state (MemberDeleted=false) // 2. delete the failure member pd-0, and mark it deleted (MemberDeleted=true) -// 3. PD member manager will add the count of deleted failure members more replicas +// 3. PD member manager will add the `count(deleted failure members)` more replicas +// // If the count of the failure PD member with the deleted state (MemberDeleted=true) is equal or greater than MaxFailoverCount, we will skip failover. func (f *pdFailover) Failover(tc *v1alpha1.TidbCluster) error { ns := tc.GetNamespace() tcName := tc.GetName() if !tc.Status.PD.Synced { - return fmt.Errorf("TidbCluster: %s/%s's pd status sync failed, can't failover", ns, tcName) + return fmt.Errorf("TidbCluster: %s/%s .Status.PD.Synced = false, can't failover", ns, tcName) } if tc.Status.PD.FailureMembers == nil { tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{} @@ -58,8 +60,8 @@ func (f *pdFailover) Failover(tc *v1alpha1.TidbCluster) error { inQuorum, healthCount := f.isPDInQuorum(tc) if !inQuorum { - return fmt.Errorf("TidbCluster: %s/%s's pd cluster is not health: %d/%d, "+ - "replicas: %d, failureCount: %d, can't failover", + return fmt.Errorf("TidbCluster: %s/%s's pd cluster is not healthy, healthy %d / desired %d,"+ + " replicas %d, failureCount %d, can't failover", ns, tcName, healthCount, tc.PDStsDesiredReplicas(), tc.Spec.PD.Replicas, len(tc.Status.PD.FailureMembers)) } @@ -86,7 +88,6 @@ func (f *pdFailover) Recover(tc *v1alpha1.TidbCluster) { func (f *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error { ns := tc.GetNamespace() - tcName := tc.GetName() for pdName, pdMember := range tc.Status.PD.Members { if pdMember.LastTransitionTime.IsZero() { @@ -100,32 +101,35 @@ func (f *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error { if tc.Status.PD.FailureMembers == nil { tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{} } - deadline := pdMember.LastTransitionTime.Add(f.deps.CLIConfig.PDFailoverPeriod) + failoverDeadline := pdMember.LastTransitionTime.Add(f.deps.CLIConfig.PDFailoverPeriod) _, exist := tc.Status.PD.FailureMembers[pdName] - if pdMember.Health || time.Now().Before(deadline) || exist { + if pdMember.Health || 
time.Now().Before(failoverDeadline) || exist { continue } - ordinal, err := util.GetOrdinalFromPodName(podName) + pod, err := f.deps.PodLister.Pods(ns).Get(podName) if err != nil { - return err + return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pod %s/%s, error: %s", ns, podName, err) } - pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal) - pvc, err := f.deps.PVCLister.PersistentVolumeClaims(ns).Get(pvcName) + + pvcs, err := util.ResolvePVCFromPod(pod, f.deps.PVCLister) if err != nil { - return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pvc %s for cluster %s/%s, error: %s", pvcName, ns, tcName, err) + return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pvcs for pod %s/%s, error: %s", ns, pod.Name, err) } - msg := fmt.Sprintf("pd member[%s] is unhealthy", pdMember.ID) - f.deps.Recorder.Event(tc, apiv1.EventTypeWarning, unHealthEventReason, fmt.Sprintf(unHealthEventMsgPattern, "pd", pdName, msg)) + f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", "%s/%s(%s) is unhealthy", ns, podName, pdMember.ID) // mark a peer member failed and return an error to skip reconciliation // note that status of tidb cluster will be updated always + pvcUIDSet := make(map[types.UID]struct{}) + for _, pvc := range pvcs { + pvcUIDSet[pvc.UID] = struct{}{} + } tc.Status.PD.FailureMembers[pdName] = v1alpha1.PDFailureMember{ PodName: podName, MemberID: pdMember.ID, - PVCUID: pvc.UID, + PVCUIDSet: pvcUIDSet, MemberDeleted: false, CreatedAt: metav1.Now(), } @@ -135,26 +139,26 @@ func (f *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error { return nil } -// tryToDeleteAFailureMember tries to delete a PD member and associated pod & -// pvc. If this succeeds, new pod & pvc will be created by Kubernetes. -// Note that this will fail if the kubelet on the node which failed pod was -// running on is not responding. +// tryToDeleteAFailureMember tries to delete a PD member and associated Pod & PVC. +// On success, new Pod & PVC will be created. +// Note that this will fail if the kubelet on the node on which failed Pod was running is not responding. 
func (f *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error { ns := tc.GetNamespace() tcName := tc.GetName() var failureMember *v1alpha1.PDFailureMember var failurePodName string - var failurePdName string + var failurePDName string for pdName, pdMember := range tc.Status.PD.FailureMembers { if !pdMember.MemberDeleted { failureMember = &pdMember failurePodName = strings.Split(pdName, ".")[0] - failurePdName = pdName + failurePDName = pdName break } } if failureMember == nil { + klog.Infof("No PD FailureMembers to delete for tc %s/%s", ns, tcName) return nil } @@ -163,50 +167,65 @@ func (f *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error { return err } // invoke deleteMember api to delete a member from the pd cluster - err = controller.GetPDClient(f.deps.PDControl, tc).DeleteMemberByID(memberID) - if err != nil { - klog.Errorf("pd failover: failed to delete member: %d, %v", memberID, err) + if err := controller.GetPDClient(f.deps.PDControl, tc).DeleteMemberByID(memberID); err != nil { + klog.Errorf("pd failover[tryToDeleteAFailureMember]: failed to delete member %s/%s(%d), error: %v", ns, failurePodName, memberID, err) return err } - klog.Infof("pd failover: delete member: %d successfully", memberID) - f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted", - "%s(%d) deleted from cluster", failurePodName, memberID) + klog.Infof("pd failover[tryToDeleteAFailureMember]: delete member %s/%s(%d) successfully", ns, failurePodName, memberID) + f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted", "failure member %s/%s(%d) deleted from PD cluster", ns, failurePodName, memberID) // The order of old PVC deleting and the new Pod creating is not guaranteed by Kubernetes. - // If new Pod is created before old PVC deleted, new Pod will reuse old PVC. - // So we must try to delete the PVC and Pod of this PD peer over and over, - // and let StatefulSet create the new PD peer with the same ordinal, but don't use the tombstone PV + // If new Pod is created before old PVCs are deleted, the Statefulset will try to use the old PVCs and skip creating new PVCs. + // This could result in 2 possible cases: + // 1. If the old PVCs are first mounted successfully by the new Pod, the following pvc deletion will fail and return error. + // We will try to delete the Pod and PVCs again in the next requeued run. + // 2. If the old PVCs are first deleted successfully here, the new Pods will try to mount non-existing PVCs, which will pend forever. + // This is where OrphanPodsCleaner kicks in, which will delete the pending Pods in this situation. + // Please refer to orphan_pods_cleaner.go for details. 
pod, err := f.deps.PodLister.Pods(ns).Get(failurePodName) if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("tryToDeleteAFailureMember: failed to get pods %s for cluster %s/%s, error: %s", failurePodName, ns, tcName, err) + return fmt.Errorf("pd failover[tryToDeleteAFailureMember]: failed to get pod %s/%s for tc %s/%s, error: %s", ns, failurePodName, ns, tcName, err) + } + if pod != nil { + if pod.DeletionTimestamp == nil { + if err := f.deps.PodControl.DeletePod(tc, pod); err != nil { + return err + } + } + } else { + klog.Infof("pd failover[tryToDeleteAFailureMember]: failure pod %s/%s not found, skip", ns, failurePodName) } ordinal, err := util.GetOrdinalFromPodName(failurePodName) if err != nil { - return err + return fmt.Errorf("pd failover[tryToDeleteAFailureMember]: failed to parse ordinal from Pod name for %s/%s, error: %s", ns, failurePodName, err) } - pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal) - pvc, err := f.deps.PVCLister.PersistentVolumeClaims(ns).Get(pvcName) + pvcSelector, err := getPVCSelectorForPod(tc, v1alpha1.PDMemberType, ordinal) + if err != nil { + return fmt.Errorf("pd failover[tryToDeleteAFailureMember]: failed to get PVC selector for Pod %s/%s, error: %s", ns, failurePodName, err) + } + pvcs, err := f.deps.PVCLister.PersistentVolumeClaims(ns).List(pvcSelector) if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("tryToDeleteAFailureMember: failed to get pvc %s for cluster %s/%s, error: %s", pvcName, ns, tcName, err) + return fmt.Errorf("pd failover[tryToDeleteAFailureMember]: failed to get PVCs for pod %s/%s, error: %s", ns, failurePodName, err) } - if pod != nil && pod.DeletionTimestamp == nil { - err := f.deps.PodControl.DeletePod(tc, pod) - if err != nil { - return err + for _, pvc := range pvcs { + _, pvcUIDExist := failureMember.PVCUIDSet[pvc.GetUID()] + // for backward compatibility, if there exists failureMembers and user upgrades operator to newer version + // there will be failure member structures with PVCUID set from api server, we should handle this as pvcUIDExist == true + if pvc.GetUID() == failureMember.PVCUID { + pvcUIDExist = true } - } - if pvc != nil && pvc.DeletionTimestamp == nil && pvc.GetUID() == failureMember.PVCUID { - err = f.deps.PVCControl.DeletePVC(tc, pvc) - if err != nil { - klog.Errorf("pd failover: failed to delete pvc: %s/%s, %v", ns, pvcName, err) - return err + if pvc.DeletionTimestamp == nil && pvcUIDExist { + if err := f.deps.PVCControl.DeletePVC(tc, pvc); err != nil { + klog.Errorf("pd failover[tryToDeleteAFailureMember]: failed to delete PVC: %s/%s, error: %s", ns, pvc.Name, err) + return err + } + klog.Infof("pd failover[tryToDeleteAFailureMember]: delete PVC %s/%s successfully", ns, pvc.Name) } - klog.Infof("pd failover: pvc: %s/%s successfully", ns, pvcName) } - setMemberDeleted(tc, failurePdName) + setMemberDeleted(tc, failurePDName) return nil } @@ -230,19 +249,22 @@ func setMemberDeleted(tc *v1alpha1.TidbCluster, pdName string) { klog.Infof("pd failover: set pd member: %s/%s deleted", tc.GetName(), pdName) } +// is healthy PD more than a half func (f *pdFailover) isPDInQuorum(tc *v1alpha1.TidbCluster) (bool, int) { healthCount := 0 + ns := tc.GetNamespace() for podName, pdMember := range tc.Status.PD.Members { if pdMember.Health { healthCount++ } else { - f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", - "%s(%s) is unhealthy", podName, pdMember.ID) + f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", 
"%s/%s(%s) is unhealthy", ns, podName, pdMember.ID) } } for _, pdMember := range tc.Status.PD.PeerMembers { if pdMember.Health { healthCount++ + } else { + f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDPeerMemberUnhealthy", "%s(%s) is unhealthy", pdMember.Name, pdMember.ID) } } return healthCount > (len(tc.Status.PD.Members)+len(tc.Status.PD.PeerMembers))/2, healthCount diff --git a/pkg/manager/member/pd_failover_test.go b/pkg/manager/member/pd_failover_test.go index c0a98ff08b..fdedd97846 100644 --- a/pkg/manager/member/pd_failover_test.go +++ b/pkg/manager/member/pd_failover_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/gomega" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" + "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/pkg/pdapi" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -168,7 +169,7 @@ func TestPDFailoverFailover(t *testing.T) { g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) events := collectEvents(recorder.Events) sort.Strings(events) - g.Expect(events).To(HaveLen(1)) + g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) }, }, @@ -192,7 +193,7 @@ func TestPDFailoverFailover(t *testing.T) { g.Expect(pd1.MemberDeleted).To(Equal(true)) events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(1)) - g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[0]).To(ContainSubstring("failure member default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, { @@ -260,7 +261,7 @@ func TestPDFailoverFailover(t *testing.T) { statusSyncFailed: false, errExpectFn: func(g *GomegaWithT, err error) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "persistentvolumeclaim \"pd-test-pd-1\" not found")).To(Equal(true)) + g.Expect(err.Error()).To(ContainSubstring("no pvc found for pod default/test-pd-1")) }, expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) { g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) @@ -291,12 +292,14 @@ func TestPDFailoverFailover(t *testing.T) { failureMembers := tc.Status.PD.FailureMembers["test-pd-1"] g.Expect(failureMembers.PodName).To(Equal("test-pd-1")) g.Expect(failureMembers.MemberID).To(Equal("12891273174085095651")) - g.Expect(string(failureMembers.PVCUID)).To(Equal("pvc-1-uid")) + g.Expect(string(failureMembers.PVCUID)).To(Equal("")) + g.Expect(failureMembers.PVCUIDSet).To(HaveKey(types.UID("pvc-1-uid-1"))) + g.Expect(failureMembers.PVCUIDSet).To(HaveKey(types.UID("pvc-1-uid-2"))) g.Expect(failureMembers.MemberDeleted).To(BeFalse()) events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("Unhealthy pd pod[test-pd-1] is unhealthy, msg:pd member[12891273174085095651] is unhealthy")) + g.Expect(events[1]).To(ContainSubstring("PDMemberUnhealthy default/test-pd-1(12891273174085095651) is unhealthy")) }, }, { @@ -340,7 +343,7 @@ func TestPDFailoverFailover(t *testing.T) { events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[1]).To(ContainSubstring("failure member 
default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, { @@ -425,7 +428,7 @@ func TestPDFailoverFailover(t *testing.T) { events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[1]).To(ContainSubstring("failure member default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, { @@ -452,7 +455,7 @@ func TestPDFailoverFailover(t *testing.T) { events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[1]).To(ContainSubstring("failure member default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, { @@ -462,6 +465,7 @@ func TestPDFailoverFailover(t *testing.T) { hasPVC: true, hasPod: true, podWithDeletionTimestamp: true, + pvcWithDeletionTimestamp: false, delMemberFailed: false, delPodFailed: false, delPVCFailed: false, @@ -476,13 +480,16 @@ func TestPDFailoverFailover(t *testing.T) { g.Expect(pd1.MemberDeleted).To(Equal(true)) _, err := pf.deps.PodLister.Pods(metav1.NamespaceDefault).Get(pd1Name) g.Expect(err).NotTo(HaveOccurred()) - _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName) + _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName + "-1") + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.IsNotFound(err)).To(BeTrue()) + _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName + "-2") g.Expect(err).To(HaveOccurred()) g.Expect(errors.IsNotFound(err)).To(BeTrue()) events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[1]).To(ContainSubstring("failure member default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, { @@ -508,12 +515,14 @@ func TestPDFailoverFailover(t *testing.T) { _, err := pf.deps.PodLister.Pods(metav1.NamespaceDefault).Get(pd1Name) g.Expect(err).To(HaveOccurred()) g.Expect(errors.IsNotFound(err)).To(BeTrue()) - _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName) + _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName + "-1") + g.Expect(err).NotTo(HaveOccurred()) + _, err = pf.deps.PVCLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName + "-2") g.Expect(err).NotTo(HaveOccurred()) events := collectEvents(recorder.Events) g.Expect(events).To(HaveLen(2)) g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy")) - g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster")) + g.Expect(events[1]).To(ContainSubstring("failure member default/test-pd-1(12891273174085095651) deleted from PD cluster")) }, }, } @@ -535,18 +544,48 @@ func TestPDFailoverFailover(t *testing.T) { return nil, nil }) + var pvc1 *corev1.PersistentVolumeClaim + var pvc2 *corev1.PersistentVolumeClaim if test.hasPVC { - pvc := newPVCForPDFailover(tc, v1alpha1.PDMemberType, 1) + pvc1 = newPVCForPDFailover(tc, v1alpha1.PDMemberType, 1) + pvc2 = 
pvc1.DeepCopy() + pvc1.Name = pvc1.Name + "-1" + pvc1.UID = pvc1.UID + "-1" + pvc2.Name = pvc2.Name + "-2" + pvc2.UID = pvc2.UID + "-2" + if test.pvcWithDeletionTimestamp { - pvc.DeletionTimestamp = &metav1.Time{Time: time.Now()} + pvc1.DeletionTimestamp = &metav1.Time{Time: time.Now()} + pvc2.DeletionTimestamp = &metav1.Time{Time: time.Now()} } - pvcIndexer.Add(pvc) + pvcIndexer.Add(pvc1) + pvcIndexer.Add(pvc2) } + // TODO: all test cases hasPod==true, should we remove this? if test.hasPod { pod := newPodForPDFailover(tc, v1alpha1.PDMemberType, 1) if test.podWithDeletionTimestamp { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} } + if test.hasPVC { + pvc1.ObjectMeta.Labels[label.AnnPodNameKey] = pod.GetName() + pvc2.ObjectMeta.Labels[label.AnnPodNameKey] = pod.GetName() + pod.Spec.Volumes = append(pod.Spec.Volumes, + corev1.Volume{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + corev1.Volume{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }) + } podIndexer.Add(pod) } if test.delPodFailed { @@ -686,8 +725,12 @@ func oneFailureMember(tc *v1alpha1.TidbCluster) { pd0: {Name: pd0, ID: "0", Health: true}, pd2: {Name: pd2, ID: "2", Health: true}, } + + pvcUIDSet := make(map[types.UID]struct{}) + pvcUIDSet[types.UID("pvc-1-uid-1")] = struct{}{} + pvcUIDSet[types.UID("pvc-1-uid-2")] = struct{}{} tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{ - pd1: {PodName: pd1, PVCUID: "pvc-1-uid", MemberID: "12891273174085095651"}, + pd1: {PodName: pd1, PVCUIDSet: pvcUIDSet, MemberID: "12891273174085095651"}, } } @@ -724,8 +767,12 @@ func oneNotReadyMemberAndAFailureMember(tc *v1alpha1.TidbCluster) { pd1: {Name: pd1, ID: "12891273174085095651", Health: false, LastTransitionTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)}}, pd2: {Name: pd2, ID: "2", Health: true}, } + + pvcUIDSet := make(map[types.UID]struct{}) + pvcUIDSet[types.UID("pvc-1-uid-1")] = struct{}{} + pvcUIDSet[types.UID("pvc-1-uid-2")] = struct{}{} tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{ - pd1: {PodName: pd1, PVCUID: "pvc-1-uid", MemberID: "12891273174085095651"}, + pd1: {PodName: pd1, PVCUIDSet: pvcUIDSet, MemberID: "12891273174085095651"}, } } @@ -780,6 +827,11 @@ func newPVCForPDFailover(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberTyp Name: ordinalPVCName(memberType, controller.PDMemberName(tc.GetName()), ordinal), Namespace: metav1.NamespaceDefault, UID: types.UID("pvc-1-uid"), + Labels: map[string]string{ + label.NameLabelKey: "tidb-cluster", + label.ManagedByLabelKey: label.TiDBOperator, + label.InstanceLabelKey: "test", + }, }, Spec: corev1.PersistentVolumeClaimSpec{ VolumeName: fmt.Sprintf("pv-%d", ordinal), diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go index d7efb1f126..23b4fc235b 100644 --- a/pkg/manager/member/pd_member_manager.go +++ b/pkg/manager/member/pd_member_manager.go @@ -338,15 +338,14 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a return err } - pattern, err := regexp.Compile(fmt.Sprintf(pdMemberLimitPattern, tc.Name, tc.Name, tc.Namespace, controller.FormatClusterDomainForRegex(tc.Spec.ClusterDomain))) + rePDMembers, err := regexp.Compile(fmt.Sprintf(pdMemberLimitPattern, tc.Name, tc.Name, tc.Namespace, controller.FormatClusterDomainForRegex(tc.Spec.ClusterDomain))) if err != nil { 
return err } pdStatus := map[string]v1alpha1.PDMember{} peerPDStatus := map[string]v1alpha1.PDMember{} for _, memberHealth := range healthInfo.Healths { - id := memberHealth.MemberID - memberID := fmt.Sprintf("%d", id) + memberID := memberHealth.MemberID var clientURL string if len(memberHealth.ClientUrls) > 0 { clientURL = memberHealth.ClientUrls[0] @@ -354,19 +353,20 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a name := memberHealth.Name if len(name) == 0 { klog.Warningf("PD member: [%d] doesn't have a name, and can't get it from clientUrls: [%s], memberHealth Info: [%v] in [%s/%s]", - id, memberHealth.ClientUrls, memberHealth, ns, tcName) + memberID, memberHealth.ClientUrls, memberHealth, ns, tcName) continue } status := v1alpha1.PDMember{ Name: name, - ID: memberID, + ID: fmt.Sprintf("%d", memberID), ClientURL: clientURL, Health: memberHealth.Health, } status.LastTransitionTime = metav1.Now() - if pattern.Match([]byte(clientURL)) { + // matching `rePDMembers` means `clientURL` is a PD in current tc + if rePDMembers.Match([]byte(clientURL)) { oldPDMember, exist := tc.Status.PD.Members[name] if exist && status.Health == oldPDMember.Health { status.LastTransitionTime = oldPDMember.LastTransitionTime @@ -389,14 +389,11 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a tc.Status.PD.Members = pdStatus tc.Status.PD.PeerMembers = peerPDStatus tc.Status.PD.Image = "" - c := filterContainer(set, "pd") - if c != nil { + if c := findContainerByName(set, "pd"); c != nil { tc.Status.PD.Image = c.Image } - // k8s check - err = m.collectUnjoinedMembers(tc, set, pdStatus) - if err != nil { + if err := m.collectUnjoinedMembers(tc, set, pdStatus); err != nil { return err } return nil @@ -876,6 +873,8 @@ func clusterVersionGreaterThanOrEqualTo4(version string) (bool, error) { return v.Major() >= 4, nil } +// find PD pods in set that have not joined the PD cluster yet. +// pdStatus contains the PD members in the PD cluster. 
func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set *apps.StatefulSet, pdStatus map[string]v1alpha1.PDMember) error { podSelector, podSelectErr := metav1.LabelSelectorAsSelector(set.Spec.Selector) if podSelectErr != nil { @@ -885,8 +884,11 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set * if podErr != nil { return fmt.Errorf("collectUnjoinedMembers: failed to list pods for cluster %s/%s, selector %s, error %v", tc.GetNamespace(), tc.GetName(), set.Spec.Selector, podErr) } + + // check all pods in PD sts to see whether it has already joined the PD cluster for _, pod := range pods { var joined = false + // if current PD pod name is in the keys of pdStatus, it has joined the PD cluster for pdName := range pdStatus { ordinal, err := util.GetOrdinalFromPodName(pod.Name) if err != nil { @@ -905,6 +907,7 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set * if err != nil { return err } + // FIXME: this will only show one PVC UID in status, should use PVCUIDSet according to PDFailureMember pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tc.Name), ordinal) pvc, err := m.deps.PVCLister.PersistentVolumeClaims(tc.Namespace).Get(pvcName) if err != nil { @@ -916,9 +919,7 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set * CreatedAt: metav1.Now(), } } else { - if tc.Status.PD.UnjoinedMembers != nil { - delete(tc.Status.PD.UnjoinedMembers, pod.Name) - } + delete(tc.Status.PD.UnjoinedMembers, pod.Name) } } return nil diff --git a/pkg/manager/member/pd_scaler.go b/pkg/manager/member/pd_scaler.go index 1edc54a960..e75327c959 100644 --- a/pkg/manager/member/pd_scaler.go +++ b/pkg/manager/member/pd_scaler.go @@ -15,12 +15,11 @@ package member import ( "fmt" - "time" "github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" - "github.com/pingcap/tidb-operator/pkg/label" + "github.com/pingcap/tidb-operator/pkg/util" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,7 +59,7 @@ func (s *pdScaler) ScaleOut(meta metav1.Object, oldSet *apps.StatefulSet, newSet tcName := tc.GetName() klog.Infof("scaling out pd statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List()) - _, err := s.deleteDeferDeletingPVC(tc, oldSet.GetName(), v1alpha1.PDMemberType, ordinal) + _, err := s.deleteDeferDeletingPVC(tc, v1alpha1.PDMemberType, ordinal) if err != nil { return err } @@ -106,7 +105,6 @@ func (s *pdScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSet resetReplicas(newSet, oldSet) memberName := PdName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain) pdPodName := PdPodName(tcName, ordinal) - setName := oldSet.GetName() if !tc.Status.PD.Synced { return fmt.Errorf("TidbCluster: %s/%s's pd status sync failed, can't scale in now", ns, tcName) @@ -163,31 +161,26 @@ func (s *pdScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSet err = pdClient.DeleteMember(memberName) if err != nil { - klog.Errorf("pd scale in: failed to delete member %s, %v", memberName, err) + klog.Errorf("pdScaler.ScaleIn: failed to delete member %s, %v", memberName, err) return err } - klog.Infof("pd scale in: delete member %s successfully", memberName) + klog.Infof("pdScaler.ScaleIn: delete member %s successfully", memberName) - pvcName 
:= ordinalPVCName(v1alpha1.PDMemberType, setName, ordinal) - pvc, err := s.deps.PVCLister.PersistentVolumeClaims(ns).Get(pvcName) + pod, err := s.deps.PodLister.Pods(ns).Get(pdPodName) if err != nil { - return fmt.Errorf("pdScaler.ScaleIn: failed to get pvc %s for cluster %s/%s, error: %s", pvcName, ns, tcName, err) + return fmt.Errorf("pdScaler.ScaleIn: failed to get pod %s/%s for pd in tc %s/%s, error: %s", ns, pdPodName, ns, tcName, err) } - if pvc.Annotations == nil { - pvc.Annotations = map[string]string{} + pvcs, err := util.ResolvePVCFromPod(pod, s.deps.PVCLister) + if err != nil { + return fmt.Errorf("pdScaler.ScaleIn: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err) } - now := time.Now().Format(time.RFC3339) - pvc.Annotations[label.AnnPVCDeferDeleting] = now - _, err = s.deps.PVCControl.UpdatePVC(tc, pvc) - if err != nil { - klog.Errorf("pd scale in: failed to set pvc %s/%s annotation: %s to %s", - ns, pvcName, label.AnnPVCDeferDeleting, now) - return err + for _, pvc := range pvcs { + if err := addDeferDeletingAnnoToPVC(tc, pvc, s.deps.PVCControl); err != nil { + return err + } } - klog.Infof("pd scale in: set pvc %s/%s annotation: %s to %s", - ns, pvcName, label.AnnPVCDeferDeleting, now) setReplicasAndDeleteSlots(newSet, replicas, deleteSlots) return nil diff --git a/pkg/manager/member/pd_scaler_test.go b/pkg/manager/member/pd_scaler_test.go index a8b30f897a..b3bfb624d1 100644 --- a/pkg/manager/member/pd_scaler_test.go +++ b/pkg/manager/member/pd_scaler_test.go @@ -59,10 +59,9 @@ func TestPDScalerScaleOut(t *testing.T) { newSet := oldSet.DeepCopy() newSet.Spec.Replicas = pointer.Int32Ptr(7) - scaler, _, pvcIndexer, pvcControl := newFakePDScaler() + scaler, _, pvcIndexer, _, pvcControl := newFakePDScaler() pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType, tc.Name) - pvc.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas) if !test.annoIsNil { pvc.Annotations = map[string]string{} } @@ -255,11 +254,43 @@ func TestPDScalerScaleIn(t *testing.T) { newSet := oldSet.DeepCopy() newSet.Spec.Replicas = pointer.Int32Ptr(3) - scaler, pdControl, pvcIndexer, pvcControl := newFakePDScaler() + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: PdPodName(tc.GetName(), 4), + Namespace: corev1.NamespaceDefault, + CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + } + + scaler, pdControl, pvcIndexer, podIndexer, pvcControl := newFakePDScaler() + + podIndexer.Add(pod) if test.hasPVC { - pvc := newScaleInPVCForStatefulSet(oldSet, v1alpha1.PDMemberType, tc.Name) - pvcIndexer.Add(pvc) + pvc1 := newScaleInPVCForStatefulSet(oldSet, v1alpha1.PDMemberType, tc.Name) + pvc2 := pvc1.DeepCopy() + pvc1.Name = pvc1.Name + "-1" + pvc1.UID = pvc1.UID + "-1" + pvc2.Name = pvc2.Name + "-2" + pvc2.UID = pvc2.UID + "-2" + pvcIndexer.Add(pvc1) + pvcIndexer.Add(pvc2) + pod.Spec.Volumes = append(pod.Spec.Volumes, + corev1.Volume{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + corev1.Volume{ + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }) } pdClient := controller.NewFakePDClient(pdControl, tc) @@ -402,7 +433,7 @@ func TestPDScalerScaleInBlockByOtherComponents(t *testing.T) { newSet := oldSet.DeepCopy() newSet.Spec.Replicas = pointer.Int32Ptr(3) - scaler, 
_, _, _ := newFakePDScaler() + scaler, _, _, _, _ := newFakePDScaler() tc.Spec.PD.Replicas = 0 @@ -519,13 +550,14 @@ func TestPDScalerScaleInBlockByOtherComponents(t *testing.T) { } } -func newFakePDScaler() (*pdScaler, *pdapi.FakePDControl, cache.Indexer, *controller.FakePVCControl) { +func newFakePDScaler() (*pdScaler, *pdapi.FakePDControl, cache.Indexer, cache.Indexer, *controller.FakePVCControl) { fakeDeps := controller.NewFakeDependencies() pdScaler := &pdScaler{generalScaler: generalScaler{deps: fakeDeps}} pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl) pvcIndexer := fakeDeps.KubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer() + podIndexer := fakeDeps.KubeInformerFactory.Core().V1().Pods().Informer().GetIndexer() pvcControl := fakeDeps.PVCControl.(*controller.FakePVCControl) - return pdScaler, pdControl, pvcIndexer, pvcControl + return pdScaler, pdControl, pvcIndexer, podIndexer, pvcControl } func newStatefulSetForPDScale() *apps.StatefulSet { diff --git a/pkg/manager/member/scaler.go b/pkg/manager/member/scaler.go index bfabe54960..0e5df82a31 100644 --- a/pkg/manager/member/scaler.go +++ b/pkg/manager/member/scaler.go @@ -51,31 +51,15 @@ type generalScaler struct { deps *controller.Dependencies } -func (s *generalScaler) deleteDeferDeletingPVC(controller runtime.Object, - setName string, memberType v1alpha1.MemberType, ordinal int32) (map[string]string, error) { +// TODO: change skipReason to event recorder as in TestPDFailoverFailover +func (s *generalScaler) deleteDeferDeletingPVC(controller runtime.Object, memberType v1alpha1.MemberType, ordinal int32) (map[string]string, error) { meta := controller.(metav1.Object) ns := meta.GetNamespace() + kind := controller.GetObjectKind().GroupVersionKind().Kind // for unit test skipReason := map[string]string{} - var podName, kind string - var l label.Label - switch controller.(type) { - case *v1alpha1.TidbCluster: - podName = ordinalPodName(memberType, meta.GetName(), ordinal) - l = label.New().Instance(meta.GetName()) - l[label.AnnPodNameKey] = podName - kind = v1alpha1.TiDBClusterKind - case *v1alpha1.DMCluster: - podName = ordinalPodName(memberType, meta.GetName(), ordinal) - l = label.NewDM().Instance(meta.GetName()) - // just delete all defer Deleting pvc for convenience. 
Or dm have to support sync meta info labels for pod/pvc which seems unnecessary - // l[label.AnnPodNameKey] = podName - kind = v1alpha1.DMClusterKind - default: - kind = controller.GetObjectKind().GroupVersionKind().Kind - return nil, fmt.Errorf("%s[%s/%s] has unknown controller", kind, ns, meta.GetName()) - } - selector, err := l.Selector() + + selector, err := getPVCSelectorForPod(controller, memberType, ordinal) if err != nil { return skipReason, fmt.Errorf("%s %s/%s assemble label selector failed, err: %v", kind, ns, meta.GetName(), err) } @@ -88,6 +72,7 @@ func (s *generalScaler) deleteDeferDeletingPVC(controller runtime.Object, } if len(pvcs) == 0 { klog.Infof("%s %s/%s list pvc not found, selector: %s", kind, ns, meta.GetName(), selector) + podName := ordinalPodName(memberType, meta.GetName(), ordinal) skipReason[podName] = skipReasonScalerPVCNotFound return skipReason, nil } diff --git a/pkg/manager/member/scaler_test.go b/pkg/manager/member/scaler_test.go index 5e61af9761..c8a1593c4c 100644 --- a/pkg/manager/member/scaler_test.go +++ b/pkg/manager/member/scaler_test.go @@ -62,7 +62,7 @@ func TestGeneralScalerDeleteAllDeferDeletingPVC(t *testing.T) { pvcControl.SetDeletePVCError(fmt.Errorf("delete pvc failed"), 0) } - skipReason, err := gs.deleteDeferDeletingPVC(tc, setName, test.memberType, test.ordinal) + skipReason, err := gs.deleteDeferDeletingPVC(tc, test.memberType, test.ordinal) if test.pvc != nil { test.expectFn(g, skipReason, err, podName, test.pvc.Name) } else { @@ -98,6 +98,7 @@ func TestGeneralScalerDeleteAllDeferDeletingPVC(t *testing.T) { expectFn: func(g *GomegaWithT, skipReason map[string]string, err error, podName, pvcName string) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(skipReason)).To(Equal(1)) + t.Logf("skipReason: %q", skipReason[podName]) g.Expect(skipReason[podName]).To(Equal(skipReasonScalerPVCNotFound)) }, }, @@ -647,7 +648,6 @@ func TestGeneralScalerDeleteMultiDeferDeletingPVC(t *testing.T) { expectFn func(*GomegaWithT, map[string]string, error, string) } tc := newTidbClusterForPD() - setName := controller.PDMemberName(tc.GetName()) testFn := func(test *testcase, t *testing.T) { t.Logf(test.name) g := NewGomegaWithT(t) @@ -666,7 +666,7 @@ func TestGeneralScalerDeleteMultiDeferDeletingPVC(t *testing.T) { pvcControl.SetDeletePVCError(fmt.Errorf("delete pvc failed"), 0) } - skipReason, err := gs.deleteDeferDeletingPVC(tc, setName, test.memberType, test.ordinal) + skipReason, err := gs.deleteDeferDeletingPVC(tc, test.memberType, test.ordinal) test.expectFn(g, skipReason, err, podName) } tests := []testcase{ diff --git a/pkg/manager/member/tidb_member_manager.go b/pkg/manager/member/tidb_member_manager.go index 48abdc80e2..93110a5bb4 100644 --- a/pkg/manager/member/tidb_member_manager.go +++ b/pkg/manager/member/tidb_member_manager.go @@ -854,7 +854,7 @@ func (m *tidbMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set } tc.Status.TiDB.Members = tidbStatus tc.Status.TiDB.Image = "" - c := filterContainer(set, "tidb") + c := findContainerByName(set, "tidb") if c != nil { tc.Status.TiDB.Image = c.Image } diff --git a/pkg/manager/member/tiflash_member_manager.go b/pkg/manager/member/tiflash_member_manager.go index 3f5cb07a27..a6f6401e35 100644 --- a/pkg/manager/member/tiflash_member_manager.go +++ b/pkg/manager/member/tiflash_member_manager.go @@ -703,7 +703,7 @@ func (m *tiflashMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, s tc.Status.TiFlash.PeerStores = peerStores tc.Status.TiFlash.TombstoneStores = tombstoneStores 
tc.Status.TiFlash.Image = "" - c := filterContainer(set, "tiflash") + c := findContainerByName(set, "tiflash") if c != nil { tc.Status.TiFlash.Image = c.Image } diff --git a/pkg/manager/member/tiflash_scaler.go b/pkg/manager/member/tiflash_scaler.go index 58c036e571..637318829e 100644 --- a/pkg/manager/member/tiflash_scaler.go +++ b/pkg/manager/member/tiflash_scaler.go @@ -57,7 +57,7 @@ func (s *tiflashScaler) ScaleOut(meta metav1.Object, oldSet *apps.StatefulSet, n resetReplicas(newSet, oldSet) klog.Infof("scaling out tiflash statefulset %s/%s, ordinal: %d (replicas: %d, delete slots: %v)", oldSet.Namespace, oldSet.Name, ordinal, replicas, deleteSlots.List()) - _, err := s.deleteDeferDeletingPVC(tc, oldSet.GetName(), v1alpha1.TiFlashMemberType, ordinal) + _, err := s.deleteDeferDeletingPVC(tc, v1alpha1.TiFlashMemberType, ordinal) if err != nil { return err } diff --git a/pkg/manager/member/tikv_member_manager.go b/pkg/manager/member/tikv_member_manager.go index 6102ce6550..bbe95657f6 100644 --- a/pkg/manager/member/tikv_member_manager.go +++ b/pkg/manager/member/tikv_member_manager.go @@ -770,7 +770,7 @@ func (m *tikvMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set tc.Status.TiKV.TombstoneStores = tombstoneStores tc.Status.TiKV.BootStrapped = true tc.Status.TiKV.Image = "" - c := filterContainer(set, "tikv") + c := findContainerByName(set, "tikv") if c != nil { tc.Status.TiKV.Image = c.Image } diff --git a/pkg/manager/member/tikv_scaler.go b/pkg/manager/member/tikv_scaler.go index 65b6fb0a98..71d05badc7 100644 --- a/pkg/manager/member/tikv_scaler.go +++ b/pkg/manager/member/tikv_scaler.go @@ -69,7 +69,7 @@ func (s *tikvScaler) ScaleOut(meta metav1.Object, oldSet *apps.StatefulSet, newS } _, err := s.deps.PVCLister.PersistentVolumeClaims(meta.GetNamespace()).Get(pvcName) if err == nil { - _, err = s.deleteDeferDeletingPVC(obj, oldSet.GetName(), v1alpha1.TiKVMemberType, ordinal) + _, err = s.deleteDeferDeletingPVC(obj, v1alpha1.TiKVMemberType, ordinal) if err != nil { return err } @@ -124,10 +124,10 @@ func (s *tikvScaler) ScaleIn(meta metav1.Object, oldSet *apps.StatefulSet, newSe } if state != v1alpha1.TiKVStateOffline { if err := controller.GetPDClient(s.deps.PDControl, tc).DeleteStore(id); err != nil { - klog.Errorf("tikv scale in: failed to delete store %d, %v", id, err) + klog.Errorf("tikvScaler.ScaleIn: failed to delete store %d, %v", id, err) return err } - klog.Infof("tikv scale in: delete store %d for tikv %s/%s successfully", id, ns, podName) + klog.Infof("tikvScaler.ScaleIn: delete store %d for tikv %s/%s successfully", id, ns, podName) } return controller.RequeueErrorf("TiKV %s/%s store %d is still in cluster, state: %s", ns, podName, id, state) } diff --git a/pkg/manager/member/tikv_scaler_test.go b/pkg/manager/member/tikv_scaler_test.go index 8a39a33207..f6d91587ac 100644 --- a/pkg/manager/member/tikv_scaler_test.go +++ b/pkg/manager/member/tikv_scaler_test.go @@ -199,8 +199,10 @@ func TestTiKVScalerScaleIn(t *testing.T) { if test.hasPVC { pvc1 := newScaleInPVCForStatefulSet(oldSet, v1alpha1.TiKVMemberType, tc.Name) pvc2 := pvc1.DeepCopy() - pvc1.Name = pvc1.Name + "1" - pvc2.Name = pvc2.Name + "2" + pvc1.Name = pvc1.Name + "-1" + pvc1.UID = pvc1.UID + "-1" + pvc2.Name = pvc2.Name + "-2" + pvc2.UID = pvc2.UID + "-2" pvcIndexer.Add(pvc1) pvcIndexer.Add(pvc2) pod.Spec.Volumes = append(pod.Spec.Volumes, diff --git a/pkg/manager/member/utils.go b/pkg/manager/member/utils.go index 92d3955341..4b2236400d 100644 --- a/pkg/manager/member/utils.go +++ 
b/pkg/manager/member/utils.go
@@ -31,6 +31,7 @@ import (
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	corelisters "k8s.io/client-go/listers/core/v1"
@@ -307,8 +308,8 @@ func UpdateStatefulSet(setCtl controller.StatefulSetControlInterface, object run
 	return nil
 }
 
-// filter targetContainer by containerName, If not find, then return nil
-func filterContainer(sts *apps.StatefulSet, containerName string) *corev1.Container {
+// findContainerByName finds the target container by containerName; if not found, it returns nil
+func findContainerByName(sts *apps.StatefulSet, containerName string) *corev1.Container {
 	for _, c := range sts.Spec.Template.Spec.Containers {
 		if c.Name == containerName {
 			return &c
@@ -517,3 +518,24 @@ func addDeferDeletingAnnoToPVC(tc *v1alpha1.TidbCluster, pvc *corev1.PersistentV
 	klog.Infof("set PVC %s/%s annotation %q to %q successfully", tc.Namespace, pvc.Name, label.AnnPVCDeferDeleting, now)
 	return nil
 }
+
+func getPVCSelectorForPod(controller runtime.Object, memberType v1alpha1.MemberType, ordinal int32) (labels.Selector, error) {
+	meta := controller.(metav1.Object)
+	var podName string
+	var l label.Label
+	switch controller.(type) {
+	case *v1alpha1.TidbCluster:
+		podName = ordinalPodName(memberType, meta.GetName(), ordinal)
+		l = label.New().Instance(meta.GetName())
+		l[label.AnnPodNameKey] = podName
+	case *v1alpha1.DMCluster:
+		// podName = ordinalPodName(memberType, meta.GetName(), ordinal)
+		l = label.NewDM().Instance(meta.GetName())
+		// just select all of the DM cluster's defer-deleting PVCs for convenience; otherwise dm would have to support syncing meta info labels for pod/pvc, which seems unnecessary
+		// l[label.AnnPodNameKey] = podName
+	default:
+		kind := controller.GetObjectKind().GroupVersionKind().Kind
+		return nil, fmt.Errorf("object %s/%s of kind %s has unknown controller", meta.GetNamespace(), meta.GetName(), kind)
+	}
+	return l.Selector()
+}
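For reference, the two PVC-discovery paths introduced by this patch complement each other: scale-in resolves PVCs from the live Pod's volume list (util.ResolvePVCFromPod), while failover, whose Pod may already be gone, lists PVCs through the label selector built by getPVCSelectorForPod above. Below is a rough sketch of the volume-based resolution (an assumption about how ResolvePVCFromPod behaves, not its actual implementation; the error text mirrors the message asserted in the updated failover test):

package member

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// resolvePVCsFromPodSketch walks the Pod's volumes and fetches every PVC it
// mounts, so callers can defer-delete or delete all of them instead of
// assuming a single "<member>-<statefulset>-<ordinal>" PVC name.
func resolvePVCsFromPodSketch(pod *corev1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) ([]*corev1.PersistentVolumeClaim, error) {
	var pvcs []*corev1.PersistentVolumeClaim
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue // skip non-PVC volumes such as ConfigMaps or emptyDir
		}
		pvc, err := pvcLister.PersistentVolumeClaims(pod.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
		if err != nil {
			return nil, err
		}
		pvcs = append(pvcs, pvc)
	}
	if len(pvcs) == 0 {
		return nil, fmt.Errorf("no pvc found for pod %s/%s", pod.Namespace, pod.Name)
	}
	return pvcs, nil
}

Either way the result is a slice of PVCs rather than a single claim, which is what lets ScaleIn annotate, and tryToDeleteAFailureMember delete, every volume of a multi-PVC PD member.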