Skip to content

Commit

Permalink
Fixed targetPort for multus when port has no Int value
Browse files Browse the repository at this point in the history
  • Loading branch information
backguynn committed May 18, 2023
1 parent 0579161 commit 2855567
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 58 deletions.
75 changes: 17 additions & 58 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
var errChList []chan error
var lbModelList []api.LoadBalancerModel
for _, port := range svc.Spec.Ports {
lbModel := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, port, endpointIPs, needPodEP)
lbModel, err := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, svc, port, endpointIPs, needPodEP)
if err != nil {
return err
}
lbModelList = append(lbModelList, lbModel)
}

Expand Down Expand Up @@ -412,6 +415,9 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
return nil
}

// getEndpoints return LB's endpoints IP list.
// If podEP is true, return multus endpoints list.
// If false, return worker nodes IP list.
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool) ([]string, error) {
if podEP {
return m.getMultusEndpoints(svc)
Expand All @@ -420,6 +426,7 @@ func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool) ([]string, error
return m.getNodeEndpoints()
}

// getNodeEndpoints returns the IP list of nodes available as nodePort service.
func (m *Manager) getNodeEndpoints() ([]string, error) {
req, err := labels.NewRequirement("node.kubernetes.io/exclude-from-external-load-balancers", selection.DoesNotExist, []string{})
if err != nil {
Expand All @@ -436,67 +443,15 @@ func (m *Manager) getNodeEndpoints() ([]string, error) {
return m.getEndpointsForLB(nodes), nil
}

// getMultusEndpoints returns the IP list of the Pods connected to the multus network.
func (m *Manager) getMultusEndpoints(svc *corev1.Service) ([]string, error) {
var epList []string

netListStr, ok := svc.Annotations[LoxiMultusServiceAnnotation]
if !ok {
return nil, errors.New("not found multus annotations")
}
netList := strings.Split(netListStr, ",")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

selectorLabelStr := labels.Set(svc.Spec.Selector).String()
podList, err := m.kubeClient.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selectorLabelStr})
if err != nil {
return epList, err
}

contain := func(strList []string, s string) bool {
for _, str := range strList {
if str == s {
return true
}
}
return false
}

for _, pod := range podList.Items {
multusNetworkListStr, ok := pod.Annotations["k8s.v1.cni.cncf.io/networks"]
if !ok {
continue
}

networkStatusListStr, ok := pod.Annotations["k8s.v1.cni.cncf.io/networks-status"]
if !ok {
return epList, errors.New("net found k8s.v1.cni.cncf.io/networks-status annotation")
}

networkStatusList, err := k8s.UnmarshalNetworkStatus(networkStatusListStr)
if err != nil {
return epList, err
}

multusNetworkList := strings.Split(multusNetworkListStr, ",")
for _, mNet := range multusNetworkList {
if !contain(netList, mNet) {
continue
}

netName := k8s.GetMultusNetworkName(pod.Namespace, mNet)
for _, ns := range networkStatusList {
if ns.Name == netName {
if len(ns.Ips) > 0 {
epList = append(epList, ns.Ips...)
}
}
}
}
}

return epList, nil
return k8s.GetMultusEndpoints(m.kubeClient, svc, netList)
}

func (m *Manager) getNodeAddress(node corev1.Node) (string, error) {
Expand Down Expand Up @@ -598,7 +553,7 @@ func (m *Manager) getLoadBalancerServiceIngressIPs(service *corev1.Service) []st
return ips
}

func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, port corev1.ServicePort, endpointIPs []string, needPodEP bool) api.LoadBalancerModel {
func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Service, port corev1.ServicePort, endpointIPs []string, needPodEP bool) (api.LoadBalancerModel, error) {
loxiEndpointModelList := []api.LoadBalancerEndpoint{}

if len(endpointIPs) > 0 {
Expand All @@ -614,7 +569,11 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, port corev1.Servi

tport := uint16(port.NodePort)
if needPodEP {
tport = uint16(port.TargetPort.IntVal)
portNum, err := k8s.GetServicePortIntValue(m.kubeClient, svc, port)
if err != nil {
return api.LoadBalancerModel{}, err
}
tport = uint16(portNum)
}

loxiEndpointModelList = append(loxiEndpointModelList, api.LoadBalancerEndpoint{
Expand All @@ -635,7 +594,7 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, port corev1.Servi
Monitor: m.networkConfig.Monitor,
},
Endpoints: loxiEndpointModelList,
}
}, nil
}

func (m *Manager) addIngress(service *corev1.Service, newIP net.IP) {
Expand Down
65 changes: 65 additions & 0 deletions pkg/k8s/multus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
package k8s

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
)

type dnsIf interface{}
Expand Down Expand Up @@ -61,3 +69,60 @@ func GetMultusNetworkStatus(ns, name string) (networkStatus, error) {

return networkStatus{}, fmt.Errorf("not found %s network", name)
}

func GetMultusEndpoints(kubeClient clientset.Interface, svc *corev1.Service, netList []string) ([]string, error) {
var epList []string

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

selectorLabelStr := labels.Set(svc.Spec.Selector).String()
podList, err := kubeClient.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selectorLabelStr})
if err != nil {
return epList, err
}

contain := func(strList []string, s string) bool {
for _, str := range strList {
if str == s {
return true
}
}
return false
}

for _, pod := range podList.Items {
multusNetworkListStr, ok := pod.Annotations["k8s.v1.cni.cncf.io/networks"]
if !ok {
continue
}

networkStatusListStr, ok := pod.Annotations["k8s.v1.cni.cncf.io/networks-status"]
if !ok {
return epList, errors.New("net found k8s.v1.cni.cncf.io/networks-status annotation")
}

networkStatusList, err := UnmarshalNetworkStatus(networkStatusListStr)
if err != nil {
return epList, err
}

multusNetworkList := strings.Split(multusNetworkListStr, ",")
for _, mNet := range multusNetworkList {
if !contain(netList, mNet) {
continue
}

netName := GetMultusNetworkName(pod.Namespace, mNet)
for _, ns := range networkStatusList {
if ns.Name == netName {
if len(ns.Ips) > 0 {
epList = append(epList, ns.Ips...)
}
}
}
}
}

return epList, nil
}
55 changes: 55 additions & 0 deletions pkg/k8s/servicePort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2022 NetLOX Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package k8s

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
)

func GetServicePortIntValue(kubeClient clientset.Interface, svc *corev1.Service, port corev1.ServicePort) (int, error) {
if port.TargetPort.IntValue() != 0 {
return port.TargetPort.IntValue(), nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

selectorLabelStr := labels.Set(svc.Spec.Selector).String()
podList, err := kubeClient.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selectorLabelStr})
if err != nil {
return 0, err
}

for _, pod := range podList.Items {
for _, c := range pod.Spec.Containers {
for _, p := range c.Ports {
if p.Name == port.TargetPort.String() {
return int(p.ContainerPort), nil
}
}
}
}

return 0, fmt.Errorf("not found port name %s in service %s", port.TargetPort.String(), svc.Name)
}

0 comments on commit 2855567

Please sign in to comment.