From a30a31263aa2f50a31bc14f96881353570b33a9e Mon Sep 17 00:00:00 2001 From: Trekkie Coder Date: Fri, 26 May 2023 15:29:15 +0900 Subject: [PATCH] gh-6 support for ExternalTrafficPolicy --- .../manager/loadbalancer/loadbalancer.go | 18 ++++++++++ pkg/k8s/node.go | 36 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pkg/agent/manager/loadbalancer/loadbalancer.go b/pkg/agent/manager/loadbalancer/loadbalancer.go index 5e4425b..2551502 100644 --- a/pkg/agent/manager/loadbalancer/loadbalancer.go +++ b/pkg/agent/manager/loadbalancer/loadbalancer.go @@ -271,6 +271,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { return err } + klog.Infof("Endpoint IP Pairs %v", endpointIPs) + cacheKey := GenKey(svc.Namespace, svc.Name) _, added := m.lbCache[cacheKey] if !added { @@ -469,9 +471,14 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error { // If false, return worker nodes IP list. func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool) ([]string, error) { if podEP { + klog.Infof("getEndpoints: Pod end-points") return m.getMultusEndpoints(svc) } + if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + klog.Infof("getEndpoints: Traffic Policy Local") + return k8s.GetServiceLocalEndpoints(m.kubeClient, svc) + } return m.getNodeEndpoints() } @@ -492,6 +499,17 @@ func (m *Manager) getNodeEndpoints() ([]string, error) { return m.getEndpointsForLB(nodes), nil } +// getLocalEndpoints returns the IP list of the Pods connected to the multus network. +func (m *Manager) getLocalEndpoints(svc *corev1.Service) ([]string, error) { + netListStr, ok := svc.Annotations[LoxiMultusServiceAnnotation] + if !ok { + return nil, errors.New("not found multus annotations") + } + netList := strings.Split(netListStr, ",") + + return k8s.GetMultusEndpoints(m.kubeClient, svc, netList) +} + // getMultusEndpoints returns the IP list of the Pods connected to the multus network. func (m *Manager) getMultusEndpoints(svc *corev1.Service) ([]string, error) { netListStr, ok := svc.Annotations[LoxiMultusServiceAnnotation] diff --git a/pkg/k8s/node.go b/pkg/k8s/node.go index fdf591e..22c7fde 100644 --- a/pkg/k8s/node.go +++ b/pkg/k8s/node.go @@ -17,10 +17,17 @@ package k8s import ( + "context" + "errors" "fmt" "net" + "time" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + clientset "k8s.io/client-go/kubernetes" ) // GetNodeAddr gets the available IP address of a Node. @@ -44,3 +51,32 @@ func GetNodeAddr(node *v1.Node) (net.IP, error) { } return ipAddr, nil } + +// GetServiceLocalEndpoints - Get HostIPs of pods belonging to the given service +func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service) ([]string, error) { + var epList []string + var epMap map[string]struct{} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + selectorLabelStr := labels.Set(svc.Spec.Selector).String() + podList, err := kubeClient.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selectorLabelStr}) + if err != nil { + return epList, err + } + + for _, pod := range podList.Items { + if pod.Status.HostIP != "" { + if _, found := epMap[pod.Status.HostIP]; !found { + epList = append(epList, pod.Status.HostIP) + } else { + epMap[pod.Status.HostIP] = struct{}{} + } + } + } + if len(epList) <= 0 { + return epList, errors.New("no active endpoints") + } + return epList, nil +}