Skip to content

Commit

Permalink
service etcd client disable unready client
Browse files Browse the repository at this point in the history
  • Loading branch information
wanyaoqi committed Jul 1, 2020
1 parent 9614bd7 commit 052dca4
Showing 1 changed file with 83 additions and 2 deletions.
85 changes: 83 additions & 2 deletions pkg/manager/component/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"path"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -26,6 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"

"yunion.io/x/onecloud-operator/pkg/apis/constants"
"yunion.io/x/onecloud-operator/pkg/apis/onecloud/v1alpha1"
Expand Down Expand Up @@ -570,12 +573,90 @@ func (m *etcdManager) reportFailedStatus() {
}

func (m *etcdManager) setupServices() error {
err := k8sutil.CreateClientService(m.kubeCli, m.getEtcdClusterPrefix(), m.oc.Namespace, controller.GetOwnerRef(m.oc))
err := CreateClientService(m.kubeCli, m.getEtcdClusterPrefix(), m.oc.Namespace, controller.GetOwnerRef(m.oc))
if err != nil {
return err
}

return k8sutil.CreatePeerService(m.kubeCli, m.getEtcdClusterPrefix(), m.oc.Namespace, controller.GetOwnerRef(m.oc))
return CreatePeerService(m.kubeCli, m.getEtcdClusterPrefix(), m.oc.Namespace, controller.GetOwnerRef(m.oc))
}

func CreateClientService(kubecli kubernetes.Interface, clusterName, ns string, owner metav1.OwnerReference) error {
ports := []corev1.ServicePort{{
Name: "client",
Port: constants.EtcdClientPort,
TargetPort: intstr.FromInt(constants.EtcdClientPort),
Protocol: corev1.ProtocolTCP,
}}
return createService(kubecli, ClientServiceName(clusterName), clusterName, ns, "", ports, owner, false)
}

func ClientServiceName(clusterName string) string {
return clusterName + "-client"
}

func CreatePeerService(kubecli kubernetes.Interface, clusterName, ns string, owner metav1.OwnerReference) error {
ports := []corev1.ServicePort{{
Name: "client",
Port: constants.EtcdClientPort,
TargetPort: intstr.FromInt(constants.EtcdClientPort),
Protocol: corev1.ProtocolTCP,
}, {
Name: "peer",
Port: 2380,
TargetPort: intstr.FromInt(2380),
Protocol: corev1.ProtocolTCP,
}}

return createService(kubecli, clusterName, clusterName, ns, corev1.ClusterIPNone, ports, owner, true)
}

func createService(
kubecli kubernetes.Interface, svcName, clusterName, ns, clusterIP string,
ports []corev1.ServicePort, owner metav1.OwnerReference, tolerateUnreadyEndpoints bool,
) error {
svc := newEtcdServiceManifest(svcName, clusterName, clusterIP, ports, tolerateUnreadyEndpoints)
o := svc.GetObjectMeta()
o.SetOwnerReferences(append(o.GetOwnerReferences(), owner))

oldSvc, err := kubecli.CoreV1().Services(ns).Get(svcName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
} else if err == nil {
if !reflect.DeepEqual(oldSvc.Annotations, svc.Annotations) {
oldSvc.Annotations = svc.Annotations
_, err = kubecli.CoreV1().Services(ns).Update(oldSvc)
return err
}
}
_, err = kubecli.CoreV1().Services(ns).Create(svc)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}

func newEtcdServiceManifest(
svcName, clusterName, clusterIP string, ports []corev1.ServicePort, tolerateUnreadyEndpoints bool,
) *corev1.Service {
labels := k8sutil.LabelsForCluster(clusterName)
annotations := map[string]string{}
if tolerateUnreadyEndpoints {
annotations[k8sutil.TolerateUnreadyEndpointsAnnotation] = "true"
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Labels: labels,
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Ports: ports,
Selector: labels,
ClusterIP: clusterIP,
},
}
return svc
}

func (m *etcdManager) run() {
Expand Down

0 comments on commit 052dca4

Please sign in to comment.