Skip to content

Commit

Permalink
Merge pull request #5 from loxilb-io/multi-homing
Browse files Browse the repository at this point in the history
PR - SCTP Multi homing changes
  • Loading branch information
TrekkieCoder authored May 25, 2023
2 parents f146e65 + fac614a commit 073de11
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 16 deletions.
28 changes: 24 additions & 4 deletions cmd/loxilb-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package main
import (
"fmt"
"kube-loxilb/pkg/agent/config"
"kube-loxilb/pkg/agent/manager/loadbalancer"
"kube-loxilb/pkg/api"
"kube-loxilb/pkg/ippool"
"kube-loxilb/pkg/k8s"
"kube-loxilb/pkg/log"
"os"
"os/signal"
"strings"
"syscall"
"time"

"kube-loxilb/pkg/agent/manager/loadbalancer"
"kube-loxilb/pkg/k8s"
"kube-loxilb/pkg/log"

"k8s.io/client-go/informers"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -89,6 +89,25 @@ func run(o *Options) error {
return err
}

var sipPools []*ippool.IPPool
if o.config.ExternalSecondaryCIDRs != "" {
CIDRs := strings.Split(o.config.ExternalSecondaryCIDRs, ",")
if len(CIDRs) <= 0 && len(CIDRs) > 4 {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", o.config.ExternalSecondaryCIDRs)
}

for _, CIDR := range CIDRs {
ipPool, err := ippool.NewIPPool(tk.IpAllocatorNew(), CIDR, !o.config.ExclIPAM)
if err != nil {
klog.Errorf("failed to create external secondary IP Pool (CIDR: %s)", CIDR)
return err
}

networkConfig.ExternalSecondaryCIDRs = append(networkConfig.ExternalSecondaryCIDRs, CIDR)
sipPools = append(sipPools, ipPool)
}
}

loxiAliveCh := make(chan *api.LoxiClient)
var loxilbClients []*api.LoxiClient
for _, lbURL := range networkConfig.LoxilbURLs {
Expand All @@ -104,6 +123,7 @@ func run(o *Options) error {
k8sClient,
loxilbClients,
ipPool,
sipPools,
networkConfig,
informerFactory,
)
Expand Down
2 changes: 2 additions & 0 deletions cmd/loxilb-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type AgentConfig struct {
LoxilbLoadBalancerClass string `yaml:"loxilbLoadBalancerClass,omitempty"`
// support LoadBalancer external IP
ExternalCIDR string `yaml:"externalCIDR,omitempty"`
// support LoadBalancer external secondary IP. This is a comma separated list
ExternalSecondaryCIDRs string `yaml:"ExternalSecondaryCIDRs,omitempty"`
// support BGP protocol
SetBGP bool `yaml:"setBGP,omitempty"`
// loxilb loadbalancer mode
Expand Down
14 changes: 14 additions & 0 deletions cmd/loxilb-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (o *Options) addFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.configFile, "config", o.configFile, "The path to the configuration file")
fs.StringVar(&loxiURLFlag, "loxiURL", loxiURLFlag, "loxilb API server URL(s)")
fs.StringVar(&o.config.ExternalCIDR, "externalCIDR", o.config.ExternalCIDR, "External CIDR Range")
fs.StringVar(&o.config.ExternalSecondaryCIDRs, "externalSecondaryCIDRs", o.config.ExternalCIDR, "External Secondary CIDR Range(s)")
fs.StringVar(&o.config.LoxilbLoadBalancerClass, "loxilbLoadBalancerClass", o.config.LoxilbLoadBalancerClass, "Load-Balancer Class Name")
fs.BoolVar(&o.config.SetBGP, "setBGP", o.config.SetBGP, "Use BGP routing")
fs.BoolVar(&o.config.ExclIPAM, "setUniqueIP", o.config.ExclIPAM, "Use unique IPAM per service")
Expand Down Expand Up @@ -88,6 +89,19 @@ func (o *Options) validate(args []string) error {
}
}

if o.config.ExternalSecondaryCIDRs != "" {
CIDRs := strings.Split(o.config.ExternalSecondaryCIDRs, ",")
if len(CIDRs) <= 0 && len(CIDRs) > 4 {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", o.config.ExternalSecondaryCIDRs)
}

for _, CIDR := range CIDRs {
if _, _, err := net.ParseCIDR(CIDR); err != nil {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", CIDR)
}
}
}

if o.config.LoxilbLoadBalancerClass != "" {
if ok := strings.Contains(o.config.LoxilbLoadBalancerClass, "/"); !ok {
return fmt.Errorf("loxilbLoadBalancerClass must be a label-style identifier")
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type NetworkConfig struct {
LoxilbURLs []string
LoxilbLoadBalancerClass string
ExternalCIDR string
ExternalSecondaryCIDRs []string
SetBGP bool
SetLBMode uint16
Monitor bool
Expand Down
100 changes: 90 additions & 10 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import (
"context"
"errors"
"fmt"
"net"
"path"
"reflect"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -38,6 +32,12 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"net"
"path"
"reflect"
"strconv"
"strings"
"time"

"kube-loxilb/pkg/agent/config"
"kube-loxilb/pkg/api"
Expand All @@ -53,6 +53,7 @@ const (
defaultWorkers = 4
LoxiMaxWeight = 10
LoxiMultusServiceAnnotation = "loxilb.io/multus-nets"
numSecIPAnnotation = "loxilb.io/num-secondary-networks"
)

type Manager struct {
Expand All @@ -66,13 +67,15 @@ type Manager struct {
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
ExternalIPPool *ippool.IPPool
ExtSecondaryIPPools []*ippool.IPPool

queue workqueue.RateLimitingInterface
lbCache LbCacheTable
}

type LbCacheEntry struct {
State string
SecIPs []string
LbModelList []api.LoadBalancerModel
}

Expand Down Expand Up @@ -100,6 +103,7 @@ func NewLoadBalancerManager(
kubeClient clientset.Interface,
loxiClients []*api.LoxiClient,
externalIPPool *ippool.IPPool,
externalSecondaryIPPools []*ippool.IPPool,
networkConfig *config.NetworkConfig,
informerFactory informers.SharedInformerFactory) *Manager {

Expand All @@ -109,6 +113,7 @@ func NewLoadBalancerManager(
kubeClient: kubeClient,
loxiClients: loxiClients,
ExternalIPPool: externalIPPool,
ExtSecondaryIPPools: externalSecondaryIPPools,
networkConfig: networkConfig,
serviceInformer: serviceInformer,
serviceLister: serviceInformer.Lister(),
Expand Down Expand Up @@ -234,10 +239,22 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return nil
}

numSecondarySvc := 0

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 {
return nil
}

// Check for loxilb specific annotations
if na := svc.Annotations[numSecIPAnnotation]; na != "" {
num, err := strconv.Atoi(na)
if err != nil {
numSecondarySvc = 0
} else {
numSecondarySvc = num
}
}

// Check for loxilb specific annotation
_, needPodEP := svc.Annotations[LoxiMultusServiceAnnotation]
endpointIPs, err := m.getEndpoints(svc, needPodEP)
Expand All @@ -250,7 +267,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
if !added {
//c.lbCache[cacheKey] = make([]api.LoadBalancerModel, 0)
m.lbCache[cacheKey] = &LbCacheEntry{
State: "Added",
State: "Added",
SecIPs: []string{},
}
}

Expand All @@ -262,6 +280,18 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return err
}

if !added {
ingSecSvcPairs, err := m.getIngressSecSvcPairs(svc, numSecondarySvc)
if err != nil {
return err
}
klog.Infof("Secondary IP Pairs %v", ingSecSvcPairs)

for _, ingSecSvcPair := range ingSecSvcPairs {
m.lbCache[cacheKey].SecIPs = append(m.lbCache[cacheKey].SecIPs, ingSecSvcPair.IPString)
}
}

update := false
if len(m.lbCache[cacheKey].LbModelList) <= 0 {
update = true
Expand Down Expand Up @@ -316,7 +346,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
var errChList []chan error
var lbModelList []api.LoadBalancerModel
for _, port := range svc.Spec.Ports {
lbModel, err := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, svc, port, endpointIPs, needPodEP)
lbModel, err := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, m.lbCache[cacheKey].SecIPs, svc, port, endpointIPs, needPodEP)
if err != nil {
return err
}
Expand Down Expand Up @@ -409,6 +439,11 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
return fmt.Errorf("failed to delete loxiLB LoadBalancer")
}
m.ExternalIPPool.ReturnIPAddr(lb.Service.ExternalIP, uint32(lb.Service.Port), lb.Service.Protocol)
for idx, ingSecIP := range lbEntry.SecIPs {
if idx < len(m.ExtSecondaryIPPools) {
m.ExtSecondaryIPPools[idx].ReturnIPAddr(ingSecIP, uint32(lb.Service.Port), lb.Service.Protocol)
}
}
}

delete(m.lbCache, cacheKey)
Expand Down Expand Up @@ -544,6 +579,43 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service) ([]SvcPair, error)
return sPairs, nil
}

// getIngressSecSvcPairs returns a set of secondary IPs
func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary int) ([]SvcPair, error) {
var sPairs []SvcPair

if len(m.ExtSecondaryIPPools) < numSecondary {
klog.Errorf("failed to generate external secondary IP. No IP pools")
return sPairs, errors.New("failed to generate external secondary IP. No IP pools")
}

for i := 0; i < numSecondary; i++ {
for _, port := range service.Spec.Ports {
pool := m.ExtSecondaryIPPools[i]
proto := strings.ToLower(string(port.Protocol))
portNum := port.Port
newIP := pool.GetNewIPAddr(uint32(portNum), proto)
if newIP == nil {
// This is a safety code in case the service has the same port.
for _, s := range sPairs {
if s.Port == portNum && s.Protocol == proto {
continue
}
}
for j := 0; j < i; j++ {
rpool := m.ExtSecondaryIPPools[j]
rpool.ReturnIPAddr(sPairs[j].IPString, uint32(portNum), proto)
}
klog.Errorf("failed to generate external secondary IP. IP Pool is full")
return nil, errors.New("failed to generate external secondary IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto}
sPairs = append(sPairs, sp)
}
}

return sPairs, nil
}

func (m *Manager) getLoadBalancerServiceIngressIPs(service *corev1.Service) []string {
var ips []string
for _, ingress := range service.Status.LoadBalancer.Ingress {
Expand All @@ -553,8 +625,9 @@ func (m *Manager) getLoadBalancerServiceIngressIPs(service *corev1.Service) []st
return ips
}

func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Service, port corev1.ServicePort, endpointIPs []string, needPodEP bool) (api.LoadBalancerModel, error) {
func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, secIPs []string, svc *corev1.Service, port corev1.ServicePort, endpointIPs []string, needPodEP bool) (api.LoadBalancerModel, error) {
loxiEndpointModelList := []api.LoadBalancerEndpoint{}
loxiSecIPModelList := []api.LoadBalancerSecIp{}

if len(endpointIPs) > 0 {
endpointWeight := uint8(LoxiMaxWeight / len(endpointIPs))
Expand Down Expand Up @@ -584,6 +657,12 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Servi
}
}

if len(secIPs) > 0 {
for _, secIP := range secIPs {
loxiSecIPModelList = append(loxiSecIPModelList, api.LoadBalancerSecIp{SecondaryIP: secIP})
}
}

return api.LoadBalancerModel{
Service: api.LoadBalancerService{
ExternalIP: externalIP,
Expand All @@ -593,7 +672,8 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Servi
Mode: api.LbMode(m.networkConfig.SetLBMode),
Monitor: m.networkConfig.Monitor,
},
Endpoints: loxiEndpointModelList,
SecondaryIPs: loxiSecIPModelList,
Endpoints: loxiEndpointModelList,
}, nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/api/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ func (lbListModel *LoadBalancerListModel) GetKeyStruct() LoxiModel {
}

type LoadBalancerModel struct {
Service LoadBalancerService `json:"serviceArguments"`
Endpoints []LoadBalancerEndpoint `json:"endpoints"`
Service LoadBalancerService `json:"serviceArguments"`
SecondaryIPs []LoadBalancerSecIp `json:"secondaryIPs"`
Endpoints []LoadBalancerEndpoint `json:"endpoints"`
}

func (lbModel *LoadBalancerModel) GetKeyStruct() LoxiModel {
Expand Down Expand Up @@ -47,6 +48,10 @@ type LoadBalancerEndpoint struct {
State string `json:"state"`
}

type LoadBalancerSecIp struct {
SecondaryIP string `json:"secondaryIP"`
}

type LoadBalancerAPI struct {
resource string
provider string
Expand Down

0 comments on commit 073de11

Please sign in to comment.