Skip to content

Commit

Permalink
Revisited TrafficPolicyTypeLocal implementation for incluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
TrekkieCoder committed Jan 21, 2024
1 parent 933e963 commit 02512fe
Showing 1 changed file with 39 additions and 23 deletions.
62 changes: 39 additions & 23 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type LbCacheEntry struct {
LbMode int
Timeout int
ActCheck bool
PrefLocal bool
Addr string
State string
ProbeType string
Expand Down Expand Up @@ -304,6 +305,10 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
probePort := 0
probeReq := ""
probeResp := ""
prefLocal := false
if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
prefLocal = true
}

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 {
return nil
Expand Down Expand Up @@ -447,6 +452,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
m.lbCache[cacheKey] = &LbCacheEntry{
LbMode: lbMode,
ActCheck: livenessCheck,
PrefLocal: prefLocal,
Timeout: timeout,
State: "Added",
ProbeType: probeType,
Expand Down Expand Up @@ -690,17 +696,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
ch := make(chan error)
go func(c *api.LoxiClient, h chan error) {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for _, lb := range lbModelList {
if err = c.LoadBalancer().Create(ctx, &lb.LbModel); err != nil {
if !strings.Contains(err.Error(), "exist") {
klog.Errorf("failed to create load-balancer(%s) :%v", c.Url, err)
break
} else {
err = nil
}
}
err = m.installLB(c, lb, m.lbCache[cacheKey].PrefLocal)
}
h <- err
}(client, ch)
Expand Down Expand Up @@ -831,6 +828,36 @@ func (m *Manager) DeleteAllLoadBalancer() {
m.lbCache = nil
}

func (m *Manager) installLB(c *api.LoxiClient, lb LbModelEnt, prefLocal bool) error {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Model := lb.LbModel
model := &Model

// Optimization for local Preference
if prefLocal {
model.Endpoints = nil
for _, ep := range lb.LbModel.Endpoints {
if ep.EndpointIP == c.Host {
model.Endpoints = append(model.Endpoints, ep)
}
}
if len(model.Endpoints) <= 0 {
model.Endpoints = lb.LbModel.Endpoints
}
}
if err = c.LoadBalancer().Create(ctx, model); err != nil {
if !strings.Contains(err.Error(), "exist") {
klog.Errorf("failed to create load-balancer(%s) :%v", c.Url, err)
} else {
err = nil
}
}

return err
}

// getEndpoints return LB's endpoints IP list.
// If podEP is true, return multus endpoints list.
// If false, return worker nodes IP list.
Expand Down Expand Up @@ -1136,15 +1163,8 @@ func (m *Manager) makeLoxiLoadBalancerModel(lbArgs *LbArgs, svc *corev1.Service,
lbModeSvc := api.LbMode(m.networkConfig.SetLBMode)

if len(lbArgs.endpointIPs) > 0 {
endpointWeight := uint8(LoxiMaxWeight / len(lbArgs.endpointIPs))
remainderWeight := uint8(LoxiMaxWeight % len(lbArgs.endpointIPs))

for _, endpoint := range lbArgs.endpointIPs {
weight := endpointWeight
if remainderWeight > 0 {
weight++
remainderWeight--
}

tport := uint16(port.NodePort)
if lbArgs.needPodEP {
Expand All @@ -1158,7 +1178,7 @@ func (m *Manager) makeLoxiLoadBalancerModel(lbArgs *LbArgs, svc *corev1.Service,
loxiEndpointModelList = append(loxiEndpointModelList, api.LoadBalancerEndpoint{
EndpointIP: endpoint,
TargetPort: tport,
Weight: weight,
Weight: 1,
})
}
}
Expand Down Expand Up @@ -1592,11 +1612,7 @@ loop:
for _, value := range m.lbCache {
for _, lb := range value.LbModelList {
for retry := 0; retry < 5; retry++ {
err := func(lbModel *api.LoadBalancerModel) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return aliveClient.LoadBalancer().Create(ctx, lbModel)
}(&lb.LbModel)
err := m.installLB(aliveClient, lb, value.PrefLocal)
if err == nil {
klog.Infof("reinstallLoxiLbRules: lbModel: %v success", lb)
isSuccess = true
Expand Down

0 comments on commit 02512fe

Please sign in to comment.