diff --git a/Makefile b/Makefile index 58d9223ed2..a077ccc512 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ -GOENV := GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=linux GOARCH=amd64 -GO := $(GOENV) go +GOENV := GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=linux GOARCH=amd64 +GO := $(GOENV) go +GOTEST := go test -v -cover LDFLAGS += -X "github.com/pingcap/tidb-operator/version.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')" LDFLAGS += -X "github.com/pingcap/tidb-operator/version.GitSHA=$(shell git rev-parse HEAD)" @@ -39,8 +40,7 @@ e2e-build: $(GOENV) ginkgo build tests/e2e test: - @echo "run unit tests" - @$(GO) test ./pkg/... -v -cover && echo success + @$(GOTEST) ./pkg/... && echo "\nUnit tests run successfully!" check-all: lint check-static check-shadow check-gosec megacheck errcheck diff --git a/charts/tidb-cluster/templates/pd-configmap.yaml b/charts/tidb-cluster/templates/pd-configmap.yaml index c00414624b..ea0a165ada 100644 --- a/charts/tidb-cluster/templates/pd-configmap.yaml +++ b/charts/tidb-cluster/templates/pd-configmap.yaml @@ -32,6 +32,9 @@ data: fi source ${ANNOTATIONS} 2>/dev/null + PEER_SERVICE_DOMAIN="${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc" + SERVICE_DOMAIN="${SERVICE_NAME}.${NAMESPACE}.svc" + runmode=${runmode:-normal} if [[ X${runmode} == Xdebug ]] then @@ -42,18 +45,35 @@ data: elapseTime=0 period=1 threshold=30 - while true;do - nslookup ${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc 2>/dev/null - [[ $? -eq 0 ]] && break - echo "nslookup domain ${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc failed" >&2 + while true; do sleep ${period} elapseTime=$(( elapseTime+period )) - if [[ ${elapseTime} -ge ${threshold} ]];then - echo "nslookup domain ${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc timeout" >&2 + + if [[ ${elapseTime} -ge ${threshold} ]] + then + echo "waiting for pd cluster ready timeout" >&2 exit 1 fi + + source ${ANNOTATIONS} 2>/dev/null + if nslookup ${PEER_SERVICE_DOMAIN} 2>/dev/null + then + echo "nslookup domain ${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc success" + + if [[ ${ORDINAL} -eq 0 ]] + then + [[ -z ${bootstrapping:-} ]] && continue + [[ ${bootstrapping} == "true" ]] && break + fi + + [[ -d /var/lib/pd/member/wal ]] && break + wget -qO- ${SERVICE_DOMAIN}:2379/pd/api/v1/members 2>/dev/null + [[ $? 
-eq 0 ]] && break + echo "pd cluster is not ready now: ${SERVICE_DOMAIN}" + else + echo "nslookup domain ${PEER_SERVICE_DOMAIN} failed" >&2 + fi done - echo "nslookup domain ${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc success" ARGS="--data-dir=/var/lib/pd \ --name=${HOSTNAME} \ @@ -64,14 +84,22 @@ data: --config=/etc/pd/pd.toml \ " - if [[ ${ORDINAL} == "0" ]] + replicas=${replicas:-3} + if [[ ${ORDINAL} -eq 0 && ${bootstrapping:-} == "true" ]] then ARGS="${ARGS}--initial-cluster=${HOSTNAME}=http://${HOSTNAME}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc:2380" else + if [[ ${ORDINAL} -eq 0 ]] + then + TOP=$((replicas-1)) + else + TOP=$((ORDINAL-1)) + fi + ARGS="${ARGS}--join=" - TOP=$((ORDINAL-1)) for i in $(seq 0 ${TOP}); do + [[ ${i} -eq ${ORDINAL} ]] && continue ARGS="${ARGS}http://${SET_NAME}-${i}.${PEER_SERVICE_NAME}.${NAMESPACE}.svc:2380" if [[ ${i} -lt ${TOP} ]] then diff --git a/charts/tidb-operator/templates/controller-manager-deployment.yaml b/charts/tidb-operator/templates/controller-manager-deployment.yaml index df364c916e..6805fc8ed0 100644 --- a/charts/tidb-operator/templates/controller-manager-deployment.yaml +++ b/charts/tidb-operator/templates/controller-manager-deployment.yaml @@ -35,6 +35,8 @@ spec: - /usr/local/bin/tidb-controller-manager - -default-storage-class-name={{ .Values.defaultStorageClassName }} - -cluster-scoped={{ .Values.clusterScoped }} + - -auto-failover={{ .Values.controllerManager.autoFailover | default false }} + - -pd-failover-period={{ .Values.controllerManager.pdFailoverPeriod | default "5m" }} - -v={{ .Values.controllerManager.logLevel }} env: - name: NAMESPACE diff --git a/charts/tidb-operator/templates/controller-manager-rbac.yaml b/charts/tidb-operator/templates/controller-manager-rbac.yaml index 54acf8053d..b9ac84fefa 100644 --- a/charts/tidb-operator/templates/controller-manager-rbac.yaml +++ b/charts/tidb-operator/templates/controller-manager-rbac.yaml @@ -27,7 +27,7 @@ rules: verbs: ["create", "get", "update"] - apiGroups: [""] resources: ["pods"] - verbs: ["get", "list", "watch","update"] + verbs: ["get", "list", "watch","update", "delete"] - apiGroups: [""] resources: ["persistentvolumes"] verbs: ["get", "list", "watch", "patch","update"] @@ -84,7 +84,7 @@ rules: verbs: ["get", "list", "watch", "update", "delete"] - apiGroups: [""] resources: ["pods"] - verbs: ["get", "list", "watch"] + verbs: ["get", "list", "watch","update", "delete"] - apiGroups: ["apps"] resources: ["statefulsets"] verbs: ["*"] diff --git a/charts/tidb-operator/values.yaml b/charts/tidb-operator/values.yaml index 993d3f4d1e..b59e748a0f 100644 --- a/charts/tidb-operator/values.yaml +++ b/charts/tidb-operator/values.yaml @@ -26,3 +26,7 @@ controllerManager: requests: cpu: 80m memory: 50Mi + # autoFailover is whether tidb-operator should auto failover when failure occurs + autoFailover: false + # pd failover period default(5m) + pdFailoverPeriod: 5m diff --git a/cmd/controller-manager/main.go b/cmd/controller-manager/main.go index 942e49166d..dd86967631 100644 --- a/cmd/controller-manager/main.go +++ b/cmd/controller-manager/main.go @@ -38,13 +38,15 @@ import ( ) var ( - printVersion bool - workers int - leaseDuration = 15 * time.Second - renewDuration = 5 * time.Second - retryPeriod = 3 * time.Second - resyncDuration = 30 * time.Second - waitDuration = 5 * time.Second + printVersion bool + workers int + pdFailoverPeriod time.Duration + autoFailover bool + leaseDuration = 15 * time.Second + renewDuration = 5 * time.Second + retryPeriod = 3 * time.Second + resyncDuration = 
30 * time.Second + waitDuration = 5 * time.Second ) func init() { @@ -53,6 +55,8 @@ func init() { flag.IntVar(&workers, "workers", 5, "The number of workers that are allowed to sync concurrently. Larger number = more responsive management, but more CPU (and network) load") flag.BoolVar(&controller.ClusterScoped, "cluster-scoped", true, "Whether tidb-operator should manage kubernetes cluster wide TiDB Clusters") flag.StringVar(&controller.DefaultStorageClassName, "default-storage-class-name", "standard", "Default storage class name") + flag.BoolVar(&autoFailover, "auto-failover", false, "Auto failover") + flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)") flag.Parse() } @@ -112,7 +116,7 @@ func main() { }, } - tcController := tidbcluster.NewController(kubeCli, cli, informerFactory, kubeInformerFactory) + tcController := tidbcluster.NewController(kubeCli, cli, informerFactory, kubeInformerFactory, autoFailover, pdFailoverPeriod) stop := make(chan struct{}) defer close(stop) go informerFactory.Start(stop) diff --git a/pkg/apis/pingcap.com/v1alpha1/types.go b/pkg/apis/pingcap.com/v1alpha1/types.go index 3c3bc868ff..0119a9ee40 100644 --- a/pkg/apis/pingcap.com/v1alpha1/types.go +++ b/pkg/apis/pingcap.com/v1alpha1/types.go @@ -16,6 +16,7 @@ package v1alpha1 import ( apps "k8s.io/api/apps/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" corev1 "k8s.io/api/core/v1" ) @@ -165,9 +166,10 @@ type ResourceRequirement struct { // PDStatus is PD status type PDStatus struct { - Phase MemberPhase `json:"phase,omitempty"` - StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"` - Members map[string]PDMember `json:"members,omitempty"` + Phase MemberPhase `json:"phase,omitempty"` + StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"` + Members map[string]PDMember `json:"members,omitempty"` + FailureMembers map[string]PDFailureMember `json:"failureMembers,omitempty"` } // PDMember is PD member @@ -178,6 +180,17 @@ type PDMember struct { ID string `json:"id"` ClientURL string `json:"clientURL"` Health bool `json:"health"` + // Last time the health transitioned from one to another. + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` +} + +// PDFailureMember is the pd failure member information +type PDFailureMember struct { + PodName string `json:"podName,omitempty"` + MemberID string `json:"memberID,omitempty"` + PVUID types.UID `json:"pvUID,omitempty"` + Replicas int32 `json:"replicas,omitempty"` + MemberDeleted bool `json:"memberDeleted,omitempty"` } // TiDBStatus is TiDB status diff --git a/pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go index ed7885d4f3..c3a924d2a8 100644 --- a/pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go @@ -52,9 +52,26 @@ func (in *ContainerSpec) DeepCopy() *ContainerSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PDFailureMember) DeepCopyInto(out *PDFailureMember) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PDFailureMember. 
+func (in *PDFailureMember) DeepCopy() *PDFailureMember { + if in == nil { + return nil + } + out := new(PDFailureMember) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PDMember) DeepCopyInto(out *PDMember) { *out = *in + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) return } @@ -110,6 +127,13 @@ func (in *PDStatus) DeepCopyInto(out *PDStatus) { if in.Members != nil { in, out := &in.Members, &out.Members *out = make(map[string]PDMember, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } + if in.FailureMembers != nil { + in, out := &in.FailureMembers, &out.FailureMembers + *out = make(map[string]PDFailureMember, len(*in)) for key, val := range *in { (*out)[key] = val } diff --git a/pkg/controller/pod_control.go b/pkg/controller/pod_control.go index 17fc31e191..ced287c34d 100644 --- a/pkg/controller/pod_control.go +++ b/pkg/controller/pod_control.go @@ -22,8 +22,10 @@ import ( "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/label" corev1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" @@ -31,12 +33,16 @@ import ( // PodControlInterface manages Pods used in TidbCluster type PodControlInterface interface { + // TODO change this to UpdatePod UpdateMetaInfo(*v1alpha1.TidbCluster, *corev1.Pod) error + DeletePod(*v1alpha1.TidbCluster, *corev1.Pod) error + UpdatePod(*v1alpha1.TidbCluster, *corev1.Pod) error } type realPodControl struct { kubeCli kubernetes.Interface pdControl PDControlInterface + podLister corelisters.PodLister recorder record.EventRecorder } @@ -44,15 +50,43 @@ type realPodControl struct { func NewRealPodControl( kubeCli kubernetes.Interface, pdControl PDControlInterface, + podLister corelisters.PodLister, recorder record.EventRecorder, ) PodControlInterface { return &realPodControl{ kubeCli: kubeCli, pdControl: pdControl, + podLister: podLister, recorder: recorder, } } +func (rpc *realPodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { + ns := tc.GetNamespace() + podName := pod.GetName() + + // don't wait due to limited number of clients, but backoff after the default number of steps + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, updateErr := rpc.kubeCli.CoreV1().Pods(ns).Update(pod) + if updateErr == nil { + glog.Infof("Pod: [%s/%s] updated successfully", ns, podName) + return nil + } + glog.Errorf("failed to update Pod: [%s/%s], error: %v", ns, podName, updateErr) + + if updated, err := rpc.podLister.Pods(ns).Get(podName); err == nil { + // make a copy so we don't mutate the shared cache + pod = updated.DeepCopy() + } else { + utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", ns, podName, err)) + } + + return updateErr + }) + rpc.recordPodEvent("update", tc, podName, err) + return err +} + func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { ns := pod.GetNamespace() podName := pod.GetName() @@ -131,6 +165,20 @@ func (rpc *realPodControl) UpdateMetaInfo(tc *v1alpha1.TidbCluster, pod *corev1. 
return err } +func (rpc *realPodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { + ns := tc.GetNamespace() + tcName := tc.GetName() + podName := pod.GetName() + err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, nil) + if err != nil { + glog.Errorf("failed to delete Pod: [%s/%s], TidbCluster: %s, %v", ns, podName, tcName, err) + } else { + glog.V(4).Infof("delete Pod: [%s/%s] successfully, TidbCluster: %s", ns, podName, tcName) + } + rpc.recordPodEvent("delete", tc, podName, err) + return err +} + func (rpc *realPodControl) recordPodEvent(verb string, tc *v1alpha1.TidbCluster, podName string, err error) { tcName := tc.GetName() if err == nil { @@ -162,6 +210,7 @@ var ( type FakePodControl struct { PodIndexer cache.Indexer updatePodTracker requestTracker + deletePodTracker requestTracker getClusterTracker requestTracker getMemberTracker requestTracker getStoreTracker requestTracker @@ -175,6 +224,7 @@ func NewFakePodControl(podInformer coreinformers.PodInformer) *FakePodControl { requestTracker{0, nil, 0}, requestTracker{0, nil, 0}, requestTracker{0, nil, 0}, + requestTracker{0, nil, 0}, } } @@ -184,6 +234,12 @@ func (fpc *FakePodControl) SetUpdatePodError(err error, after int) { fpc.updatePodTracker.after = after } +// SetDeletePodError sets the error attributes of deletePodTracker +func (fpc *FakePodControl) SetDeletePodError(err error, after int) { + fpc.deletePodTracker.err = err + fpc.deletePodTracker.after = after +} + // SetGetClusterError sets the error attributes of getClusterTracker func (fpc *FakePodControl) SetGetClusterError(err error, after int) { fpc.getClusterTracker.err = err @@ -237,4 +293,24 @@ func (fpc *FakePodControl) UpdateMetaInfo(_ *v1alpha1.TidbCluster, pod *corev1.P return fpc.PodIndexer.Update(pod) } +func (fpc *FakePodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { + defer fpc.deletePodTracker.inc() + if fpc.deletePodTracker.errorReady() { + defer fpc.deletePodTracker.reset() + return fpc.deletePodTracker.err + } + + return fpc.PodIndexer.Delete(pod) +} + +func (fpc *FakePodControl) UpdatePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { + defer fpc.updatePodTracker.inc() + if fpc.updatePodTracker.errorReady() { + defer fpc.updatePodTracker.reset() + return fpc.updatePodTracker.err + } + + return fpc.PodIndexer.Update(pod) +} + var _ PodControlInterface = &FakePodControl{} diff --git a/pkg/controller/pod_control_test.go b/pkg/controller/pod_control_test.go index b1ecfec4e7..56f1060a2c 100644 --- a/pkg/controller/pod_control_test.go +++ b/pkg/controller/pod_control_test.go @@ -28,7 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + kubefake "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" ) @@ -37,8 +40,8 @@ func TestPodControlUpdateMetaInfoSuccess(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, 
error) { @@ -88,8 +91,8 @@ func TestPodControlUpdateMetaInfoGetClusterFailed(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, error) { @@ -135,8 +138,8 @@ func TestPodControlUpdateMetaInfoGetMemberFailed(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, error) { @@ -179,8 +182,8 @@ func TestPodControlUpdateMetaInfoGetStoreFailed(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, error) { @@ -218,8 +221,8 @@ func TestPodControlUpdateMetaInfoUpdatePodFailed(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, error) { @@ -269,8 +272,8 @@ func TestPodControlUpdateMetaInfoConflictSuccess(t *testing.T) { g := NewGomegaWithT(t) tc := newTidbCluster() pod := newPod(tc) - fakeClient, pdControl, recorder := newFakeClientRecorderAndPDControl() - control := NewRealPodControl(fakeClient, pdControl, recorder) + fakeClient, pdControl, podLister, recorder := newFakeClientRecorderAndPDControl() + control := NewRealPodControl(fakeClient, pdControl, podLister, recorder) pdClient := NewFakePDClient() pdControl.SetPDClient(tc, pdClient) pdClient.AddReaction(GetClusterActionType, func(action *Action) (interface{}, error) { @@ -322,11 +325,13 @@ func TestPodControlUpdateMetaInfoConflictSuccess(t *testing.T) { g.Expect(events[0]).To(ContainSubstring(corev1.EventTypeNormal)) } -func newFakeClientRecorderAndPDControl() (*fake.Clientset, *FakePDControl, *record.FakeRecorder) { +func newFakeClientRecorderAndPDControl() (*fake.Clientset, *FakePDControl, corelisters.PodLister, *record.FakeRecorder) { fakeClient := &fake.Clientset{} pdControl := NewFakePDControl() + kubeCli := kubefake.NewSimpleClientset() + podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 
0).Core().V1().Pods() recorder := record.NewFakeRecorder(10) - return fakeClient, pdControl, recorder + return fakeClient, pdControl, podInformer.Lister(), recorder } func newPod(tc *v1alpha1.TidbCluster) *corev1.Pod { diff --git a/pkg/controller/tidbcluster/tidb_cluster_control.go b/pkg/controller/tidbcluster/tidb_cluster_control.go index 203501a266..103d5b323e 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_control.go +++ b/pkg/controller/tidbcluster/tidb_cluster_control.go @@ -16,10 +16,10 @@ package tidbcluster import ( "github.com/golang/glog" "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/manager" "github.com/pingcap/tidb-operator/pkg/util" apiequality "k8s.io/apimachinery/pkg/api/equality" - errorutils "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" ) @@ -39,7 +39,7 @@ type ControlInterface interface { // NewDefaultTidbClusterControl returns a new instance of the default implementation TidbClusterControlInterface that // implements the documented semantics for TidbClusters. func NewDefaultTidbClusterControl( - statusUpdater StatusUpdaterInterface, + tcControl controller.TidbClusterControlInterface, pdMemberManager manager.Manager, tikvMemberManager manager.Manager, tidbMemberManager manager.Manager, @@ -47,7 +47,7 @@ func NewDefaultTidbClusterControl( metaManager manager.Manager, recorder record.EventRecorder) ControlInterface { return &defaultTidbClusterControl{ - statusUpdater, + tcControl, pdMemberManager, tikvMemberManager, tidbMemberManager, @@ -58,7 +58,7 @@ func NewDefaultTidbClusterControl( } type defaultTidbClusterControl struct { - statusUpdater StatusUpdaterInterface + tcControl controller.TidbClusterControlInterface pdMemberManager manager.Manager tikvMemberManager manager.Manager tidbMemberManager manager.Manager @@ -69,21 +69,22 @@ type defaultTidbClusterControl struct { // UpdateStatefulSet executes the core logic loop for a tidbcluster. 
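+// UpdateTidbCluster runs the sync logic and then persists the TidbCluster through tcControl
+// whenever the status or the PD replica count (which failover may change) differs from the
+// original object.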
func (tcc *defaultTidbClusterControl) UpdateTidbCluster(tc *v1alpha1.TidbCluster) error { - // perform the main update function and get the status - var errs []error oldStatus := tc.Status.DeepCopy() + oldPDReplicas := tc.Spec.PD.Replicas + err := tcc.updateTidbCluster(tc) if err != nil { - errs = append(errs, err) + return err } - if !apiequality.Semantic.DeepEqual(&tc.Status, oldStatus) { - // update the tidbCluster's status - err2 := tcc.updateTidbClusterStatus(tc, &tc.Status) - if err2 != nil { - errs = append(errs, err2) + + if !apiequality.Semantic.DeepEqual(&tc.Status, oldStatus) || tc.Spec.PD.Replicas != oldPDReplicas { + tc, err = tcc.tcControl.UpdateTidbCluster(tc.DeepCopy()) + if err != nil { + return err } } - return errorutils.NewAggregate(errs) + + return nil } func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster) error { @@ -131,12 +132,6 @@ func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster return nil } -func (tcc *defaultTidbClusterControl) updateTidbClusterStatus(tc *v1alpha1.TidbCluster, status *v1alpha1.TidbClusterStatus) error { - tc = tc.DeepCopy() - status = status.DeepCopy() - return tcc.statusUpdater.UpdateTidbClusterStatus(tc, status) -} - func (tcc *defaultTidbClusterControl) IsPDAvailable(tc *v1alpha1.TidbCluster) bool { lowerLimit := tc.Spec.PD.Replicas/2 + 1 if int32(len(tc.Status.PD.Members)) < lowerLimit { diff --git a/pkg/controller/tidbcluster/tidb_cluster_control_test.go b/pkg/controller/tidbcluster/tidb_cluster_control_test.go index e99bacd7f5..6ccdb11c87 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_control_test.go +++ b/pkg/controller/tidbcluster/tidb_cluster_control_test.go @@ -20,15 +20,12 @@ import ( "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/fake" informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions" - tcinformers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions/pingcap.com/v1alpha1" - v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" mm "github.com/pingcap/tidb-operator/pkg/manager/member" "github.com/pingcap/tidb-operator/pkg/manager/meta" apiequality "k8s.io/apimachinery/pkg/api/equality" kubeinformers "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" ) @@ -47,7 +44,7 @@ func TestTidbClusterControl(t *testing.T) { }, nil }) - control, setControl, _, pdControl := newFakeTidbClusterControl() + control, setControl, pdControl := newFakeTidbClusterControl() pdControl.SetPDClient(tc, pdClient) err := syncTidbClusterControl(tc, setControl, control) @@ -71,7 +68,7 @@ func TestTidbClusterStatusEquality(t *testing.T) { g.Expect(apiequality.Semantic.DeepEqual(&tcStatus, tcStatusCopy)).To(Equal(false)) } -func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetControl, StatusUpdaterInterface, *controller.FakePDControl) { +func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetControl, *controller.FakePDControl) { cli := fake.NewSimpleClientset() kubeCli := kubefake.NewSimpleClientset() setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets() @@ -88,22 +85,24 @@ func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetC svcControl := controller.NewFakeServiceControl(svcInformer, 
tcInformer) pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), recorder) pvcControl := controller.NewRealPVCControl(kubeCli, recorder, pvcInformer.Lister()) - podControl := controller.NewRealPodControl(kubeCli, pdControl, recorder) - statusUpdater := newFakeTidbClusterStatusUpdater(tcInformer) + podControl := controller.NewRealPodControl(kubeCli, pdControl, podInformer.Lister(), recorder) + tcControl := controller.NewFakeTidbClusterControl(tcInformer) pdScaler := mm.NewFakePDScaler() tikvScaler := mm.NewFakeTiKVScaler() + autoFailover := true + pdFailover := mm.NewFakePDFailover() pdUpgrader := mm.NewFakePDUpgrader() tikvUpgrader := mm.NewFakeTiKVUpgrader() tidbUpgrader := mm.NewFakeTiDBUpgrader() - pdMemberManager := mm.NewPDMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), pdScaler, pdUpgrader) + pdMemberManager := mm.NewPDMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, pdUpgrader, autoFailover, pdFailover) tikvMemberManager := mm.NewTiKVMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), tikvScaler, tikvUpgrader) tidbMemberManager := mm.NewTiDBMemberManager(setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), tidbUpgrader) reclaimPolicyManager := meta.NewReclaimPolicyManager(pvcInformer.Lister(), pvInformer.Lister(), pvControl) metaManager := meta.NewMetaManager(pvcInformer.Lister(), pvcControl, pvInformer.Lister(), pvControl, podInformer.Lister(), podControl) - control := NewDefaultTidbClusterControl(statusUpdater, pdMemberManager, tikvMemberManager, tidbMemberManager, reclaimPolicyManager, metaManager, recorder) + control := NewDefaultTidbClusterControl(tcControl, pdMemberManager, tikvMemberManager, tidbMemberManager, reclaimPolicyManager, metaManager, recorder) - return control, setControl, statusUpdater, pdControl + return control, setControl, pdControl } func syncTidbClusterControl(tc *v1alpha1.TidbCluster, _ *controller.FakeStatefulSetControl, control ControlInterface) error { @@ -116,22 +115,3 @@ func syncTidbClusterControl(tc *v1alpha1.TidbCluster, _ *controller.FakeStateful return nil } - -type fakeTidbClusterStatusUpdater struct { - tcLister v1listers.TidbClusterLister - tcIndexer cache.Indexer -} - -func newFakeTidbClusterStatusUpdater(tcInformer tcinformers.TidbClusterInformer) *fakeTidbClusterStatusUpdater { - return &fakeTidbClusterStatusUpdater{ - tcInformer.Lister(), - tcInformer.Informer().GetIndexer(), - } -} - -func (tsu *fakeTidbClusterStatusUpdater) UpdateTidbClusterStatus(tc *v1alpha1.TidbCluster, status *v1alpha1.TidbClusterStatus) error { - tc.Status = *status - return tsu.tcIndexer.Update(tc) -} - -var _ StatusUpdaterInterface = &fakeTidbClusterStatusUpdater{} diff --git a/pkg/controller/tidbcluster/tidb_cluster_controller.go b/pkg/controller/tidbcluster/tidb_cluster_controller.go index 4f716d184d..15b1ff5144 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_controller.go +++ b/pkg/controller/tidbcluster/tidb_cluster_controller.go @@ -70,7 +70,8 @@ func NewController( cli versioned.Interface, informerFactory informers.SharedInformerFactory, kubeInformerFactory kubeinformers.SharedInformerFactory, -) *Controller { + autoFailover bool, + pdFailoverPeriod time.Duration) *Controller { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) 
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{ @@ -85,14 +86,16 @@ func NewController( podInformer := kubeInformerFactory.Core().V1().Pods() nodeInformer := kubeInformerFactory.Core().V1().Nodes() + tcControl := controller.NewRealTidbClusterControl(cli, tcInformer.Lister(), recorder) pdControl := controller.NewDefaultPDControl() setControl := controller.NewRealStatefuSetControl(kubeCli, setInformer.Lister(), recorder) svcControl := controller.NewRealServiceControl(kubeCli, svcInformer.Lister(), recorder) pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), recorder) pvcControl := controller.NewRealPVCControl(kubeCli, recorder, pvcInformer.Lister()) - podControl := controller.NewRealPodControl(kubeCli, pdControl, recorder) + podControl := controller.NewRealPodControl(kubeCli, pdControl, podInformer.Lister(), recorder) pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl) tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl) + pdFailover := mm.NewPDFailover(cli, tcControl, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister()) pdUpgrader := mm.NewPDUpgrader() tikvUpgrader := mm.NewTiKVUpgrader() tidbUpgrader := mm.NewTiDBUpgrader() @@ -101,15 +104,20 @@ func NewController( kubeClient: kubeCli, cli: cli, control: NewDefaultTidbClusterControl( - NewRealTidbClusterStatusUpdater(cli, tcInformer.Lister()), + tcControl, mm.NewPDMemberManager( pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), + podInformer.Lister(), + podControl, + pvcInformer.Lister(), pdScaler, pdUpgrader, + autoFailover, + pdFailover, ), mm.NewTiKVMemberManager( pdControl, diff --git a/pkg/controller/tidbcluster/tidb_cluster_controller_test.go b/pkg/controller/tidbcluster/tidb_cluster_controller_test.go index 3259f62aa7..5f903eac6b 100644 --- a/pkg/controller/tidbcluster/tidb_cluster_controller_test.go +++ b/pkg/controller/tidbcluster/tidb_cluster_controller_test.go @@ -223,12 +223,15 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer) tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters() podInformer := kubeInformerFactory.Core().V1().Pods() nodeInformer := kubeInformerFactory.Core().V1().Nodes() + autoFailover := true tcc := NewController( kubeCli, cli, informerFactory, kubeInformerFactory, + autoFailover, + 5*time.Minute, ) tcc.tcListerSynced = alwaysReady tcc.setListerSynced = alwaysReady @@ -247,23 +250,29 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer) ) pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), recorder) pvcControl := controller.NewRealPVCControl(kubeCli, recorder, pvcInformer.Lister()) - podControl := controller.NewRealPodControl(kubeCli, pdControl, recorder) + podControl := controller.NewRealPodControl(kubeCli, pdControl, podInformer.Lister(), recorder) pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl) tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl) - pdUpgrade := mm.NewPDUpgrader() + pdFailover := mm.NewFakePDFailover() + pdUpgrader := mm.NewPDUpgrader() tikvUpgrader := mm.NewTiKVUpgrader() tidbUpgrader := mm.NewTiDBUpgrader() tcc.control = NewDefaultTidbClusterControl( - NewRealTidbClusterStatusUpdater(cli, tcInformer.Lister()), + controller.NewRealTidbClusterControl(cli, tcInformer.Lister(), recorder), mm.NewPDMemberManager( pdControl, setControl, svcControl, setInformer.Lister(), 
svcInformer.Lister(), + podInformer.Lister(), + podControl, + pvcInformer.Lister(), pdScaler, - pdUpgrade, + pdUpgrader, + autoFailover, + pdFailover, ), mm.NewTiKVMemberManager( pdControl, diff --git a/pkg/controller/tidbcluster/tidb_cluster_status_updater.go b/pkg/controller/tidbcluster/tidb_cluster_status_updater.go deleted file mode 100644 index ca4309b354..0000000000 --- a/pkg/controller/tidbcluster/tidb_cluster_status_updater.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tidbcluster - -import ( - "fmt" - - "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" - "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" - v1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap.com/v1alpha1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/util/retry" -) - -// StatusUpdaterInterface is an interface used to update the TidbClusterStatus associated with a TidbCluster. -// For any use other than testing, clients should create an instance using NewRealTidbClusterStatusUpdater. -type StatusUpdaterInterface interface { - // UpdateTidbClusterStatus sets the tidbCluster's Status to status. Implementations are required to retry on conflicts, - // but fail on other errors. If the returned error is nil tidbCluster's Status has been successfully set to status. - UpdateTidbClusterStatus(*v1alpha1.TidbCluster, *v1alpha1.TidbClusterStatus) error -} - -// NewRealTidbClusterStatusUpdater returns a StatusUpdaterInterface that updates the Status of a TidbCluster, -// using the supplied client and setLister. 
-func NewRealTidbClusterStatusUpdater( - cli versioned.Interface, - tcLister v1listers.TidbClusterLister) StatusUpdaterInterface { - return &realTidbClusterStatusUpdater{cli, tcLister} -} - -type realTidbClusterStatusUpdater struct { - cli versioned.Interface - tcLister v1listers.TidbClusterLister -} - -func (tcs *realTidbClusterStatusUpdater) UpdateTidbClusterStatus( - tc *v1alpha1.TidbCluster, - status *v1alpha1.TidbClusterStatus) error { - ns := tc.GetNamespace() - tcName := tc.GetName() - // don't wait due to limited number of clients, but backoff after the default number of steps - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - tc.Status = *status - _, updateErr := tcs.cli.PingcapV1alpha1().TidbClusters(ns).Update(tc) - if updateErr == nil { - return nil - } - if updated, err := tcs.tcLister.TidbClusters(ns).Get(tcName); err == nil { - // make a copy so we don't mutate the shared cache - tc = updated.DeepCopy() - } else { - utilruntime.HandleError(fmt.Errorf("error getting updated TidbCluster %s/%s from lister: %v", ns, tcName, err)) - } - - return updateErr - }) -} - -var _ StatusUpdaterInterface = &realTidbClusterStatusUpdater{} diff --git a/pkg/controller/tidbcluster/tidb_cluster_status_updater_test.go b/pkg/controller/tidbcluster/tidb_cluster_status_updater_test.go deleted file mode 100644 index 8e586d632a..0000000000 --- a/pkg/controller/tidbcluster/tidb_cluster_status_updater_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tidbcluster diff --git a/pkg/controller/tidbcluster_control.go b/pkg/controller/tidbcluster_control.go new file mode 100644 index 0000000000..f3b39808e0 --- /dev/null +++ b/pkg/controller/tidbcluster_control.go @@ -0,0 +1,133 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
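+
+// This file replaces the removed StatusUpdaterInterface. UpdateTidbCluster persists both
+// .Status and .Spec.PD.Replicas with conflict retry, so the PD failover logic can record
+// failure members and adjust the replica count through a single control interface.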
+ +package controller + +import ( + "fmt" + "strings" + + "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" + tcinformers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions/pingcap.com/v1alpha1" + listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap.com/v1alpha1" + corev1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" +) + +// TidbClusterControlInterface manages TidbClusters +type TidbClusterControlInterface interface { + UpdateTidbCluster(*v1alpha1.TidbCluster) (*v1alpha1.TidbCluster, error) +} + +type realTidbClusterControl struct { + cli versioned.Interface + tcLister listers.TidbClusterLister + recorder record.EventRecorder +} + +// NewRealTidbClusterControl creates a new TidbClusterControlInterface +func NewRealTidbClusterControl(cli versioned.Interface, + tcLister listers.TidbClusterLister, + recorder record.EventRecorder) TidbClusterControlInterface { + return &realTidbClusterControl{ + cli, + tcLister, + recorder, + } +} + +func (rtc *realTidbClusterControl) UpdateTidbCluster(tc *v1alpha1.TidbCluster) (*v1alpha1.TidbCluster, error) { + ns := tc.GetNamespace() + tcName := tc.GetName() + + status := tc.Status.DeepCopy() + pdReplicas := tc.Spec.PD.Replicas + var updateTC *v1alpha1.TidbCluster + + // don't wait due to limited number of clients, but backoff after the default number of steps + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + tc.Status = *status + tc.Spec.PD.Replicas = pdReplicas + var updateErr error + updateTC, updateErr = rtc.cli.PingcapV1alpha1().TidbClusters(ns).Update(tc) + if updateErr == nil { + glog.Infof("TidbCluster: [%s/%s] updated successfully", ns, tcName) + return nil + } + glog.Errorf("failed to update TidbCluster: [%s/%s], error: %v", ns, tcName, updateErr) + + if updated, err := rtc.tcLister.TidbClusters(ns).Get(tcName); err == nil { + // make a copy so we don't mutate the shared cache + tc = updated.DeepCopy() + } else { + utilruntime.HandleError(fmt.Errorf("error getting updated TidbCluster %s/%s from lister: %v", ns, tcName, err)) + } + + return updateErr + }) + rtc.recordTidbClusterEvent("update", tc, err) + return updateTC, err +} + +func (rtc *realTidbClusterControl) recordTidbClusterEvent(verb string, tc *v1alpha1.TidbCluster, err error) { + tcName := tc.GetName() + if err == nil { + reason := fmt.Sprintf("Successful%s", strings.Title(verb)) + msg := fmt.Sprintf("%s TidbCluster %s successful", + strings.ToLower(verb), tcName) + rtc.recorder.Event(tc, corev1.EventTypeNormal, reason, msg) + } else { + reason := fmt.Sprintf("Failed%s", strings.Title(verb)) + msg := fmt.Sprintf("%s TidbCluster %s failed error: %s", + strings.ToLower(verb), tcName, err) + rtc.recorder.Event(tc, corev1.EventTypeWarning, reason, msg) + } +} + +// FakeTidbClusterControl is a fake TidbClusterControlInterface +type FakeTidbClusterControl struct { + TcLister listers.TidbClusterLister + TcIndexer cache.Indexer + updateTidbClusterTracker requestTracker +} + +// NewFakeTidbClusterControl returns a FakeTidbClusterControl +func NewFakeTidbClusterControl(tcInformer tcinformers.TidbClusterInformer) *FakeTidbClusterControl { + return &FakeTidbClusterControl{ + tcInformer.Lister(), + tcInformer.Informer().GetIndexer(), + requestTracker{0, nil, 0}, + } +} + +// SetUpdateTidbClusterError sets the 
error attributes of updateTidbClusterTracker +func (ssc *FakeTidbClusterControl) SetUpdateTidbClusterError(err error, after int) { + ssc.updateTidbClusterTracker.err = err + ssc.updateTidbClusterTracker.after = after +} + +// UpdateTidbCluster updates the TidbCluster +func (ssc *FakeTidbClusterControl) UpdateTidbCluster(tc *v1alpha1.TidbCluster) (*v1alpha1.TidbCluster, error) { + defer ssc.updateTidbClusterTracker.inc() + if ssc.updateTidbClusterTracker.errorReady() { + defer ssc.updateTidbClusterTracker.reset() + return tc, ssc.updateTidbClusterTracker.err + } + + return tc, ssc.TcIndexer.Update(tc) +} diff --git a/pkg/label/label.go b/pkg/label/label.go index eaaf48dc4f..a6f5d7f03b 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -49,6 +49,10 @@ const ( MonitorLabelVal string = "monitor" // ClusterLabelVal is cluster label value ClusterLabelVal string = "tidbCluster" + // Bootstrapping is bootstrapping key + Bootstrapping string = "bootstrapping" + // Replicas is replicas key + Replicas string = "replicas" ) // Label is the label field in metadata diff --git a/pkg/manager/member/failover.go b/pkg/manager/member/failover.go new file mode 100644 index 0000000000..124348d17d --- /dev/null +++ b/pkg/manager/member/failover.go @@ -0,0 +1,290 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package member + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" + "github.com/pingcap/tidb-operator/pkg/controller" + "k8s.io/apimachinery/pkg/api/errors" + corelisters "k8s.io/client-go/listers/core/v1" +) + +// Failover implements the logic for pd/tikv/tidb's failover and recovery. 
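+// For PD, Failover marks a member that has been unhealthy for longer than the configured
+// failover period as a failure member, removes it from the PD cluster, and raises the
+// replica count so the StatefulSet creates a replacement; Recover restores the original
+// replica count and clears the recorded failure members.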
+type Failover interface { + Failover(*v1alpha1.TidbCluster) error + Recover(*v1alpha1.TidbCluster) +} + +// TODO add maxFailoverCount +type pdFailover struct { + cli versioned.Interface + tcControl controller.TidbClusterControlInterface + pdControl controller.PDControlInterface + pdFailoverPeriod time.Duration + podLister corelisters.PodLister + podControl controller.PodControlInterface + pvcLister corelisters.PersistentVolumeClaimLister + pvcControl controller.PVCControlInterface + pvLister corelisters.PersistentVolumeLister +} + +// NewPDFailover returns a pd Failover +func NewPDFailover(cli versioned.Interface, + tcControl controller.TidbClusterControlInterface, + pdControl controller.PDControlInterface, + pdFailoverPeriod time.Duration, + podLister corelisters.PodLister, + podControl controller.PodControlInterface, + pvcLister corelisters.PersistentVolumeClaimLister, + pvcControl controller.PVCControlInterface, + pvLister corelisters.PersistentVolumeLister) Failover { + return &pdFailover{ + cli, + tcControl, + pdControl, + pdFailoverPeriod, + podLister, + podControl, + pvcLister, + pvcControl, + pvLister} +} + +func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error { + ns := tc.GetNamespace() + tcName := tc.GetName() + + healthCount := 0 + for _, pdMember := range tc.Status.PD.Members { + if pdMember.Health { + healthCount++ + } + } + inQuorum := healthCount > int(tc.Spec.PD.Replicas/2) + if !inQuorum { + return fmt.Errorf("TidbCluster: %s/%s's pd cluster is not health: %d/%d, can't failover", + ns, tcName, healthCount, tc.Spec.PD.Replicas) + } + + if tc.Status.PD.FailureMembers == nil { + tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{} + } + notDeletedCount := 0 + for _, failureMember := range tc.Status.PD.FailureMembers { + if !failureMember.MemberDeleted { + notDeletedCount++ + } + } + + // we can only failover one at a time + if notDeletedCount == 0 { + // mark a peer as failure + for podName, pdMember := range tc.Status.PD.Members { + if pdMember.LastTransitionTime.IsZero() { + continue + } + deadline := pdMember.LastTransitionTime.Add(pf.pdFailoverPeriod) + _, exist := tc.Status.PD.FailureMembers[podName] + if !pdMember.Health && time.Now().After(deadline) && !exist { + err := pf.markThisMemberAsFailure(tc, pdMember) + if err != nil { + return err + } + break + } + } + } + + // invoke deleteMember api to delete a member from the pd cluster + for podName, failureMember := range tc.Status.PD.FailureMembers { + if !failureMember.MemberDeleted { + err := pf.pdControl.GetPDClient(tc).DeleteMember(failureMember.PodName) + if err != nil { + return err + } + + failureMember.MemberDeleted = true + tc.Status.PD.FailureMembers[podName] = failureMember + break + } + } + + // The order of old PVC deleting and the new Pod creating is not guaranteed by Kubernetes. + // If new Pod is created before old PVC deleted, new Pod will reuse old PVC. 
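+	// A reused PVC would hand the new Pod the deleted member's old data directory, so it
+	// could come back with stale membership state instead of joining as a brand new member.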
+ // So we must try to delete the PVC and Pod of this PD peer over and over, + // and let StatefulSet create the new PD peer with the same ordinal, but don't use the tombstone PV + for podName, failureMember := range tc.Status.PD.FailureMembers { + if !failureMember.MemberDeleted { + continue + } + + // increase the replicas to add a new PD peer + if failureMember.Replicas+1 > tc.Spec.PD.Replicas { + tc.Spec.PD.Replicas = failureMember.Replicas + 1 + } + + pod, err := pf.podLister.Pods(ns).Get(podName) + if err != nil && !errors.IsNotFound(err) { + return err + } + ordinal, err := getOrdinalFromPodName(podName) + if err != nil { + return err + } + pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal) + pvc, err := pf.pvcLister.PersistentVolumeClaims(ns).Get(pvcName) + if errors.IsNotFound(err) { + if pod != nil && pod.DeletionTimestamp == nil { + err := pf.podControl.DeletePod(tc, pod) + if err != nil { + return err + } + } + continue + } + if err != nil { + return err + } + + pv, err := pf.pvLister.Get(pvc.Spec.VolumeName) + if errors.IsNotFound(err) { + continue + } + if err != nil { + return err + } + + if string(pv.UID) != string(failureMember.PVUID) { + continue + } + + if pod != nil && pod.DeletionTimestamp == nil { + err := pf.podControl.DeletePod(tc, pod) + if err != nil { + return err + } + } + if pvc.DeletionTimestamp == nil { + err = pf.pvcControl.DeletePVC(tc, pvc) + if err != nil { + return err + } + } + } + + return nil +} + +func (pf *pdFailover) Recover(tc *v1alpha1.TidbCluster) { + defer func() { + tc.Status.PD.FailureMembers = nil + }() + + maxReplicas := int32(0) + minReplicas := int32(0) + for _, failureMember := range tc.Status.PD.FailureMembers { + if minReplicas == int32(0) { + minReplicas = failureMember.Replicas + } + if failureMember.Replicas > maxReplicas { + maxReplicas = failureMember.Replicas + } else if failureMember.Replicas < minReplicas { + minReplicas = failureMember.Replicas + } + } + + if maxReplicas+1 == tc.Spec.PD.Replicas { + tc.Spec.PD.Replicas = minReplicas + } +} + +func (pf *pdFailover) markThisMemberAsFailure(tc *v1alpha1.TidbCluster, pdMember v1alpha1.PDMember) error { + ns := tc.GetNamespace() + tcName := tc.GetName() + podName := pdMember.Name + + ordinal, err := getOrdinalFromPodName(podName) + if err != nil { + return err + } + pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal) + + pvc, err := pf.pvcLister.PersistentVolumeClaims(ns).Get(pvcName) + if err != nil { + return err + } + pv, err := pf.pvLister.Get(pvc.Spec.VolumeName) + if err != nil { + return err + } + if tc.Status.PD.FailureMembers == nil { + tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{} + } + tc.Status.PD.FailureMembers[podName] = v1alpha1.PDFailureMember{ + PodName: podName, + MemberID: pdMember.ID, + PVUID: pv.UID, + Replicas: tc.Spec.PD.Replicas, + MemberDeleted: false, + } + + // we must update TidbCluster immediately before delete member, or data may not be consistent + tc, err = pf.tcControl.UpdateTidbCluster(tc) + return err +} + +func getOrdinalFromPodName(podName string) (int32, error) { + ordinalStr := podName[strings.LastIndex(podName, "-")+1:] + ordinalInt, err := strconv.Atoi(ordinalStr) + if err != nil { + return int32(0), err + } + + return int32(ordinalInt), nil +} + +func allPDMembersAreReady(tc *v1alpha1.TidbCluster) bool { + if int(tc.Spec.PD.Replicas) != len(tc.Status.PD.Members) { + return false + } + + for _, member := range tc.Status.PD.Members { + if 
!member.Health { + return false + } + } + + return true +} + +type fakePDFailover struct{} + +// NewFakePDFailover returns a fake Failover +func NewFakePDFailover() Failover { + return &fakePDFailover{} +} + +func (fpf *fakePDFailover) Failover(tc *v1alpha1.TidbCluster) error { + return nil +} + +func (fpf *fakePDFailover) Recover(tc *v1alpha1.TidbCluster) { + return +} diff --git a/pkg/manager/member/failover_test.go b/pkg/manager/member/failover_test.go new file mode 100644 index 0000000000..f4b4833e6c --- /dev/null +++ b/pkg/manager/member/failover_test.go @@ -0,0 +1,642 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package member + +import ( + "fmt" + "strings" + "testing" + "time" + + . "github.com/onsi/gomega" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/fake" + informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions" + "github.com/pingcap/tidb-operator/pkg/controller" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +func TestPDFailoverFailover(t *testing.T) { + g := NewGomegaWithT(t) + + type testcase struct { + name string + update func(*v1alpha1.TidbCluster) + hasPVC bool + hasPV bool + hasPod bool + podWithDeletionTimestamp bool + delMemberFailed bool + delPodFailed bool + delPVCFailed bool + tcUpdateFailed bool + errExpectFn func(*GomegaWithT, error) + expectFn func(*v1alpha1.TidbCluster) + } + testFn := func(test *testcase, t *testing.T) { + t.Log(test.name) + tc := newTidbClusterForPD() + test.update(tc) + + pdFailover, pvcIndexer, pvIndexer, podIndexer, fakePDControl, fakePodControl, fakePVCControl, fakeTCControl := newFakePDFailover() + pdClient := controller.NewFakePDClient() + fakePDControl.SetPDClient(tc, pdClient) + + pdClient.AddReaction(controller.DeleteMemberActionType, func(action *controller.Action) (interface{}, error) { + if test.delMemberFailed { + return nil, fmt.Errorf("failed to delete member") + } else { + return nil, nil + } + }) + + pvc := newPVCForPDFailover(tc, v1alpha1.PDMemberType, 1) + if test.hasPVC { + pvcIndexer.Add(pvc) + } + if test.hasPV { + pv := newPVForPDFailover(pvc) + pvIndexer.Add(pv) + } + if test.hasPod { + pod := newPodForPDFailover(tc, v1alpha1.PDMemberType, 1) + if test.podWithDeletionTimestamp { + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + } + podIndexer.Add(pod) + } + if test.delPodFailed { + fakePodControl.SetDeletePodError(errors.NewInternalError(fmt.Errorf("API server failed")), 0) + } + if test.delPVCFailed { + fakePVCControl.SetDeletePVCError(errors.NewInternalError(fmt.Errorf("API server failed")), 0) + } + if test.tcUpdateFailed { + fakeTCControl.SetUpdateTidbClusterError(errors.NewInternalError(fmt.Errorf("API server failed")), 0) + } + + err := pdFailover.Failover(tc) + test.errExpectFn(g, err) + test.expectFn(tc) + } + tests := 
[]testcase{ + { + name: "all members are ready", + update: allMembersReady, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "two members are not ready", + update: twoMembersNotReady, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: func(g *GomegaWithT, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "pd cluster is not health")).To(Equal(true)) + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, but not exceed deadline", + update: func(tc *v1alpha1.TidbCluster) { + oneNotReadyMember(tc) + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd1 := tc.Status.PD.Members[pd1Name] + pd1.LastTransitionTime = metav1.Time{Time: time.Now().Add(-2 * time.Minute)} + tc.Status.PD.Members[pd1Name] = pd1 + }, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, lastTransitionTime is zero", + update: func(tc *v1alpha1.TidbCluster) { + oneNotReadyMember(tc) + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd1 := tc.Status.PD.Members[pd1Name] + pd1.LastTransitionTime = metav1.Time{} + tc.Status.PD.Members[pd1Name] = pd1 + }, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, and exceed deadline, update TidbCluster success", + update: oneNotReadyMember, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + pd1, ok := tc.Status.PD.FailureMembers[pd1Name] + g.Expect(ok).To(Equal(true)) + g.Expect(pd1.MemberDeleted).To(Equal(true)) + g.Expect(int(pd1.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, and exceed deadline, update TidbCluster failed", + update: oneNotReadyMember, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: true, + errExpectFn: func(g *GomegaWithT, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "failover ongoing")).NotTo(Equal(true)) + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + pd1, ok := 
tc.Status.PD.FailureMembers[pd1Name] + g.Expect(ok).To(Equal(true)) + g.Expect(pd1.MemberDeleted).To(Equal(false)) + g.Expect(int(pd1.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, and exceed deadline, don't have PVC", + update: oneNotReadyMember, + hasPVC: false, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "has one not ready member, and exceed deadline, don't have PV", + update: oneNotReadyMember, + hasPVC: true, + hasPV: false, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + }, + }, + { + name: "one failure member, delete member failed", + update: oneFailureMember, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: true, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + failMember, ok := tc.Status.PD.FailureMembers[pd1Name] + g.Expect(ok).To(Equal(true)) + g.Expect(failMember.MemberDeleted).To(Equal(false)) + }, + }, + { + name: "one failure member, don't have pvc, don't have pod, increase the replicas", + update: oneFailureMember, + hasPVC: false, + hasPV: true, + hasPod: false, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, don't have pvc, have pod with deletetimestamp, increase the replicas", + update: oneFailureMember, + hasPVC: false, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: true, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, don't have pvc, have pod without deletetimestamp, delete pod success, increase the replicas", + update: oneFailureMember, + hasPVC: false, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, don't have pvc, have pod without deletetimestamp, delete pod failed", + update: oneFailureMember, + hasPVC: false, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: true, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, don't have pv, increase the replicas", + update: oneFailureMember, + hasPVC: true, + hasPV: false, + 
hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, pv uid changed, increase the replicas", + update: func(tc *v1alpha1.TidbCluster) { + oneFailureMember(tc) + pd1Name := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd1 := tc.Status.PD.FailureMembers[pd1Name] + pd1.PVUID = "xxx" + tc.Status.PD.FailureMembers[pd1Name] = pd1 + }, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, has pod but delete fail", + update: oneFailureMember, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: true, + delPVCFailed: false, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + { + name: "one failure member, pvc delete fail", + update: oneFailureMember, + hasPVC: true, + hasPV: true, + hasPod: true, + podWithDeletionTimestamp: false, + delMemberFailed: false, + delPodFailed: false, + delPVCFailed: true, + tcUpdateFailed: false, + errExpectFn: errExpectNotNil, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + }, + }, + } + + for i := range tests { + testFn(&tests[i], t) + } +} + +func TestPDFailoverRecovery(t *testing.T) { + g := NewGomegaWithT(t) + + type testcase struct { + name string + update func(*v1alpha1.TidbCluster) + expectFn func(*v1alpha1.TidbCluster) + } + testFn := func(test *testcase, t *testing.T) { + t.Log(test.name) + tc := newTidbClusterForPD() + test.update(tc) + + pdFailover, _, _, _, _, _, _, _ := newFakePDFailover() + pdFailover.Recover(tc) + test.expectFn(tc) + } + tests := []testcase{ + { + name: "two failure members, user doesn't modify the replicas", + update: func(tc *v1alpha1.TidbCluster) { + twoFailureMembers(tc) + tc.Spec.PD.Replicas = 5 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "two failure members, user modifies the replicas to 4", + update: func(tc *v1alpha1.TidbCluster) { + twoFailureMembers(tc) + tc.Spec.PD.Replicas = 4 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(4)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "two failure members, user increases the replicas", + update: func(tc *v1alpha1.TidbCluster) { + twoFailureMembers(tc) + tc.Spec.PD.Replicas = 7 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(7)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "two failure members, user decreases the replicas", + update: func(tc *v1alpha1.TidbCluster) { + twoFailureMembers(tc) + tc.Spec.PD.Replicas = 1 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(1)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "one failure member, user 
doesn't modify the replicas", + update: func(tc *v1alpha1.TidbCluster) { + oneFailureMember(tc) + tc.Spec.PD.Replicas = 4 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "one failure member, user increases the replicas", + update: func(tc *v1alpha1.TidbCluster) { + oneFailureMember(tc) + tc.Spec.PD.Replicas = 5 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(5)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + { + name: "one failure member, user decreases the replicas", + update: func(tc *v1alpha1.TidbCluster) { + oneFailureMember(tc) + tc.Spec.PD.Replicas = 1 + }, + expectFn: func(tc *v1alpha1.TidbCluster) { + g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(1)) + g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0)) + }, + }, + } + + for i := range tests { + testFn(&tests[i], t) + } +} + +func newFakePDFailover() (*pdFailover, cache.Indexer, cache.Indexer, cache.Indexer, *controller.FakePDControl, *controller.FakePodControl, *controller.FakePVCControl, *controller.FakeTidbClusterControl) { + cli := fake.NewSimpleClientset() + kubeCli := kubefake.NewSimpleClientset() + pdControl := controller.NewFakePDControl() + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeCli, 0) + tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters() + podInformer := kubeInformerFactory.Core().V1().Pods() + pvcInformer := kubeInformerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := kubeInformerFactory.Core().V1().PersistentVolumes() + podControl := controller.NewFakePodControl(podInformer) + pvcControl := controller.NewFakePVCControl(pvcInformer) + tcControl := controller.NewFakeTidbClusterControl(tcInformer) + + return &pdFailover{ + cli, + tcControl, + pdControl, + 5 * time.Minute, + podInformer.Lister(), + podControl, + pvcInformer.Lister(), + pvcControl, + pvInformer.Lister()}, + pvcInformer.Informer().GetIndexer(), + pvInformer.Informer().GetIndexer(), + podInformer.Informer().GetIndexer(), + pdControl, podControl, pvcControl, tcControl +} + +func oneFailureMember(tc *v1alpha1.TidbCluster) { + pd0 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 0) + pd1 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd2 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 2) + tc.Status.PD.Members = map[string]v1alpha1.PDMember{ + pd0: {Name: pd0, ID: "0", Health: true}, + pd2: {Name: pd2, ID: "2", Health: true}, + } + tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{ + pd1: {Replicas: 3, PVUID: "uid-1"}, + } +} + +func twoFailureMembers(tc *v1alpha1.TidbCluster) { + pd0 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 0) + pd1 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd2 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 2) + tc.Status.PD.Members = map[string]v1alpha1.PDMember{ + pd2: {Name: pd2, ID: "2", Health: true}, + } + tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{ + pd0: {Replicas: 3}, + pd1: {Replicas: 4}, + } +} + +func oneNotReadyMember(tc *v1alpha1.TidbCluster) { + pd0 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 0) + pd1 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd2 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 2) + tc.Status.PD.Members = map[string]v1alpha1.PDMember{ + pd0: {Name: pd0, ID: "0", Health: true}, + pd1: {Name: pd1, ID: 
"1", Health: false, LastTransitionTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)}}, + pd2: {Name: pd2, ID: "2", Health: true}, + } +} + +func allMembersReady(tc *v1alpha1.TidbCluster) { + pd0 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 0) + pd1 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd2 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 2) + tc.Status.PD.Members = map[string]v1alpha1.PDMember{ + pd0: {Name: pd0, ID: "0", Health: true}, + pd1: {Name: pd1, ID: "1", Health: true}, + pd2: {Name: pd2, ID: "2", Health: true}, + } +} + +func twoMembersNotReady(tc *v1alpha1.TidbCluster) { + pd0 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 0) + pd1 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 1) + pd2 := ordinalPodName(v1alpha1.PDMemberType, tc.GetName(), 2) + tc.Status.PD.Members = map[string]v1alpha1.PDMember{ + pd0: {Name: pd0, ID: "0", Health: false}, + pd1: {Name: pd1, ID: "1", Health: false}, + pd2: {Name: pd2, ID: "2", Health: true}, + } +} + +func errExpectNil(g *GomegaWithT, err error) { + g.Expect(err).NotTo(HaveOccurred()) +} + +func errExpectNotNil(g *GomegaWithT, err error) { + g.Expect(err).To(HaveOccurred()) +} + +func newPVCForPDFailover(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType, ordinal int32) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: ordinalPVCName(memberType, controller.PDMemberName(tc.GetName()), ordinal), + Namespace: metav1.NamespaceDefault, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: fmt.Sprintf("pv-%d", ordinal), + }, + } +} + +func newPVForPDFailover(pvc *corev1.PersistentVolumeClaim) *corev1.PersistentVolume { + return &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvc.Spec.VolumeName, + UID: "uid-1", + }, + } +} + +func newPodForPDFailover(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType, ordinal int32) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: ordinalPodName(memberType, tc.GetName(), ordinal), + Namespace: metav1.NamespaceDefault, + }, + } +} diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go index 2dd01e7136..7f83f92f95 100644 --- a/pkg/manager/member/pd_member_manager.go +++ b/pkg/manager/member/pd_member_manager.go @@ -35,13 +35,18 @@ import ( const defaultReplicas = 3 type pdMemberManager struct { - pdControl controller.PDControlInterface - setControl controller.StatefulSetControlInterface - svcControl controller.ServiceControlInterface - setLister v1beta1.StatefulSetLister - svcLister corelisters.ServiceLister - pdScaler Scaler - pdUpgrader Upgrader + pdControl controller.PDControlInterface + setControl controller.StatefulSetControlInterface + svcControl controller.ServiceControlInterface + setLister v1beta1.StatefulSetLister + svcLister corelisters.ServiceLister + podLister corelisters.PodLister + podControl controller.PodControlInterface + pvcLister corelisters.PersistentVolumeClaimLister + pdScaler Scaler + pdUpgrader Upgrader + autoFailover bool + pdFailover Failover } // NewPDMemberManager returns a *pdMemberManager @@ -50,16 +55,26 @@ func NewPDMemberManager(pdControl controller.PDControlInterface, svcControl controller.ServiceControlInterface, setLister v1beta1.StatefulSetLister, svcLister corelisters.ServiceLister, + podLister corelisters.PodLister, + podControl controller.PodControlInterface, + pvcLister corelisters.PersistentVolumeClaimLister, pdScaler Scaler, - pdUpgrader Upgrader) 
manager.Manager { + pdUpgrader Upgrader, + autoFailover bool, + pdFailover Failover) manager.Manager { return &pdMemberManager{ pdControl, setControl, svcControl, setLister, svcLister, + podLister, + podControl, + pvcLister, pdScaler, - pdUpgrader} + pdUpgrader, + autoFailover, + pdFailover} } func (pmm *pdMemberManager) Sync(tc *v1alpha1.TidbCluster) error { @@ -73,6 +88,38 @@ func (pmm *pdMemberManager) Sync(tc *v1alpha1.TidbCluster) error { return err } + // TODO Move these to the back of syncStatus + // TODO unit tests + ns := tc.GetNamespace() + tcName := tc.GetName() + podName := ordinalPodName(v1alpha1.PDMemberType, tcName, 0) + firstPod, err := pmm.podLister.Pods(ns).Get(podName) + if err != nil && !errors.IsNotFound(err) { + return err + } + if firstPod != nil { + firstPodCopy := firstPod.DeepCopy() + + if firstPodCopy.Annotations[label.Bootstrapping] == "" { + nextPVCName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), 1) + _, err := pmm.pvcLister.PersistentVolumeClaims(ns).Get(nextPVCName) + if err != nil && !errors.IsNotFound(err) { + return err + } + if errors.IsNotFound(err) { + firstPodCopy.Annotations[label.Bootstrapping] = "true" + } else { + firstPodCopy.Annotations[label.Bootstrapping] = "false" + } + firstPodCopy.Annotations[label.Replicas] = fmt.Sprintf("%d", tc.Spec.PD.Replicas) + + err = pmm.podControl.UpdatePod(tc, firstPodCopy) + if err != nil { + return err + } + } + } + // Sync PD StatefulSet return pmm.syncPDStatefulSetForTidbCluster(tc) } @@ -163,7 +210,17 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu return err } if errors.IsNotFound(err) { - newPDSet.Spec.Replicas = func() *int32 { var i int32 = 1; return &i }() + pvcName := ordinalPVCName(v1alpha1.PDMemberType, newPDSet.GetName(), *newPDSet.Spec.Replicas-int32(1)) + var newReplicas int32 = 1 + pvc, err := pmm.pvcLister.PersistentVolumeClaims(ns).Get(pvcName) + if err != nil && !errors.IsNotFound(err) { + return err + } + // start all the pd members together, if this is an old cluster + if pvc != nil { + newReplicas = *newPDSet.Spec.Replicas + } + newPDSet.Spec.Replicas = func() *int32 { var i int32 = newReplicas; return &i }() err = SetLastAppliedConfigAnnotation(newPDSet) if err != nil { return err @@ -190,13 +247,25 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu return err } } - if *newPDSet.Spec.Replicas < *oldPDSet.Spec.Replicas { if err := pmm.pdScaler.ScaleIn(tc, oldPDSet, newPDSet); err != nil { return err } } + if pmm.autoFailover { + if allPDMembersAreReady(tc) { + if tc.Status.PD.FailureMembers != nil { + pmm.pdFailover.Recover(tc) + } + } else if tc.Spec.PD.Replicas == tc.Status.PD.StatefulSet.Replicas { + if err := pmm.pdFailover.Failover(tc); err != nil { + return err + } + } + } + + // TODO FIXME equal is false every time if !statefulSetEqual(*newPDSet, *oldPDSet) { set := *oldPDSet set.Spec.Template = newPDSet.Spec.Template @@ -244,6 +313,14 @@ func (pmm *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set Health: memberHealth.Health, } + oldPDMember, exist := tc.Status.PD.Members[name] + if exist { + status.LastTransitionTime = oldPDMember.LastTransitionTime + } + if !exist || status.Health != oldPDMember.Health { + status.LastTransitionTime = metav1.Now() + } + pdStatus[name] = status } @@ -431,6 +508,10 @@ func (pmm *pdMemberManager) getNewPDSetForTidbCluster(tc *v1alpha1.TidbCluster) Name: "PEER_SERVICE_NAME", Value: controller.PDPeerMemberName(tcName), }, + { + Name: 
"SERVICE_NAME", + Value: controller.PDMemberName(tcName), + }, { Name: "SET_NAME", Value: setName, @@ -462,7 +543,7 @@ func (pmm *pdMemberManager) getNewPDSetForTidbCluster(tc *v1alpha1.TidbCluster) }, }, ServiceName: controller.PDPeerMemberName(tcName), - PodManagementPolicy: apps.OrderedReadyPodManagement, + PodManagementPolicy: apps.ParallelPodManagement, UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType}, }, } diff --git a/pkg/manager/member/pd_member_manager_test.go b/pkg/manager/member/pd_member_manager_test.go index 9faa2bc76e..8531b5c135 100644 --- a/pkg/manager/member/pd_member_manager_test.go +++ b/pkg/manager/member/pd_member_manager_test.go @@ -294,11 +294,10 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) { expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) { g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase)) g.Expect(*tc.Status.PD.StatefulSet.ObservedGeneration).To(Equal(int64(1))) - g.Expect(tc.Status.PD.Members).To(Equal(map[string]v1alpha1.PDMember{ - "pd1": {Name: "pd1", ID: "1", ClientURL: "http://pd1:2379", Health: true}, - "pd2": {Name: "pd2", ID: "2", ClientURL: "http://pd2:2379", Health: true}, - "pd3": {Name: "pd3", ID: "3", ClientURL: "http://pd3:2379", Health: false}, - })) + g.Expect(len(tc.Status.PD.Members)).To(Equal(3)) + g.Expect(tc.Status.PD.Members["pd1"].Health).To(Equal(true)) + g.Expect(tc.Status.PD.Members["pd2"].Health).To(Equal(true)) + g.Expect(tc.Status.PD.Members["pd3"].Health).To(Equal(false)) }, }, { @@ -393,11 +392,16 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont kubeCli := kubefake.NewSimpleClientset() setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets() svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services() + podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods() + pvcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().PersistentVolumeClaims() tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters() setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer) svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer) + podControl := controller.NewFakePodControl(podInformer) pdControl := controller.NewFakePDControl() pdScaler := NewFakePDScaler() + autoFailover := true + pdFailover := NewFakePDFailover() pdUpgrader := NewFakePDUpgrader() return &pdMemberManager{ @@ -406,8 +410,13 @@ func newFakePDMemberManager() (*pdMemberManager, *controller.FakeStatefulSetCont svcControl, setInformer.Lister(), svcInformer.Lister(), + podInformer.Lister(), + podControl, + pvcInformer.Lister(), pdScaler, pdUpgrader, + autoFailover, + pdFailover, }, setControl, svcControl, pdControl } @@ -531,11 +540,10 @@ func TestPDMemberManagerUpgrade(t *testing.T) { }, expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) { g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase)) - g.Expect(tc.Status.PD.Members).To(Equal(map[string]v1alpha1.PDMember{ - "pd1": {Name: "pd1", ID: "1", ClientURL: "http://pd1:2379", Health: true}, - "pd2": {Name: "pd2", ID: "2", ClientURL: "http://pd2:2379", Health: true}, - "pd3": {Name: "pd3", ID: "3", ClientURL: "http://pd3:2379", Health: false}, - })) + g.Expect(len(tc.Status.PD.Members)).To(Equal(3)) + g.Expect(tc.Status.PD.Members["pd1"].Health).To(Equal(true)) + 
g.Expect(tc.Status.PD.Members["pd2"].Health).To(Equal(true)) + g.Expect(tc.Status.PD.Members["pd3"].Health).To(Equal(false)) + }, + }, + } diff --git a/pkg/manager/member/pd_scaler.go b/pkg/manager/member/pd_scaler.go index addca65593..a56a9b915d 100644 --- a/pkg/manager/member/pd_scaler.go +++ b/pkg/manager/member/pd_scaler.go @@ -51,14 +51,20 @@ func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet return err } - var i int32 - for ; i < *oldSet.Spec.Replicas; i++ { + var i int32 = 0 + healthCount := 0 + totalCount := *oldSet.Spec.Replicas + for ; i < totalCount; i++ { podName := ordinalPodName(v1alpha1.PDMemberType, tcName, i) - if member, ok := tc.Status.PD.Members[podName]; !ok || !member.Health { - resetReplicas(newSet, oldSet) - return fmt.Errorf("%s/%s is not ready, can't scale out now", ns, podName) + if member, ok := tc.Status.PD.Members[podName]; ok && member.Health { + healthCount++ } } + if healthCount < int(totalCount/2+1) { + resetReplicas(newSet, oldSet) + return fmt.Errorf("TidbCluster: %s/%s's pd %d/%d is not ready, can't scale out now", + ns, tcName, healthCount, totalCount) + } increaseReplicas(newSet, oldSet) return nil diff --git a/pkg/manager/meta/meta_manager.go b/pkg/manager/meta/meta_manager.go index 5190d99c72..9a9bc8e584 100644 --- a/pkg/manager/meta/meta_manager.go +++ b/pkg/manager/meta/meta_manager.go @@ -73,9 +73,9 @@ func (pmm *metaManager) Sync(tc *v1alpha1.TidbCluster) error { return err } // update meta info for pvc - pvc, err := pmm.resolvePVCFromPod(pod) - if err != nil { - return err + pvc, _ := pmm.resolvePVCFromPod(pod) + if pvc == nil { + return nil } err = pmm.pvcControl.UpdateMetaInfo(tc, pvc, pod) if err != nil { @@ -104,6 +104,10 @@ func (pmm *metaManager) resolvePVCFromPod(pod *corev1.Pod) (*corev1.PersistentVo pvcName = vol.PersistentVolumeClaim.ClaimName break } + case v1alpha1.TiDBMemberType.String(): + return nil, nil + default: + return nil, nil } } if len(pvcName) == 0 { diff --git a/tests/e2e/scale.go b/tests/e2e/scale.go index def5b87e1d..a9977a3513 100644 --- a/tests/e2e/scale.go +++ b/tests/e2e/scale.go @@ -1,3 +1,16 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package e2e import ( diff --git a/tests/e2e/upgrade.go b/tests/e2e/upgrade.go index 8d4c556368..d29d9b83f5 100644 --- a/tests/e2e/upgrade.go +++ b/tests/e2e/upgrade.go @@ -1,3 +1,16 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package e2e import (
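For readers following the pd_scaler.go hunk above: scale-out is now gated on a PD majority rather than on every member being healthy. The standalone sketch below restates that quorum rule only; the member type, the demo-pd-%d naming scheme, and hasQuorum are illustrative stand-ins, not the operator's actual types or functions.

```go
package main

import "fmt"

// member is a trimmed stand-in for v1alpha1.PDMember; only the health
// flag matters for the quorum rule.
type member struct {
	Name   string
	Health bool
}

// hasQuorum reports whether at least replicas/2+1 of the expected PD
// members are healthy -- the same condition the patched ScaleOut checks
// before letting the StatefulSet grow.
func hasQuorum(members map[string]member, replicas int32) bool {
	healthy := int32(0)
	for i := int32(0); i < replicas; i++ {
		podName := fmt.Sprintf("demo-pd-%d", i) // hypothetical pod naming
		if m, ok := members[podName]; ok && m.Health {
			healthy++
		}
	}
	return healthy >= replicas/2+1
}

func main() {
	members := map[string]member{
		"demo-pd-0": {Name: "demo-pd-0", Health: true},
		"demo-pd-1": {Name: "demo-pd-1", Health: false},
		"demo-pd-2": {Name: "demo-pd-2", Health: true},
	}
	fmt.Println(hasQuorum(members, 3)) // 2/3 healthy -> true, scale-out allowed
}
```

Requiring only a majority (replicas/2+1) fits PD's Raft-based design: the cluster keeps serving as long as a quorum is alive, so scale-out need not wait for a failed member that the failover logic is about to replace anyway.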
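The deadline-based failover tests above ("not exceed deadline", "lastTransitionTime is zero") hinge on the LastTransitionTime bookkeeping that the pd_member_manager.go hunk adds to syncTidbClusterStatus. A minimal sketch of that rule, using simplified stand-in types (memberStatus, nextStatus) rather than the real v1alpha1 structs:

```go
package main

import (
	"fmt"
	"time"
)

// memberStatus is a trimmed stand-in for v1alpha1.PDMember, keeping only
// the fields the failover deadline depends on.
type memberStatus struct {
	Health             bool
	LastTransitionTime time.Time
}

// nextStatus mirrors the bookkeeping added to syncTidbClusterStatus:
// carry the old transition time while the health flag is unchanged, and
// reset it to "now" when the member is new or its health flips.
func nextStatus(old map[string]memberStatus, name string, healthy bool, now time.Time) memberStatus {
	status := memberStatus{Health: healthy}
	prev, exist := old[name]
	if exist {
		status.LastTransitionTime = prev.LastTransitionTime
	}
	if !exist || healthy != prev.Health {
		status.LastTransitionTime = now
	}
	return status
}

func main() {
	now := time.Now()
	old := map[string]memberStatus{
		"demo-pd-1": {Health: true, LastTransitionTime: now.Add(-10 * time.Minute)},
	}
	// The member just became unhealthy, so its clock restarts; failover
	// only triggers once now minus LastTransitionTime exceeds pdFailoverPeriod.
	s := nextStatus(old, "demo-pd-1", false, now)
	fmt.Println(s.Health, now.Sub(s.LastTransitionTime)) // false 0s
}
```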