Fix support for multiple pvc for pd #3820

Merged 24 commits on Mar 10, 2021
Changes from 16 commits
12 changes: 12 additions & 0 deletions docs/api-references/docs.md
@@ -7799,6 +7799,16 @@ k8s.io/apimachinery/pkg/types.UID
</tr>
<tr>
<td>
<code>pvcUIDSet</code></br>
<em>
map[k8s.io/apimachinery/pkg/types.UID]struct{}
</em>
</td>
<td>
</td>
</tr>
<tr>
<td>
<code>memberDeleted</code></br>
<em>
bool
@@ -9073,6 +9083,7 @@ map[string]github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.PDMember
</em>
</td>
<td>
<p>Members contains PDs in the current TidbCluster</p>
</td>
</tr>
<tr>
@@ -9085,6 +9096,7 @@ map[string]github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.PDMember
</em>
</td>
<td>
<p>PeerMembers contains PDs NOT in the current TidbCluster</p>
</td>
</tr>
<tr>
6 changes: 3 additions & 3 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
@@ -334,13 +334,13 @@ func (tc *TidbCluster) PDAutoFailovering() bool {
}

func (tc *TidbCluster) GetPDDeletedFailureReplicas() int32 {
var failureReplicas int32 = 0
var deletedReplicas int32 = 0
for _, failureMember := range tc.Status.PD.FailureMembers {
if failureMember.MemberDeleted {
failureReplicas++
deletedReplicas++
}
}
return failureReplicas
return deletedReplicas
}

func (tc *TidbCluster) PDStsDesiredReplicas() int32 {
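For context on how the renamed counter is consumed, here is a minimal sketch (not part of this diff; it assumes the v1alpha1 types above and mirrors how PDStsDesiredReplicas is expected to combine the spec replicas with already-deleted failure members so the StatefulSet compensates for removed PD members):

// Hedged sketch: the StatefulSet's desired replicas are the spec replicas plus the
// number of failure members that have already been deleted from the PD cluster.
func pdStsDesiredReplicasSketch(tc *TidbCluster) int32 {
	var deletedReplicas int32
	for _, failureMember := range tc.Status.PD.FailureMembers {
		if failureMember.MemberDeleted {
			deletedReplicas++
		}
	}
	return tc.Spec.PD.Replicas + deletedReplicas
}

For example, a cluster with Spec.PD.Replicas = 3 and one deleted failure member would ask the StatefulSet for 4 replicas until the failure member is cleared from the status.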
21 changes: 12 additions & 9 deletions pkg/apis/pingcap/v1alpha1/types.go
@@ -940,10 +940,12 @@ type Service struct {

// PDStatus is PD status
type PDStatus struct {
Synced bool `json:"synced,omitempty"`
Phase MemberPhase `json:"phase,omitempty"`
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
Members map[string]PDMember `json:"members,omitempty"`
Synced bool `json:"synced,omitempty"`
Phase MemberPhase `json:"phase,omitempty"`
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
// Members contains PDs in the current TidbCluster
Members map[string]PDMember `json:"members,omitempty"`
// PeerMembers contains PDs NOT in the current TidbCluster
PeerMembers map[string]PDMember `json:"peerMembers,omitempty"`
Leader PDMember `json:"leader,omitempty"`
FailureMembers map[string]PDFailureMember `json:"failureMembers,omitempty"`
@@ -965,11 +967,12 @@ type PDMember struct {

// PDFailureMember is the pd failure member information
type PDFailureMember struct {
PodName string `json:"podName,omitempty"`
MemberID string `json:"memberID,omitempty"`
PVCUID types.UID `json:"pvcUID,omitempty"`
MemberDeleted bool `json:"memberDeleted,omitempty"`
CreatedAt metav1.Time `json:"createdAt,omitempty"`
PodName string `json:"podName,omitempty"`
MemberID string `json:"memberID,omitempty"`
PVCUID types.UID `json:"pvcUID,omitempty"`
PVCUIDSet map[types.UID]struct{} `json:"pvcUIDSet,omitempty"`
MemberDeleted bool `json:"memberDeleted,omitempty"`
CreatedAt metav1.Time `json:"createdAt,omitempty"`
}

// UnjoinedMember is the pd unjoin cluster member information
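To illustrate the new PVCUIDSet field, the following hedged sketch (the constructor name is made up; it assumes the v1alpha1 types above plus the corev1, metav1, and types packages from k8s.io) records one failure member that covers every PVC of a failed Pod, which is the shape the failover code later in this diff builds when a PD Pod has multiple PVCs:

// Hedged sketch: build a PDFailureMember whose PVCUIDSet contains the UID of every
// PVC used by the failed Pod, instead of the single PVCUID field it supersedes.
func newPDFailureMemberSketch(podName, memberID string, pvcs []*corev1.PersistentVolumeClaim) PDFailureMember {
	pvcUIDSet := make(map[types.UID]struct{}, len(pvcs))
	for _, pvc := range pvcs {
		pvcUIDSet[pvc.UID] = struct{}{}
	}
	return PDFailureMember{
		PodName:       podName,
		MemberID:      memberID,
		PVCUIDSet:     pvcUIDSet,
		MemberDeleted: false,
		CreatedAt:     metav1.Now(),
	}
}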
8 changes: 8 additions & 0 deletions pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pkg/controller/tidbcluster/tidb_cluster_control.go
@@ -143,6 +143,7 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster)
}

// cleaning all orphan pods(pd, tikv or tiflash which don't have a related PVC) managed by operator
// this could be useful when failover runs into an undesired situation as described in the PD failover function
skipReasons, err := c.orphanPodsCleaner.Clean(tc)
if err != nil {
return err
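The OrphanPodsCleaner referenced here is implemented elsewhere; as a rough sketch of the idea only (the real cleaner lives in orphan_pods_cleaner.go and its actual API may differ), it deletes a pending PD/TiKV/TiFlash Pod whose referenced PVC no longer exists, so the StatefulSet can recreate the Pod together with a fresh PVC:

// Hedged sketch of the orphan-Pod idea: a pending Pod that references a PVC which no
// longer exists (for example, one deleted during failover) is removed so that the
// StatefulSet controller recreates it along with a new PVC.
func cleanOrphanPodSketch(pod *corev1.Pod, pvcExists func(name string) bool, deletePod func(*corev1.Pod) error) error {
	if pod.Status.Phase != corev1.PodPending {
		return nil // only pending Pods are treated as orphans in this sketch
	}
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim != nil && !pvcExists(vol.PersistentVolumeClaim.ClaimName) {
			return deletePod(pod)
		}
	}
	return nil
}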
2 changes: 1 addition & 1 deletion pkg/manager/member/dm_master_member_manager.go
@@ -351,7 +351,7 @@ func (m *masterMemberManager) syncDMClusterStatus(dc *v1alpha1.DMCluster, set *a
dc.Status.Master.Members = masterStatus
dc.Status.Master.Leader = dc.Status.Master.Members[leader.Name]
dc.Status.Master.Image = ""
c := filterContainer(set, "dm-master")
c := findContainerByName(set, "dm-master")
if c != nil {
dc.Status.Master.Image = c.Image
}
2 changes: 1 addition & 1 deletion pkg/manager/member/dm_worker_member_manager.go
@@ -280,7 +280,7 @@ func (m *workerMemberManager) syncDMClusterStatus(dc *v1alpha1.DMCluster, set *a
dc.Status.Worker.Synced = true
dc.Status.Worker.Members = workerStatus
dc.Status.Worker.Image = ""
c := filterContainer(set, "dm-worker")
c := findContainerByName(set, "dm-worker")
if c != nil {
dc.Status.Worker.Image = c.Image
}
1 change: 1 addition & 0 deletions pkg/manager/member/failover.go
@@ -15,6 +15,7 @@ package member

import "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"

// TODO: move this to a centralized place
const (
unHealthEventReason = "Unhealthy"
unHealthEventMsgPattern = "%s pod[%s] is unhealthy, msg:%s"
118 changes: 68 additions & 50 deletions pkg/manager/member/pd_failover.go
@@ -25,6 +25,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
)

@@ -43,23 +44,24 @@ func NewPDFailover(deps *controller.Dependencies) Failover {
// If there are 3 PD members in a tidb cluster with 1 broken member pd-0, pdFailover will do failover in 3 rounds:
// 1. mark pd-0 as a failure Member with non-deleted state (MemberDeleted=false)
// 2. delete the failure member pd-0, and mark it deleted (MemberDeleted=true)
// 3. PD member manager will add the count of deleted failure members more replicas
// 3. PD member manager will add the `count(deleted failure members)` more replicas
//
// If the count of failure PD members with the deleted state (MemberDeleted=true) is equal to or greater than MaxFailoverCount, we will skip failover.
func (f *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()

if !tc.Status.PD.Synced {
return fmt.Errorf("TidbCluster: %s/%s's pd status sync failed, can't failover", ns, tcName)
return fmt.Errorf("TidbCluster: %s/%s .Status.PD.Synced = false, can't failover", ns, tcName)
}
if tc.Status.PD.FailureMembers == nil {
tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{}
}

inQuorum, healthCount := f.isPDInQuorum(tc)
if !inQuorum {
return fmt.Errorf("TidbCluster: %s/%s's pd cluster is not health: %d/%d, "+
"replicas: %d, failureCount: %d, can't failover",
return fmt.Errorf("TidbCluster: %s/%s's pd cluster is not health, health %d / desired %d,"+
" replicas %d, failureCount %d, can't failover",
ns, tcName, healthCount, tc.PDStsDesiredReplicas(), tc.Spec.PD.Replicas, len(tc.Status.PD.FailureMembers))
}

@@ -100,32 +102,39 @@ func (f *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error {
if tc.Status.PD.FailureMembers == nil {
tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{}
}
deadline := pdMember.LastTransitionTime.Add(f.deps.CLIConfig.PDFailoverPeriod)
failoverDeadline := pdMember.LastTransitionTime.Add(f.deps.CLIConfig.PDFailoverPeriod)
_, exist := tc.Status.PD.FailureMembers[pdName]

if pdMember.Health || time.Now().Before(deadline) || exist {
if pdMember.Health || time.Now().Before(failoverDeadline) || exist {
continue
}

ordinal, err := util.GetOrdinalFromPodName(podName)
if err != nil {
return err
pod, err := f.deps.PodLister.Pods(ns).Get(podName)
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pod %s/%s for tc %s/%s, error: %s", ns, podName, ns, tcName, err)
}
pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal)
pvc, err := f.deps.PVCLister.PersistentVolumeClaims(ns).Get(pvcName)
if pod == nil {
klog.Infof("tryToMarkAPeerAsFailure: failure pod %s/%s not found, skip", ns, podName)
return nil
}

pvcs, err := util.ResolvePVCFromPod(pod, f.deps.PVCLister)
if err != nil {
return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pvc %s for cluster %s/%s, error: %s", pvcName, ns, tcName, err)
return fmt.Errorf("tryToMarkAPeerAsFailure: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err)
}

msg := fmt.Sprintf("pd member[%s] is unhealthy", pdMember.ID)
f.deps.Recorder.Event(tc, apiv1.EventTypeWarning, unHealthEventReason, fmt.Sprintf(unHealthEventMsgPattern, "pd", pdName, msg))
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", "%s/%s(%s) is unhealthy", ns, podName, pdMember.ID)

// mark a peer member failed and return an error to skip reconciliation
// note that status of tidb cluster will be updated always
pvcUIDSet := make(map[types.UID]struct{})
for _, pvc := range pvcs {
pvcUIDSet[pvc.UID] = struct{}{}
}
tc.Status.PD.FailureMembers[pdName] = v1alpha1.PDFailureMember{
PodName: podName,
MemberID: pdMember.ID,
PVCUID: pvc.UID,
PVCUIDSet: pvcUIDSet,
MemberDeleted: false,
CreatedAt: metav1.Now(),
}
@@ -135,26 +144,26 @@ func (f *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error {
return nil
}

// tryToDeleteAFailureMember tries to delete a PD member and associated pod &
// pvc. If this succeeds, new pod & pvc will be created by Kubernetes.
// Note that this will fail if the kubelet on the node which failed pod was
// running on is not responding.
// tryToDeleteAFailureMember tries to delete a PD member and associated Pod & PVC.
// On success, new Pod & PVC will be created.
// Note that this will fail if the kubelet on the node on which the failed Pod was running is not responding.
func (f *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
var failureMember *v1alpha1.PDFailureMember
var failurePodName string
var failurePdName string
var failurePDName string

for pdName, pdMember := range tc.Status.PD.FailureMembers {
if !pdMember.MemberDeleted {
failureMember = &pdMember
failurePodName = strings.Split(pdName, ".")[0]
failurePdName = pdName
failurePDName = pdName
break
}
}
if failureMember == nil {
klog.Infof("No PD FailureMembers to delete for tc %s/%s", ns, tcName)
return nil
}

@@ -163,50 +172,56 @@ func (f *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error {
return err
}
// invoke deleteMember api to delete a member from the pd cluster
err = controller.GetPDClient(f.deps.PDControl, tc).DeleteMemberByID(memberID)
if err != nil {
klog.Errorf("pd failover: failed to delete member: %d, %v", memberID, err)
if err := controller.GetPDClient(f.deps.PDControl, tc).DeleteMemberByID(memberID); err != nil {
klog.Errorf("tryToDeleteAFailureMember: failed to delete member %d, error: %v", memberID, err)
return err
}
klog.Infof("pd failover: delete member: %d successfully", memberID)
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted",
"%s(%d) deleted from cluster", failurePodName, memberID)
klog.Infof("tryToDeleteAFailureMember: delete member %s/%s(%d) successfully", ns, failurePodName, memberID)
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted", "failure member %s/%s(%d) deleted from PD cluster", ns, failurePodName, memberID)

// The order of old PVC deleting and the new Pod creating is not guaranteed by Kubernetes.
// If new Pod is created before old PVC deleted, new Pod will reuse old PVC.
// So we must try to delete the PVC and Pod of this PD peer over and over,
// and let StatefulSet create the new PD peer with the same ordinal, but don't use the tombstone PV
// If the new Pod is created before the old PVCs are deleted, the StatefulSet will try to use the old PVCs and skip creating new PVCs.
// This could result in 2 possible cases:
// 1. If the old PVCs are first mounted successfully by the new Pod, the following PVC deletion will fail and return an error.
// We will try to delete the Pod and PVCs again in the next requeued run.
// 2. If the old PVCs are first deleted successfully here, the new Pods will try to mount non-existing PVCs and stay Pending forever.
// This is where OrphanPodsCleaner kicks in, which will delete the pending Pods in this situation.
// Please refer to orphan_pods_cleaner.go for details.
pod, err := f.deps.PodLister.Pods(ns).Get(failurePodName)
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("tryToDeleteAFailureMember: failed to get pods %s for cluster %s/%s, error: %s", failurePodName, ns, tcName, err)
return fmt.Errorf("tryToDeleteAFailureMember: failed to get pod %s/%s for tc %s/%s, error: %s", ns, failurePodName, ns, tcName, err)
}

ordinal, err := util.GetOrdinalFromPodName(failurePodName)
if err != nil {
return err
if pod == nil {
klog.Infof("tryToDeleteAFailureMember: failure pod %s/%s not found, skip", ns, failurePodName)
return nil
}
pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal)
pvc, err := f.deps.PVCLister.PersistentVolumeClaims(ns).Get(pvcName)
pvcs, err := util.ResolvePVCFromPod(pod, f.deps.PVCLister)
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("tryToDeleteAFailureMember: failed to get pvc %s for cluster %s/%s, error: %s", pvcName, ns, tcName, err)
return fmt.Errorf("tryToDeleteAFailureMember: failed to get pvcs for pod %s/%s in tc %s/%s, error: %s", ns, pod.Name, ns, tcName, err)
}

if pod != nil && pod.DeletionTimestamp == nil {
err := f.deps.PodControl.DeletePod(tc, pod)
if err != nil {
if pod.DeletionTimestamp == nil {
if err := f.deps.PodControl.DeletePod(tc, pod); err != nil {
return err
}
}
if pvc != nil && pvc.DeletionTimestamp == nil && pvc.GetUID() == failureMember.PVCUID {
err = f.deps.PVCControl.DeletePVC(tc, pvc)
if err != nil {
klog.Errorf("pd failover: failed to delete pvc: %s/%s, %v", ns, pvcName, err)
return err
for _, pvc := range pvcs {
_, pvcUIDExist := failureMember.PVCUIDSet[pvc.GetUID()]
// For backward compatibility: if failureMembers already exist and the user upgrades the operator to a newer version,
// there will be failure member structures with PVCUID set from the API server; we should handle this as pvcUIDExist == true.
if pvc.GetUID() == failureMember.PVCUID {
pvcUIDExist = true
}
if pvc.DeletionTimestamp == nil && pvcUIDExist {
if err := f.deps.PVCControl.DeletePVC(tc, pvc); err != nil {
klog.Errorf("tryToDeleteAFailureMember: failed to delete pvc: %s/%s, error: %s", ns, pvc.Name, err)
return err
}
klog.Infof("tryToDeleteAFailureMember: delete pvc %s/%s successfully", ns, pvc.Name)
}
klog.Infof("pd failover: pvc: %s/%s successfully", ns, pvcName)
}

setMemberDeleted(tc, failurePdName)
setMemberDeleted(tc, failurePDName)
return nil
}

Expand All @@ -230,19 +245,22 @@ func setMemberDeleted(tc *v1alpha1.TidbCluster, pdName string) {
klog.Infof("pd failover: set pd member: %s/%s deleted", tc.GetName(), pdName)
}

// isPDInQuorum reports whether more than half of the PD members (including peer members) are healthy
func (f *pdFailover) isPDInQuorum(tc *v1alpha1.TidbCluster) (bool, int) {
healthCount := 0
ns := tc.GetNamespace()
for podName, pdMember := range tc.Status.PD.Members {
if pdMember.Health {
healthCount++
} else {
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy",
"%s(%s) is unhealthy", podName, pdMember.ID)
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy", "%s/%s(%s) is unhealthy", ns, podName, pdMember.ID)
}
}
for _, pdMember := range tc.Status.PD.PeerMembers {
if pdMember.Health {
healthCount++
} else {
f.deps.Recorder.Eventf(tc, apiv1.EventTypeWarning, "PDPeerMemberUnhealthy", "%s/%s(%s) is unhealthy", ns, pdMember.Name, pdMember.ID)
}
}
return healthCount > (len(tc.Status.PD.Members)+len(tc.Status.PD.PeerMembers))/2, healthCount
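To make the quorum arithmetic above concrete, here is a minimal sketch under the same rule (names are illustrative; only the health flags matter): strictly more than half of all PD members, counting both members of this TidbCluster and peer members, must be healthy.

// Hedged sketch of the quorum rule: healthy members must outnumber half of the total
// member count (local members plus peer members).
func pdInQuorumSketch(memberHealth, peerHealth []bool) (bool, int) {
	healthCount := 0
	for _, healthy := range memberHealth {
		if healthy {
			healthCount++
		}
	}
	for _, healthy := range peerHealth {
		if healthy {
			healthCount++
		}
	}
	total := len(memberHealth) + len(peerHealth)
	return healthCount > total/2, healthCount
}

For example, with 3 members in this TidbCluster and 2 peer members, at least 3 of the 5 must be healthy (3 > 5/2) before failover is allowed to proceed.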