Skip to content

Commit

Permalink
gh-6 support for ExternalTrafficPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder committed May 26, 2023
1 parent 98b201d commit a30a312
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return err
}

klog.Infof("Endpoint IP Pairs %v", endpointIPs)

cacheKey := GenKey(svc.Namespace, svc.Name)
_, added := m.lbCache[cacheKey]
if !added {
Expand Down Expand Up @@ -469,9 +471,14 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
// If false, return worker nodes IP list.
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool) ([]string, error) {
if podEP {
klog.Infof("getEndpoints: Pod end-points")
return m.getMultusEndpoints(svc)
}

if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
klog.Infof("getEndpoints: Traffic Policy Local")
return k8s.GetServiceLocalEndpoints(m.kubeClient, svc)
}
return m.getNodeEndpoints()
}

Expand All @@ -492,6 +499,17 @@ func (m *Manager) getNodeEndpoints() ([]string, error) {
return m.getEndpointsForLB(nodes), nil
}

// getLocalEndpoints returns the IP list of the Pods connected to the multus network.
func (m *Manager) getLocalEndpoints(svc *corev1.Service) ([]string, error) {
netListStr, ok := svc.Annotations[LoxiMultusServiceAnnotation]
if !ok {
return nil, errors.New("not found multus annotations")
}
netList := strings.Split(netListStr, ",")

return k8s.GetMultusEndpoints(m.kubeClient, svc, netList)
}

// getMultusEndpoints returns the IP list of the Pods connected to the multus network.
func (m *Manager) getMultusEndpoints(svc *corev1.Service) ([]string, error) {
netListStr, ok := svc.Annotations[LoxiMultusServiceAnnotation]
Expand Down
36 changes: 36 additions & 0 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
package k8s

import (
"context"
"errors"
"fmt"
"net"
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
)

// GetNodeAddr gets the available IP address of a Node.
Expand All @@ -44,3 +51,32 @@ func GetNodeAddr(node *v1.Node) (net.IP, error) {
}
return ipAddr, nil
}

// GetServiceLocalEndpoints - Get HostIPs of pods belonging to the given service
func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service) ([]string, error) {
var epList []string
var epMap map[string]struct{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

selectorLabelStr := labels.Set(svc.Spec.Selector).String()
podList, err := kubeClient.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selectorLabelStr})
if err != nil {
return epList, err
}

for _, pod := range podList.Items {
if pod.Status.HostIP != "" {
if _, found := epMap[pod.Status.HostIP]; !found {
epList = append(epList, pod.Status.HostIP)
} else {
epMap[pod.Status.HostIP] = struct{}{}
}
}
}
if len(epList) <= 0 {
return epList, errors.New("no active endpoints")
}
return epList, nil
}

0 comments on commit a30a312

Please sign in to comment.