diff --git a/pkg/agent/manager/loadbalancer/loadbalancer.go b/pkg/agent/manager/loadbalancer/loadbalancer.go index 061bfa8..73a88b8 100644 --- a/pkg/agent/manager/loadbalancer/loadbalancer.go +++ b/pkg/agent/manager/loadbalancer/loadbalancer.go @@ -104,25 +104,32 @@ type LbArgs struct { } type LbModelEnt struct { - inRange bool - staticIP bool - IdentIPAM string - LbModel api.LoadBalancerModel + LbModel api.LoadBalancerModel +} + +type LbServicePairEntry struct { + ExternalIP string + Port uint16 + Protocol string + StaticIP bool + InRange bool + IdentIPAM string + LbModelList []api.LoadBalancerModel } type LbCacheEntry struct { - LbMode int - Timeout int - ActCheck bool - PrefLocal bool - Addr string - State string - ProbeType string - ProbePort uint16 - ProbeReq string - ProbeResp string - SecIPs []string - LbModelList []LbModelEnt + LbMode int + Timeout int + ActCheck bool + PrefLocal bool + Addr string + State string + ProbeType string + ProbePort uint16 + ProbeReq string + ProbeResp string + SecIPs []string + LbServicePairs map[string]*LbServicePairEntry } type LbCacheTable map[string]*LbCacheEntry @@ -139,6 +146,7 @@ type SvcPair struct { InRange bool StaticIP bool IdentIPAM string + IPAllocd bool K8sSvcPort corev1.ServicePort } @@ -147,6 +155,11 @@ func GenKey(ns, name string) string { return path.Join(ns, name) } +// GenSPKey generate key for cache +func GenSPKey(IPString string, Port uint16, Protocol string) string { + return fmt.Sprintf("%s:%v:%s", IPString, Port, Protocol) +} + // Create and Init Manager. // Manager is called by kube-loxilb when k8s service is created & updated. func NewLoadBalancerManager( @@ -442,7 +455,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } cacheKey := GenKey(svc.Namespace, svc.Name) - _, added := m.lbCache[cacheKey] + lbCacheEntry, added := m.lbCache[cacheKey] if !added { if len(endpointIPs) <= 0 { return errors.New("no active endpoints") @@ -450,18 +463,21 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { //c.lbCache[cacheKey] = make([]api.LoadBalancerModel, 0) m.lbCache[cacheKey] = &LbCacheEntry{ - LbMode: lbMode, - ActCheck: livenessCheck, - PrefLocal: prefLocal, - Timeout: timeout, - State: "Added", - ProbeType: probeType, - ProbePort: uint16(probePort), - ProbeReq: probeReq, - ProbeResp: probeResp, - Addr: addrType, - SecIPs: []string{}, - } + LbMode: lbMode, + ActCheck: livenessCheck, + PrefLocal: prefLocal, + Timeout: timeout, + State: "Added", + ProbeType: probeType, + ProbePort: uint16(probePort), + ProbeReq: probeReq, + ProbeResp: probeResp, + Addr: addrType, + SecIPs: []string{}, + LbServicePairs: make(map[string]*LbServicePairEntry), + } + lbCacheEntry = m.lbCache[cacheKey] + klog.Infof("New LbCache %s Added", cacheKey) } retIPAMOnErr := false @@ -469,7 +485,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { oldsvc := svc.DeepCopy() // Check if service has ingress IP already allocated - ingSvcPairs, err, hasExistingEIP := m.getIngressSvcPairs(svc, addrType) + ingSvcPairs, err, hasExistingEIP := m.getIngressSvcPairs(svc, addrType, lbCacheEntry) if err != nil { if !hasExistingEIP { @@ -488,7 +504,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } klog.Infof("deallocateOnFailure defer function called") for i, sp := range ingSvcPairs { - if sp.InRange { + if sp.InRange && sp.IPAllocd { klog.Infof("Returning ip %s to free pool", sp.IPString) ipPool.ReturnIPAddr(sp.IPString, sp.IdentIPAM) } @@ -509,73 +525,87 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } update := false - delete := false - if len(m.lbCache[cacheKey].LbModelList) <= 0 { + needDelete := false + if len(m.lbCache[cacheKey].LbServicePairs) <= 0 { update = true + } else { + for _, lbServPair := range m.lbCache[cacheKey].LbServicePairs { + if len(lbServPair.LbModelList) <= 0 { + update = true + } + } } if addrType != m.lbCache[cacheKey].Addr { m.lbCache[cacheKey].Addr = addrType update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: addr-type update", cacheKey) } if timeout != m.lbCache[cacheKey].Timeout { m.lbCache[cacheKey].Timeout = timeout update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: Timeout update", cacheKey) } if livenessCheck != m.lbCache[cacheKey].ActCheck { m.lbCache[cacheKey].ActCheck = livenessCheck update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: Liveness update", cacheKey) } if lbMode != m.lbCache[cacheKey].LbMode { m.lbCache[cacheKey].LbMode = lbMode update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: LbMode update", cacheKey) } if probeType != m.lbCache[cacheKey].ProbeType { m.lbCache[cacheKey].ProbeType = probeType update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: ProbeType update", cacheKey) } if probePort != int(m.lbCache[cacheKey].ProbePort) { m.lbCache[cacheKey].ProbePort = uint16(probePort) update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: ProbePort update", cacheKey) } if probeReq != m.lbCache[cacheKey].ProbeReq { m.lbCache[cacheKey].ProbeReq = probeReq update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: ProbeReq update", cacheKey) } if probeResp != m.lbCache[cacheKey].ProbeResp { m.lbCache[cacheKey].ProbeResp = probeResp update = true if added { - delete = true + needDelete = true } + klog.Infof("%s: ProbeResp update", cacheKey) } // If the user specifies a secondary IP in the annotation, update the existing secondary IP. @@ -598,8 +628,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } for idx, ingSecIP := range m.lbCache[cacheKey].SecIPs { if idx < len(sipPools) { - for _, lb := range m.lbCache[cacheKey].LbModelList { - sipPools[idx].ReturnIPAddr(ingSecIP, lb.IdentIPAM) + for _, sp := range m.lbCache[cacheKey].LbServicePairs { + sipPools[idx].ReturnIPAddr(ingSecIP, sp.IdentIPAM) } } } @@ -612,27 +642,32 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } // Update endpoint list if the list has changed - for _, lb := range m.lbCache[cacheKey].LbModelList { - if len(endpointIPs) == len(lb.LbModel.Endpoints) { - nEps := 0 - for _, ep := range endpointIPs { - found := false - for _, oldEp := range lb.LbModel.Endpoints { - if ep == oldEp.EndpointIP { - found = true - nEps++ + for _, sp := range m.lbCache[cacheKey].LbServicePairs { + for _, lb := range sp.LbModelList { + if len(endpointIPs) == len(lb.Endpoints) { + nEps := 0 + for _, ep := range endpointIPs { + found := false + for _, oldEp := range lb.Endpoints { + if ep == oldEp.EndpointIP { + found = true + nEps++ + break + } + } + if !found { break } } - if !found { - break + if nEps != len(endpointIPs) { + update = true } - } - if nEps != len(endpointIPs) { + } else { update = true } - } else { - update = true + } + if update { + klog.Infof("%s: Endpoint update", cacheKey) } } @@ -643,34 +678,35 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { ingSvcPairs = nil return nil } else { - if delete { + if needDelete { m.deleteLoadBalancer(svc.Namespace, svc.Name) } if added { - for _, lb := range m.lbCache[cacheKey].LbModelList { + for _, sp := range m.lbCache[cacheKey].LbServicePairs { for idx := range ingSvcPairs { ingSvcPair := &ingSvcPairs[idx] - if ingSvcPair.IPString == lb.LbModel.Service.ExternalIP && - ingSvcPair.Port == int32(lb.LbModel.Service.Port) && - ingSvcPair.Protocol == lb.LbModel.Service.Protocol { - ingSvcPair.InRange = lb.inRange - ingSvcPair.StaticIP = lb.staticIP - ingSvcPair.IdentIPAM = lb.IdentIPAM + if ingSvcPair.IPString == sp.ExternalIP && + ingSvcPair.Port == int32(sp.Port) && + ingSvcPair.Protocol == sp.Protocol { + ingSvcPair.InRange = sp.InRange + ingSvcPair.StaticIP = sp.StaticIP + ingSvcPair.IdentIPAM = sp.IdentIPAM } } + delete(m.lbCache[cacheKey].LbServicePairs, GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)) } + m.lbCache[cacheKey].LbServicePairs = make(map[string]*LbServicePairEntry) } - m.lbCache[cacheKey].LbModelList = nil if !hasExistingEIP { svc.Status.LoadBalancer.Ingress = nil } + klog.Infof("%s: Added(%v) Update(%v) needDelete(%v)", cacheKey, added, update, needDelete) klog.Infof("Endpoint IP Pairs %v", endpointIPs) klog.Infof("Secondary IP Pairs %v", m.lbCache[cacheKey].SecIPs) } for _, ingSvcPair := range ingSvcPairs { var errChList []chan error - var lbModelList []LbModelEnt lbArgs := LbArgs{ externalIP: ingSvcPair.IPString, livenessCheck: m.lbCache[cacheKey].ActCheck, @@ -685,20 +721,25 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { lbArgs.secIPs = append(lbArgs.secIPs, m.lbCache[cacheKey].SecIPs...) lbArgs.endpointIPs = append(lbArgs.endpointIPs, endpointIPs...) + sp := LbServicePairEntry{ + ExternalIP: ingSvcPair.IPString, + Port: uint16(ingSvcPair.Port), + Protocol: ingSvcPair.Protocol, + StaticIP: ingSvcPair.StaticIP, + InRange: ingSvcPair.InRange, + IdentIPAM: ingSvcPair.IdentIPAM, + } + lbModel, err := m.makeLoxiLoadBalancerModel(&lbArgs, svc, ingSvcPair.K8sSvcPort) if err != nil { retIPAMOnErr = true return err } - lbModelList = append(lbModelList, LbModelEnt{ingSvcPair.InRange, ingSvcPair.StaticIP, ingSvcPair.IdentIPAM, lbModel}) for _, client := range m.LoxiClients { ch := make(chan error) go func(c *api.LoxiClient, h chan error) { - var err error - for _, lb := range lbModelList { - err = m.installLB(c, lb, m.lbCache[cacheKey].PrefLocal) - } + err := m.installLB(c, lbModel, m.lbCache[cacheKey].PrefLocal) h <- err }(client, ch) @@ -714,16 +755,17 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error { } if isError { retIPAMOnErr = isError - klog.Errorf("failed to add load-balancer") - return fmt.Errorf("failed to add loxiLB loadBalancer") + klog.Errorf("failed to add load-balancer - spair(%s)", GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)) + return fmt.Errorf("failed to add loxiLB loadBalancer - spair(%s)", GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)) } - m.lbCache[cacheKey].LbModelList = append(m.lbCache[cacheKey].LbModelList, lbModelList...) + + sp.LbModelList = append(sp.LbModelList, lbModel) + m.lbCache[cacheKey].LbServicePairs[GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)] = &sp if ingSvcPair.InRange || ingSvcPair.StaticIP { retIngress := corev1.LoadBalancerIngress{Hostname: "llb-" + ingSvcPair.IPString} //retIngress.Ports = append(retIngress.Ports, corev1.PortStatus{Port: ingSvcPair.Port, Protocol: corev1.Protocol(strings.ToUpper(ingSvcPair.Protocol))}) svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, retIngress) } - klog.Infof("load-balancer (%v) added", lbModelList) } // Update service.Status.LoadBalancer.Ingress @@ -759,16 +801,18 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error { sipPools = m.ExtSecondaryIP6Pools } - for _, lb := range lbEntry.LbModelList { + for _, sp := range lbEntry.LbServicePairs { var errChList []chan error - for _, loxiClient := range m.LoxiClients { - ch := make(chan error) - errChList = append(errChList, ch) + for _, lb := range sp.LbModelList { + for _, loxiClient := range m.LoxiClients { + ch := make(chan error) + errChList = append(errChList, ch) - go func(client *api.LoxiClient, ch chan error) { - klog.Infof("called loxilb API: delete lb rule %v", lb) - ch <- client.LoadBalancer().Delete(context.Background(), &lb.LbModel) - }(loxiClient, ch) + go func(client *api.LoxiClient, ch chan error) { + klog.Infof("loxilb-lb(%s): delete lb %v", client.Host, lb) + ch <- client.LoadBalancer().Delete(context.Background(), &lb) + }(loxiClient, ch) + } } var err error @@ -783,12 +827,12 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error { if isError { return fmt.Errorf("failed to delete loxiLB LoadBalancer. err: %v", err) } - if lb.inRange { - ipPool.ReturnIPAddr(lb.LbModel.Service.ExternalIP, lb.IdentIPAM) + if sp.InRange { + ipPool.ReturnIPAddr(sp.ExternalIP, sp.IdentIPAM) } for idx, ingSecIP := range lbEntry.SecIPs { if idx < len(sipPools) { - sipPools[idx].ReturnIPAddr(ingSecIP, lb.IdentIPAM) + sipPools[idx].ReturnIPAddr(ingSecIP, sp.IdentIPAM) } } } @@ -809,18 +853,19 @@ func (m *Manager) DeleteAllLoadBalancer() { sipPools = m.ExtSecondaryIP6Pools } - for _, lb := range lbEntry.LbModelList { + for _, sp := range lbEntry.LbServicePairs { for _, loxiClient := range m.LoxiClients { - klog.Infof("called loxilb API: delete lb rule %v", lb) - loxiClient.LoadBalancer().Delete(context.Background(), &lb.LbModel) - } - - if lb.inRange { - ipPool.ReturnIPAddr(lb.LbModel.Service.ExternalIP, lb.IdentIPAM) - } - for idx, ingSecIP := range lbEntry.SecIPs { - if idx < len(sipPools) { - sipPools[idx].ReturnIPAddr(ingSecIP, lb.IdentIPAM) + for _, lb := range sp.LbModelList { + klog.Infof("loxilb(%s): deleteAll lb %v", loxiClient.Host, lb) + loxiClient.LoadBalancer().Delete(context.Background(), &lb) + } + if sp.InRange { + ipPool.ReturnIPAddr(sp.ExternalIP, sp.IdentIPAM) + } + for idx, ingSecIP := range lbEntry.SecIPs { + if idx < len(sipPools) { + sipPools[idx].ReturnIPAddr(ingSecIP, sp.IdentIPAM) + } } } } @@ -828,23 +873,23 @@ func (m *Manager) DeleteAllLoadBalancer() { m.lbCache = nil } -func (m *Manager) installLB(c *api.LoxiClient, lb LbModelEnt, prefLocal bool) error { +func (m *Manager) installLB(c *api.LoxiClient, lb api.LoadBalancerModel, prefLocal bool) error { var err error ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - Model := lb.LbModel + Model := lb model := &Model // Optimization for local Preference if prefLocal { model.Endpoints = nil - for _, ep := range lb.LbModel.Endpoints { + for _, ep := range lb.Endpoints { if ep.EndpointIP == c.Host { model.Endpoints = append(model.Endpoints, ep) } } if len(model.Endpoints) <= 0 { - model.Endpoints = lb.LbModel.Endpoints + model.Endpoints = lb.Endpoints } } if err = c.LoadBalancer().Create(ctx, model); err != nil { @@ -855,6 +900,10 @@ func (m *Manager) installLB(c *api.LoxiClient, lb LbModelEnt, prefLocal bool) er } } + if err == nil { + klog.Infof("loxilb-lb(%s): add lb %v", c.Host, lb) + } + return err } @@ -963,19 +1012,19 @@ func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair { var sp SvcPair if ingress.IP != "" { //klog.Errorf("Ingress IP %s", ingress.IP) - sp = SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", port} + sp = SvcPair{ingress.IP, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", false, port} } else if ingress.Hostname != "" { llbHost := strings.Split(ingress.Hostname, "-") if len(llbHost) != 2 { //klog.Errorf("Ingress host1 %s", llbHost[0]) if net.ParseIP(llbHost[0]) != nil { - sp = SvcPair{llbHost[0], port.Port, strings.ToLower(string(port.Protocol)), false, true, "", port} + sp = SvcPair{llbHost[0], port.Port, strings.ToLower(string(port.Protocol)), false, true, "", false, port} } } else { if llbHost[0] == "llb" { if net.ParseIP(llbHost[1]) != nil { //klog.Errorf("Ingress llb host %s", llbHost[1]) - sp = SvcPair{llbHost[1], port.Port, strings.ToLower(string(port.Protocol)), false, true, "", port} + sp = SvcPair{llbHost[1], port.Port, strings.ToLower(string(port.Protocol)), false, true, "", false, port} } } } @@ -986,7 +1035,7 @@ func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair { for _, extIP := range service.Spec.ExternalIPs { for _, port := range service.Spec.Ports { - sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", port} + sp := SvcPair{extIP, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", false, port} spairs = append(spairs, sp) } } @@ -997,7 +1046,7 @@ func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair { klog.Errorf("%s annotation has invalid IP (%s)", staticIPAnnotation, staticIPStr) } else { for _, port := range service.Spec.Ports { - sp := SvcPair{staticIPStr, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", port} + sp := SvcPair{staticIPStr, port.Port, strings.ToLower(string(port.Protocol)), false, true, "", false, port} spairs = append(spairs, sp) } } @@ -1008,7 +1057,7 @@ func (m *Manager) getLBIngressSvcPairs(service *corev1.Service) []SvcPair { // getIngressSvcPairs check validation if service have ingress or externalIPs already. // If service have no ingress IP, assign new IP in IP pool -func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ([]SvcPair, error, bool) { +func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string, lbCacheEntry *LbCacheEntry) ([]SvcPair, error, bool) { var sPairs []SvcPair inSPairs := m.getLBIngressSvcPairs(service) hasExtIPAllocated := false @@ -1022,13 +1071,19 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( // k8s service has ingress IP already if len(inSPairs) >= 1 { for _, inSPair := range inSPairs { - ident := inSPair.Port - proto := inSPair.Protocol - inRange, _, identStr := ipPool.CheckAndReserveIP(inSPair.IPString, cacheKey, uint32(ident), proto) - sp := SvcPair{inSPair.IPString, ident, inSPair.Protocol, inRange, true, identStr, inSPair.K8sSvcPort} - sPairs = append(sPairs, sp) hasExtIPAllocated = true + for _, sp := range lbCacheEntry.LbServicePairs { + if GenSPKey(inSPair.IPString, uint16(inSPair.Port), inSPair.Protocol) == GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol) { + sp := SvcPair{sp.ExternalIP, int32(sp.Port), sp.Protocol, sp.InRange, sp.StaticIP, sp.IdentIPAM, false, inSPair.K8sSvcPort} + sPairs = append(sPairs, sp) + continue + } + } + + inRange, _, identStr := ipPool.CheckAndReserveIP(inSPair.IPString, cacheKey, uint32(inSPair.Port), inSPair.Protocol) + sp := SvcPair{inSPair.IPString, inSPair.Port, inSPair.Protocol, inRange, true, identStr, true, inSPair.K8sSvcPort} + sPairs = append(sPairs, sp) } } @@ -1041,6 +1096,15 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( for _, port := range service.Spec.Ports { proto := strings.ToLower(string(port.Protocol)) portNum := port.Port + + for _, sp := range lbCacheEntry.LbServicePairs { + if sp.Port == uint16(portNum) && proto == sp.Protocol { + sp := SvcPair{sp.ExternalIP, int32(sp.Port), sp.Protocol, sp.InRange, sp.StaticIP, sp.IdentIPAM, false, port} + sPairs = append(sPairs, sp) + continue + } + } + newIP, identIPAM = ipPool.GetNewIPAddr(cacheKey, uint32(portNum), proto) if newIP == nil { // This is a safety code in case the service has the same port. @@ -1052,7 +1116,7 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service, addrType string) ( klog.Errorf("failed to generate external IP. IP Pool is full") return nil, errors.New("failed to generate external IP. IP Pool is full"), hasExtIPAllocated } - sp = SvcPair{newIP.String(), portNum, proto, true, false, identIPAM, port} + sp = SvcPair{newIP.String(), portNum, proto, true, false, identIPAM, true, port} sPairs = append(sPairs, sp) } } @@ -1071,8 +1135,8 @@ func (m *Manager) returnSecondaryIPs(service *corev1.Service, secIPs []string, a for idx, ingSecIP := range secIPs { if idx < len(sipPools) { - for _, lb := range m.lbCache[cacheKey].LbModelList { - sipPools[idx].ReturnIPAddr(ingSecIP, lb.IdentIPAM) + for _, sp := range m.lbCache[cacheKey].LbServicePairs { + sipPools[idx].ReturnIPAddr(ingSecIP, sp.IdentIPAM) } } } @@ -1140,7 +1204,7 @@ func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary in klog.Errorf("failed to generate external secondary IP. IP Pool is full") return nil, errors.New("failed to generate external secondary IP. IP Pool is full") } - sp := SvcPair{newIP.String(), portNum, proto, true, false, identIPAM, port} + sp := SvcPair{newIP.String(), portNum, proto, true, false, identIPAM, true, port} sPairs = append(sPairs, sp) } } @@ -1401,7 +1465,7 @@ func (m *Manager) SelectLoxiLBRoles(sendSigCh bool, loxiLBSelMasterEvent chan bo if v.IsAlive { v.MasterLB = true selMaster = true - klog.Infof("loxilb-lb(%v) set-role master", v.Url) + klog.Infof("loxilb-lb(%v): set-role master", v.Host) } } if selMaster { @@ -1446,17 +1510,17 @@ loop: return lc.CIStatus().Create(ctx, cisModel) }(&cisModel) if err == nil { - klog.Infof("%v: set-role-master(%v) - OK", lc.Host, lc.MasterLB) + klog.Infof("loxilb-lb(%s): set-role-master(%v) - OK", lc.Host, lc.MasterLB) break } else { - klog.Infof("%v: set-role-master(%v) - failed(%d)", lc.Host, lc.MasterLB, retry) + klog.Infof("loxilb-lb(%s): set-role-master(%v) - failed(%d)", lc.Host, lc.MasterLB, retry) time.Sleep(1 * time.Second) } } } } case purgedClient := <-loxiPurgeCh: - klog.Infof("loxilb-client (%s) : purged", purgedClient.Host) + klog.Infof("loxilb-lb(%s): purged", purgedClient.Host) if m.networkConfig.SetBGP != 0 { deleteNeigh := func(client *api.LoxiClient, neighIP string, remoteAs int) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) @@ -1470,9 +1534,9 @@ loop: } err := deleteNeigh(otherClient, purgedClient.Host, int(m.networkConfig.SetBGP)) - klog.Infof("loxilb(%s) call delete neigh API: peer %s", otherClient.Host, purgedClient.Host) + klog.Infof("loxilb-lb(%s): delete neigh peer %s", otherClient.Host, purgedClient.Host) if err != nil { - klog.Errorf("loxilb(%s) delete neigh API return error: %v", otherClient.Host, err) + klog.Errorf("loxilb-lb(%s): delete neigh peer error: %v", otherClient.Host, err) } } } @@ -1493,10 +1557,10 @@ loop: return aliveClient.CIStatus().Create(ctx, cisModel) }(&cisModel) if err == nil { - klog.Infof("%v: set-role-master(%v) - OK", aliveClient.Host, aliveClient.MasterLB) + klog.Infof("loxilb-lb(%s): set-role-master(%v) - OK", aliveClient.Host, aliveClient.MasterLB) break } else { - klog.Infof("%v: set-role-master(%v) - failed(%d)", aliveClient.Host, aliveClient.MasterLB, retry) + klog.Infof("loxilb-lb(%s): set-role-master(%v) - failed(%d)", aliveClient.Host, aliveClient.MasterLB, retry) time.Sleep(1 * time.Second) } } @@ -1610,25 +1674,27 @@ loop: if !aliveClient.PeeringOnly { isSuccess := false for _, value := range m.lbCache { - for _, lb := range value.LbModelList { - for retry := 0; retry < 5; retry++ { - err := m.installLB(aliveClient, lb, value.PrefLocal) - if err == nil { - klog.Infof("reinstallLoxiLbRules: lbModel: %v success", lb) - isSuccess = true - break - } else { - if !strings.Contains(err.Error(), "exist") { - klog.Infof("reinstallLoxiLbRules: lbModel: %v retry(%d)", lb, retry) - time.Sleep(1 * time.Second) - } else { + for _, sp := range value.LbServicePairs { + for _, lb := range sp.LbModelList { + for retry := 0; retry < 5; retry++ { + err := m.installLB(aliveClient, lb, value.PrefLocal) + if err == nil { + klog.Infof("reinstallLoxiLbRules: lbModel: %v success", lb) isSuccess = true break + } else { + if !strings.Contains(err.Error(), "exist") { + klog.Infof("reinstallLoxiLbRules: lbModel: %v retry(%d)", lb, retry) + time.Sleep(1 * time.Second) + } else { + isSuccess = true + break + } } } - } - if !isSuccess && aliveClient.IsAlive { - klog.Exit("restart kube-loxilb") + if !isSuccess && aliveClient.IsAlive { + klog.Exit("restart kube-loxilb") + } } } } diff --git a/pkg/api/client.go b/pkg/api/client.go index a2e6ea7..75f75c5 100644 --- a/pkg/api/client.go +++ b/pkg/api/client.go @@ -74,10 +74,10 @@ func (l *LoxiClient) StartLoxiHealthCheckChan(aliveCh chan *LoxiClient, deadCh c go wait.Until(func() { if _, err := l.HealthCheck().Get(context.Background(), ""); err != nil { if l.IsAlive { - klog.Infof("LoxiHealthCheckChan: loxilb(%s) is down", l.RestClient.baseURL.String()) + klog.Infof("LoxiHealthCheckChan: loxilb-lb(%s) is down", l.Host) l.IsAlive = false if time.Duration(time.Since(l.DeadSigTs).Seconds()) >= 3 && l.MasterLB { - klog.Infof("LoxiHealthCheckChan: master down") + klog.Infof("LoxiHealthCheckChan: loxilb-lb(%s) master down", l.Host) l.DeadSigTs = time.Now() deadCh <- struct{}{} } else { @@ -86,7 +86,7 @@ func (l *LoxiClient) StartLoxiHealthCheckChan(aliveCh chan *LoxiClient, deadCh c } } else { if !l.IsAlive { - klog.Infof("LoxiHealthCheckChan: loxilb(%s) is alive", l.RestClient.baseURL.String()) + klog.Infof("LoxiHealthCheckChan: loxilb-lb(%s) is alive", l.Host) l.IsAlive = true aliveCh <- l }