diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 87aeca3fd2fb..ad89d6af1388 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -904,6 +904,12 @@ const ( // Allows kube-proxy to run in Overlay mode for Windows WinOverlay featuregate.Feature = "WinOverlay" + // owner: @princepereira + // alpha: v1.27 + // + // Allows the Windows kube-proxy to switch between the old and new Windows networking stacks + WinProxyEbpfMode featuregate.Feature = "WinProxyEbpfMode" + // owner: @marosset // kep: https://kep.k8s.io/3503 // alpha: v1.26 @@ -1234,6 +1240,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS WinOverlay: {Default: true, PreRelease: featuregate.Beta}, + WinProxyEbpfMode: {Default: false, PreRelease: featuregate.Alpha}, + WindowsHostNetwork: {Default: true, PreRelease: featuregate.Alpha}, NodeInclusionPolicyInPodTopologySpread: {Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/proxy/winkernel/hcnutils.go b/pkg/proxy/winkernel/hcnutils.go index ec6cf81651da..6070c1aabce2 100644 --- a/pkg/proxy/winkernel/hcnutils.go +++ b/pkg/proxy/winkernel/hcnutils.go @@ -41,6 +41,7 @@ type HcnService interface { ListLoadBalancers() ([]hcn.HostComputeLoadBalancer, error) GetLoadBalancerByID(loadBalancerId string) (*hcn.HostComputeLoadBalancer, error) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error) + UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error // Features functions GetSupportedFeatures() hcn.SupportedFeatures @@ -104,6 +105,10 @@ func (hcnObj hcnImpl) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc return loadBalancer.Create() } +func (hcnObj hcnImpl) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) { + return loadBalancer.Update(hnsLbID) +} + func (hcnObj hcnImpl) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error { return loadBalancer.Delete() } diff --git a/pkg/proxy/winkernel/hns.go b/pkg/proxy/winkernel/hns.go index b11eb7cacd4e..7bf0c964ef88 100644 --- a/pkg/proxy/winkernel/hns.go +++ b/pkg/proxy/winkernel/hns.go @@ -38,8 +38,9 @@ type HostNetworkService interface { getEndpointByName(id string) (*endpointInfo, error) createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error) deleteEndpoint(hnsID string) error - getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) + getOrCreateLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) + updateLoadBalancer(hnsLbID string, sourceVip, vip string, endpoints []endpointInfo, flags loadBalancerFlags, protocol, internalPort, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) deleteLoadBalancer(hnsID string) error } @@ -54,6 +55,33 @@ var ( LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16 ) +func
getLoadBalancerPolicyFlags(flags loadBalancerFlags) (lbPortMappingFlags hcn.LoadBalancerPortMappingFlags, lbFlags hcn.LoadBalancerFlags) { + lbPortMappingFlags = hcn.LoadBalancerPortMappingFlagsNone + if flags.isILB { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB + } + if flags.useMUX { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux + } + if flags.preserveDIP { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP + } + if flags.localRoutedVIP { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP + } + if flags.isVipExternalIP { + lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP + } + lbFlags = hcn.LoadBalancerFlagsNone + if flags.isDSR { + lbFlags |= hcn.LoadBalancerFlagsDSR + } + if flags.isIPv6 { + lbFlags |= LoadBalancerFlagsIPv6 + } + return +} + func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) { hnsnetwork, err := hns.hcn.GetNetworkByName(name) if err != nil { @@ -315,7 +343,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn return loadBalancers, nil } -func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { +func (hns hns) getOrCreateLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { var id loadBalancerIdentifier vips := []string{} // Compute hash from backends (endpoint IDs) @@ -336,32 +364,84 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags return lb, nil } - lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone - if flags.isILB { - lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB + lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags) + + lbDistributionType := hcn.LoadBalancerDistributionNone + + if flags.sessionAffinity { + lbDistributionType = hcn.LoadBalancerDistributionSourceIP } - if flags.useMUX { - lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux + + loadBalancer := &hcn.HostComputeLoadBalancer{ + SourceVIP: sourceVip, + PortMappings: []hcn.LoadBalancerPortMapping{ + { + Protocol: uint32(protocol), + InternalPort: internalPort, + ExternalPort: externalPort, + DistributionType: lbDistributionType, + Flags: lbPortMappingFlags, + }, + }, + FrontendVIPs: vips, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + Flags: lbFlags, } - if flags.preserveDIP { - lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP + + for _, ep := range endpoints { + loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID) } - if flags.localRoutedVIP { - lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP + + lb, err := hns.hcn.CreateLoadBalancer(loadBalancer) + + if err != nil { + return nil, err } - if flags.isVipExternalIP { - lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP + + klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb) + lbInfo := &loadBalancerInfo{ + hnsID: lb.Id, } + // Add to map of load balancers + previousLoadBalancers[id] = lbInfo + return lbInfo, err +} - lbFlags := hcn.LoadBalancerFlagsNone - if flags.isDSR { - lbFlags |= hcn.LoadBalancerFlagsDSR +func (hns 
hns) updateLoadBalancer(hnsLbID string, + sourceVip, + vip string, + endpoints []endpointInfo, + flags loadBalancerFlags, + protocol, + internalPort, + externalPort uint16, + previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { + + var id loadBalancerIdentifier + vips := []string{} + // Compute hash from backends (endpoint IDs) + hash, err := hashEndpoints(endpoints) + if err != nil { + klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints) + return nil, err + } + if len(vip) > 0 { + id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash} + vips = append(vips, vip) + } else { + id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash} } - if flags.isIPv6 { - lbFlags |= LoadBalancerFlagsIPv6 + if lb, found := previousLoadBalancers[id]; found { + klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb) + return lb, nil } + lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags) + lbDistributionType := hcn.LoadBalancerDistributionNone if flags.sessionAffinity { @@ -391,13 +471,16 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID) } - lb, err := hns.hcn.CreateLoadBalancer(loadBalancer) + klog.V(3).InfoS("Updating existing loadbalancer", "hnsLbID", hnsLbID, "endpointCount", len(endpoints)) + + lb, err := hns.hcn.UpdateLoadBalancer(loadBalancer, hnsLbID) if err != nil { + klog.V(2).ErrorS(err, "Error updating existing loadbalancer", "hnsLbID", hnsLbID, "endpoints", endpoints) return nil, err } - klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb) + klog.V(1).InfoS("Updated loadbalancer successfully", "loadBalancer", lb) lbInfo := &loadBalancerInfo{ hnsID: lb.Id, } diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go index 1567d467c095..a40a1166576a 100644 --- a/pkg/proxy/winkernel/hns_test.go +++ b/pkg/proxy/winkernel/hns_test.go @@ -364,7 +364,7 @@ func TestGetLoadBalancerExisting(t *testing.T) { id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash} lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id} - lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) + lb, err := hns.getOrCreateLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) if err != nil { t.Error(err) @@ -414,7 +414,7 @@ func TestGetLoadBalancerNew(t *testing.T) { hnsID: Endpoint.Id, } endpoints := []endpointInfo{*endpoint} - lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) + lb, err := hns.getOrCreateLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs) if err != nil { t.Error(err) } diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index a09a693f5ff9..f48aedd46204 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -145,6 +145,20 @@ type remoteSubnetInfo struct { drMacAddress string } +type lbType int8 + +const ( + lbTypeClusterIP lbType = iota + lbTypeNodePort + lbTypeExternalIP +
lbTypeIngressIP + lbTypeHealthCheck +) + +func (t lbType) String() string { + return [...]string{"ClusterIP", "NodePort", "ExternalIP", "IngressIP", "HealthCheck"}[t] +} + const ( NETWORK_TYPE_OVERLAY = "overlay" // MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale loadbalancers which cleanedup in single syncproxyrules. @@ -348,14 +362,30 @@ func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap prox // This will optimize remote endpoint and loadbalancer deletion based on the annotation var svcPortMap = make(map[proxy.ServicePortName]bool) var logLevel klog.Level = 5 + proxier.remoteEPsToCleanup = make(map[string]bool) + for svcPortName, eps := range oldEndpointsMap { logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps) + if proxier.winProxyEbpfMode { + for _, ep := range eps { + if !ep.IsLocal() && !proxier.remoteEPsToCleanup[ep.IP()] { + proxier.remoteEPsToCleanup[ep.IP()] = true + } + } + } svcPortMap[svcPortName] = true proxier.onEndpointsMapChange(&svcPortName, false) } for svcPortName, eps := range newEndpointsMap { logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps) + if proxier.winProxyEbpfMode { + for _, ep := range eps { + if !ep.IsLocal() && proxier.remoteEPsToCleanup[ep.IP()] { + delete(proxier.remoteEPsToCleanup, ep.IP()) + } + } + } // redundantCleanup true means cleanup is called second time on the same svcPort redundantCleanup := svcPortMap[svcPortName] proxier.onEndpointsMapChange(&svcPortName, redundantCleanup) @@ -614,7 +644,10 @@ type Proxier struct { forwardHealthCheckVip bool rootHnsEndpointName string + remoteEPsToCleanup map[string]bool // Maintains the set of remote endpoints to be deleted when WinProxyEbpfMode is enabled. mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration + + winProxyEbpfMode bool // Allows the Windows kube-proxy to switch between the old and new Windows networking stacks } type localPort struct { @@ -712,6 +745,9 @@ func NewProxier( return nil, err } + winProxyEbpfModeEnabled := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinProxyEbpfMode) + klog.V(3).InfoS("WinProxyEbpfMode flag status", "Enabled", winProxyEbpfModeEnabled) + var sourceVip string var hostMac string if isOverlay(hnsNetworkInfo) { @@ -772,7 +808,10 @@ func NewProxier( healthzPort: healthzPort, rootHnsEndpointName: config.RootHnsEndpointName, forwardHealthCheckVip: config.ForwardHealthCheckVip, + remoteEPsToCleanup: make(map[string]bool), mapStaleLoadbalancers: make(map[string]bool), + + winProxyEbpfMode: winProxyEbpfModeEnabled, } serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) @@ -1167,6 +1206,9 @@ func (proxier *Proxier) syncProxyRules() { } klog.V(3).InfoS("Syncing Policies") + if proxier.winProxyEbpfMode { + defer proxier.deleteUnusedRemoteEndpoints(queriedEndpoints) + } // Program HNS by adding corresponding policies for each service.
for svcName, svc := range proxier.svcPortMap { @@ -1368,7 +1410,7 @@ func (proxier *Proxier) syncProxyRules() { if len(svcInfo.hnsID) > 0 { // This should not happen - klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID) + klog.InfoS("Load Balancer already exists", "hnsID", svcInfo.hnsID) } // In ETP:Cluster, if all endpoints are under termination, @@ -1384,19 +1426,13 @@ func (proxier *Proxier) syncProxyRules() { } klog.V(4).InfoS("Trying to apply Policies for service", "serviceInfo", svcInfo) - var hnsLoadBalancer *loadBalancerInfo + var sourceVip = proxier.sourceVip if containsPublicIP || containsNodeIP { sourceVip = proxier.nodeIP.String() } - sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP - if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity { - klog.InfoS("Session Affinity is not supported on this version of Windows") - } - endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) // clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer. clusterIPEndpoints := hnsEndpoints @@ -1405,30 +1441,20 @@ func (proxier *Proxier) syncProxyRules() { clusterIPEndpoints = hnsLocalEndpoints } - if len(clusterIPEndpoints) > 0 { + _, err := proxier.configureLoadbalancer( + hns, + lbTypeClusterIP, + &svcInfo.hnsID, + svcInfo, + sourceVip, + svcInfo.ClusterIP().String(), + clusterIPEndpoints, + endpointsAvailableForLB, + queriedLoadBalancers) - // If all endpoints are terminating, then no need to create Cluster IP LoadBalancer - // Cluster IP LoadBalancer creation - hnsLoadBalancer, err := hns.getLoadBalancer( - clusterIPEndpoints, - loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP}, - sourceVip, - svcInfo.ClusterIP().String(), - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.Port()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } - - svcInfo.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) - - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. 
Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) + if err != nil { + klog.ErrorS(err, "ClusterIP policy creation failed") + continue } // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints @@ -1440,29 +1466,20 @@ func (proxier *Proxier) syncProxyRules() { nodePortEndpoints = hnsLocalEndpoints } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers) - - if len(nodePortEndpoints) > 0 && endpointsAvailableForLB { - // If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer - hnsLoadBalancer, err := hns.getLoadBalancer( - nodePortEndpoints, - loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, - sourceVip, - "", - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.NodePort()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } + _, err := proxier.configureLoadbalancer( + hns, + lbTypeNodePort, + &svcInfo.nodePorthnsID, + svcInfo, + sourceVip, + "", + nodePortEndpoints, + endpointsAvailableForLB, + queriedLoadBalancers) - svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID) - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) + if err != nil { + klog.ErrorS(err, "NodePort policy creation failed") + continue } } @@ -1474,29 +1491,20 @@ func (proxier *Proxier) syncProxyRules() { externalIPEndpoints = hnsLocalEndpoints } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers) + _, err := proxier.configureLoadbalancer( + hns, + lbTypeExternalIP, + &externalIP.hnsID, + svcInfo, + sourceVip, + externalIP.ip, + externalIPEndpoints, + endpointsAvailableForLB, + queriedLoadBalancers) - if len(externalIPEndpoints) > 0 && endpointsAvailableForLB { - // If all endpoints are in terminating stage, then no need to External IP LoadBalancer - // Try loading existing policies, if already available - hnsLoadBalancer, err = hns.getLoadBalancer( - externalIPEndpoints, - loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, - sourceVip, - externalIP.ip, - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.Port()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } - externalIP.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID) - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", 
allEndpointsTerminating) + if err != nil { + klog.ErrorS(err, "ExternalIP policy creation failed") + continue } } // Create a Load Balancer Policy for each loadbalancer ingress @@ -1507,55 +1515,41 @@ func (proxier *Proxier) syncProxyRules() { lbIngressEndpoints = hnsLocalEndpoints } - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) + _, err := proxier.configureLoadbalancer( + hns, + lbTypeIngressIP, + &lbIngressIP.hnsID, + svcInfo, + sourceVip, + lbIngressIP.ip, + lbIngressEndpoints, + endpointsAvailableForLB, + queriedLoadBalancers) - if len(lbIngressEndpoints) > 0 { - hnsLoadBalancer, err := hns.getLoadBalancer( - lbIngressEndpoints, - loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, - sourceVip, - lbIngressIP.ip, - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.Port()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } - lbIngressIP.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) - } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) + if err != nil { + klog.ErrorS(err, "IngressIP policy creation failed") + continue } if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB { // Avoid creating health check loadbalancer if all the endpoints are terminating - nodeport := proxier.healthzPort - if svcInfo.HealthCheckNodePort() != 0 { - nodeport = svcInfo.HealthCheckNodePort() - } - - proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers) - - hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( - []endpointInfo{*gatewayHnsendpoint}, - loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, + _, err := proxier.configureLoadbalancer( + hns, + lbTypeHealthCheck, + &lbIngressIP.healthCheckHnsID, + svcInfo, sourceVip, lbIngressIP.ip, - Enum(svcInfo.Protocol()), - uint16(nodeport), - uint16(nodeport), - queriedLoadBalancers, - ) + []endpointInfo{*gatewayHnsendpoint}, + endpointsAvailableForLB, + queriedLoadBalancers) + if err != nil { - klog.ErrorS(err, "Policy creation failed") + klog.ErrorS(err, "Health check policy creation failed") continue } - lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID - klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP) } else { + lbIngressIP.healthCheckHnsID = "" klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating) } } @@ -1585,25 +1579,57 @@ func (proxier *Proxier) syncProxyRules() { klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP) } - // remove stale endpoint refcount entries - for hnsID, referenceCount := range proxier.endPointsRefCount { - if *referenceCount 
<= 0 { - klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID) - proxier.hns.deleteEndpoint(hnsID) - delete(proxier.endPointsRefCount, hnsID) + if !proxier.winProxyEbpfMode { + // remove stale endpoint refcount entries + for hnsID, referenceCount := range proxier.endPointsRefCount { + if *referenceCount <= 0 { + klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID) + proxier.hns.deleteEndpoint(hnsID) + delete(proxier.endPointsRefCount, hnsID) + } } } + // This will cleanup stale load balancers which are pending delete // in last iteration proxier.cleanupStaleLoadbalancers() } -// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. -// If it is needed, the function will delete the existing loadbalancer and return true, else false. -func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { +func (proxier *Proxier) deleteUnusedRemoteEndpoints(queriedEndpoints map[string]*endpointInfo) { + klog.V(3).InfoS("Unused endpoints to be deleted", "remoteEPsToCleanup", proxier.remoteEPsToCleanup, "queriedEndpoints", queriedEndpoints) + for epIP := range proxier.remoteEPsToCleanup { + if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" { + klog.V(3).InfoS("Deleting remote endpoint", "IP", epIP, "hnsID", epToDelete.hnsID) + proxier.hns.deleteEndpoint(epToDelete.hnsID) + } + } +} + +func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool { + klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID) + if err := hns.deleteLoadBalancer(*lbHnsID); err != nil { + // This will be cleaned up by the cleanupStaleLoadbalancers function. + proxier.mapStaleLoadbalancers[*lbHnsID] = true + } + *lbHnsID = "" + return true +} - if !winProxyOptimization || *lbHnsID == "" { - // Loadbalancer delete not needed +// needsPolicyDeletion returns true if the loadbalancer policy needs to be deleted. +func (proxier *Proxier) needsPolicyDeletion( + winProxyOptimization bool, + lbHnsID *string, + sourceVip string, + protocol, + intPort, + extPort uint16, + endpoints []endpointInfo, + queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { + + if !winProxyOptimization { + // Loadbalancer delete not needed. In this case, the loadbalancer delete will already have been + // handled at the beginning of syncProxyRules. return false } @@ -1616,7 +1642,7 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr ) if lbIdErr != nil { - return proxier.deleteLoadBalancer(hns, lbHnsID) + return true } if _, ok := queriedLoadBalancers[lbID]; ok { @@ -1624,15 +1650,139 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr return false } - return proxier.deleteLoadBalancer(hns, lbHnsID) + return true } -func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool { - klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID) - if err := hns.deleteLoadBalancer(*lbHnsID); err != nil { - // This will be cleanup by cleanupStaleLoadbalancer fnction.
- proxier.mapStaleLoadbalancers[*lbHnsID] = true +// configureLoadbalancer deletes and recreates the lb policy in the old hns architecture, and updates the policy in place in the +// new (eBPF) architecture. If there are no backend endpoints, it only deletes the existing loadbalancer. +// If no policy exists yet for the vip (*lbHnsID == ""), it only creates the policy. +func (proxier *Proxier) configureLoadbalancer( + hns HostNetworkService, + lbType lbType, + lbHnsID *string, + svcInfo *serviceInfo, + sourceVip, + vip string, + hnsEndpoints []endpointInfo, + endpointsAvailableForLB bool, + queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (hnsLoadbalancer *loadBalancerInfo, hnsErr error) { + + var lbFlags loadBalancerFlags + + protocol, targetPort, extPort := Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()) + needsPolicyCreation := len(hnsEndpoints) != 0 && endpointsAvailableForLB + + sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP + if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity { + klog.InfoS("Session Affinity is not supported on this version of Windows") } - *lbHnsID = "" - return true + + switch lbType { + + case lbTypeClusterIP: + lbFlags = loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP} + needsPolicyCreation = len(hnsEndpoints) != 0 + + case lbTypeNodePort: + lbFlags = loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol} + extPort = uint16(svcInfo.NodePort()) + + case lbTypeExternalIP: + lbFlags = loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol} + + case lbTypeIngressIP: + lbFlags = loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol} + + case lbTypeHealthCheck: + lbFlags = loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP} + nodePort := proxier.healthzPort + if svcInfo.HealthCheckNodePort() != 0 { + nodePort = svcInfo.HealthCheckNodePort() + } + targetPort, extPort = uint16(nodePort), uint16(nodePort) + + } + + needsPolicyDeletion := proxier.needsPolicyDeletion( + svcInfo.winProxyOptimization, + lbHnsID, + sourceVip, + protocol, + targetPort, + extPort, + hnsEndpoints, + queriedLoadBalancers) + + if !needsPolicyCreation && !needsPolicyDeletion { + klog.V(5).InfoS("No loadbalancer policy update needed", "lbHnsID", *lbHnsID, "sourceVip", sourceVip, "vip", vip, "port", targetPort, "lbType", lbType) + return nil, nil + } + + defer func() { + *lbHnsID = "" + if hnsLoadbalancer != nil { + *lbHnsID = hnsLoadbalancer.hnsID + klog.V(3).InfoS("Hns LoadBalancer policy updated.", "sourceVip", sourceVip, "vip", vip, "protocol", protocol, "targetPort", targetPort, "endpointCount", len(hnsEndpoints), "lbHnsID", *lbHnsID, "lbType", lbType, "winProxyEbpfMode", proxier.winProxyEbpfMode) + } else if hnsErr == nil { + klog.V(3).InfoS("Skipped Hns LoadBalancer policy creation.", "sourceVip", sourceVip, "vip", vip, "protocol", protocol, "targetPort", targetPort, "endpointCount", len(hnsEndpoints),
"lbType", lbType, "winProxyEbpfMode", proxier.winProxyEbpfMode) + } + }() + + if !needsPolicyCreation { + if *lbHnsID == "" { + klog.V(5).InfoS("Loadbalancer policy creation skipped.", "lbHnsID", *lbHnsID, "sourceVip", "vip", vip, sourceVip, "port", targetPort, "lbType", lbType) + } else { + klog.V(3).InfoS("Deleting loadbalancer as 0 endpoints for policy creation.", "lbHnsID", *lbHnsID, "sourceVip", sourceVip, "vip", vip, "port", targetPort, "lbType", lbType) + proxier.deleteLoadBalancer(hns, lbHnsID) + } + return nil, nil + } + + if *lbHnsID == "" { + klog.V(3).InfoS("Creating new loadbalancer.", "sourceVip", sourceVip, "vip", vip, "port", targetPort, "lbType", lbType) + hnsLoadbalancer, hnsErr = hns.getOrCreateLoadBalancer( + hnsEndpoints, + lbFlags, + sourceVip, + vip, + protocol, + targetPort, + extPort, + queriedLoadBalancers, + ) + return + } + + if proxier.winProxyEbpfMode { + klog.V(3).InfoS("Updating loadbalancer called.", "lbHnsID", *lbHnsID, "sourceVip", sourceVip, "vip", vip, "port", targetPort, "lbType", lbType) + hnsLoadbalancer, hnsErr = hns.updateLoadBalancer( + *lbHnsID, + sourceVip, + vip, + hnsEndpoints, + lbFlags, + protocol, + targetPort, + extPort, + queriedLoadBalancers, + ) + return + } + + klog.V(3).InfoS("Create and delete loadbalancer called.", "lbHnsID", *lbHnsID, "sourceVip", sourceVip, "vip", vip, "port", targetPort, "lbType", lbType) + // Updating LoadBalancer is only supported in eBPF mode while calling update method. In non eBPF mode update loadbalancer is supported through + // create and delete loadbalancer. + proxier.deleteLoadBalancer(hns, lbHnsID) + hnsLoadbalancer, hnsErr = hns.getOrCreateLoadBalancer( + hnsEndpoints, + lbFlags, + sourceVip, + vip, + protocol, + targetPort, + extPort, + queriedLoadBalancers, + ) + return } diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 964ca5428350..eb87f619d9bb 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -682,6 +682,291 @@ func TestCreateLoadBalancer(t *testing.T) { } } +func TestUpdateLoadBalancerInEbpfMode(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + if proxier == nil { + t.Error() + } + + proxier.winProxyEbpfMode = true + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + populateEndpointSlices(proxier, + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if 
+ t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + } + + proxier.setInitialized(false) + + proxier.OnEndpointSliceUpdate( + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epPaAddress}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + })) + + proxier.mu.Lock() + proxier.endpointSlicesSynced = true + proxier.mu.Unlock() + + proxier.setInitialized(true) + + epObj, err := proxier.hcn.GetEndpointByID("EPID-3") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-3") + } + + proxier.syncProxyRules() + + // The endpoint should be deleted as it is not present in the new endpoint slice + epObj, err = proxier.hcn.GetEndpointByID("EPID-3") + if err == nil || epObj != nil { + t.Errorf("Endpoint EPID-3 should have been deleted") + } + + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointInfo) + + epObj, err = proxier.hcn.GetEndpointByID("EPID-5") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-5") + } + + if !ok { + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) + + } else { + if epInfo.hnsID != "EPID-5" { + t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5") + } + } + + if *epInfo.refCount != 1 { + t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount)
Current value: %v", *epInfo.refCount) + } + + if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) + } + + svc = proxier.svcPortMap[svcPortName] + svcInfo, ok = svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + // Loadbalancer id should not change after the update + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + } + +} + +func TestUpdateLoadBalancerInNonEbpfMode(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) + if proxier == nil { + t.Error() + } + + // By default the value is false, for the readibility of the test case setting it to false again + proxier.winProxyEbpfMode = false + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + Protocol: v1.ProtocolTCP, + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + populateEndpointSlices(proxier, + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + ) + + proxier.setInitialized(true) + proxier.syncProxyRules() + + svc := proxier.svcPortMap[svcPortName] + svcInfo, ok := svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + if svcInfo.hnsID != loadbalancerGuid1 { + t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) + } + } + + proxier.setInitialized(false) + + proxier.OnEndpointSliceUpdate( + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epIpAddressRemote}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + }), + makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { + eps.AddressType = discovery.AddressTypeIPv4 + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{epPaAddress}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: ptr.To(svcPortName.Port), + Port: ptr.To(int32(svcPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }} + })) + + proxier.mu.Lock() + proxier.endpointSlicesSynced = true + proxier.mu.Unlock() + + proxier.setInitialized(true) + + epObj, err := proxier.hcn.GetEndpointByID("EPID-3") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-3") + } + + proxier.syncProxyRules() + + // The endpoint should be deleted as it is not present in the new endpoint slice + epObj, err = 
+ if err == nil || epObj != nil { + t.Errorf("Endpoint EPID-3 should have been deleted") + } + + ep := proxier.endpointsMap[svcPortName][0] + epInfo, ok := ep.(*endpointInfo) + + epObj, err = proxier.hcn.GetEndpointByID("EPID-5") + if err != nil || epObj == nil { + t.Errorf("Failed to fetch endpoint: EPID-5") + } + + if !ok { + t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) + + } else { + if epInfo.hnsID != "EPID-5" { + t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5") + } + } + + if *epInfo.refCount != 1 { + t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount) + } + + if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount { + t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount["EPID-5"], *epInfo.refCount) + } + + svc = proxier.svcPortMap[svcPortName] + svcInfo, ok = svc.(*serviceInfo) + if !ok { + t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) + + } else { + // Loadbalancer id should change after the update + if svcInfo.hnsID != "LBID-3" { + t.Errorf("%v does not match %v", svcInfo.hnsID, "LBID-3") + } + } + +} + func TestCreateDsrLoadBalancer(t *testing.T) { syncPeriod := 30 * time.Second proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) diff --git a/pkg/proxy/winkernel/testing/hcnutils_mock.go b/pkg/proxy/winkernel/testing/hcnutils_mock.go index 319f2e10c474..a626efb274ff 100644 --- a/pkg/proxy/winkernel/testing/hcnutils_mock.go +++ b/pkg/proxy/winkernel/testing/hcnutils_mock.go @@ -183,6 +183,15 @@ func (hcnObj HcnMock) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc return loadBalancer, nil } +func (hcnObj HcnMock) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) { + if _, ok := loadbalancerMap[hnsLbID]; !ok { + return nil, fmt.Errorf("LoadBalancer id %s not present", hnsLbID) + } + loadBalancer.Id = hnsLbID + loadbalancerMap[hnsLbID] = loadBalancer + return loadBalancer, nil +} + func (hcnObj HcnMock) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error { if _, ok := loadbalancerMap[loadBalancer.Id]; !ok { return hcn.LoadBalancerNotFoundError{LoadBalancerId: loadBalancer.Id}
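Reviewer note: a compact sketch of the control flow that configureLoadbalancer now funnels every LB type through — create when no policy exists yet, update in place when winProxyEbpfMode is set, otherwise delete and recreate (which yields a new HNS ID, as TestUpdateLoadBalancerInNonEbpfMode asserts). The types and names below are simplified illustrative stand-ins, not the real winkernel API:

package main

import "fmt"

// lbService is a simplified stand-in for the HostNetworkService
// loadbalancer operations used by configureLoadbalancer.
type lbService interface {
	create() (string, error)          // returns a freshly assigned LB ID
	update(id string) (string, error) // updates in place; the ID is preserved
	remove(id string) error
}

// reconcile mirrors the tail of configureLoadbalancer under the
// assumptions above.
func reconcile(s lbService, lbID *string, ebpfMode bool) error {
	if *lbID == "" { // first sync for this VIP: create only
		id, err := s.create()
		if err != nil {
			return err
		}
		*lbID = id
		return nil
	}
	if ebpfMode { // the eBPF stack supports in-place updates
		id, err := s.update(*lbID)
		if err != nil {
			return err
		}
		*lbID = id
		return nil
	}
	// Legacy stack: no in-place update, so delete and recreate.
	if err := s.remove(*lbID); err != nil {
		return err
	}
	id, err := s.create()
	if err != nil {
		return err
	}
	*lbID = id // a different ID than before
	return nil
}

func main() { fmt.Println("illustrative sketch only") }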
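Similarly, the remote-endpoint cleanup added in endpointsMapChange and deleteUnusedRemoteEndpoints is plain set arithmetic: collect the remote endpoint IPs from the old map, drop any that reappear in the new map, and delete whatever remains after the sync. A minimal sketch under the same caveat (simplified stand-in types, not the winkernel ones):

package main

import "fmt"

// endpoint is a simplified stand-in for the proxy's endpoint info.
type endpoint struct {
	ip    string
	local bool
}

// staleRemoteIPs mirrors the remoteEPsToCleanup bookkeeping: a remote IP
// from the old endpoints map survives only if it is absent from the new map.
func staleRemoteIPs(oldEPs, newEPs []endpoint) map[string]bool {
	stale := make(map[string]bool)
	for _, ep := range oldEPs {
		if !ep.local {
			stale[ep.ip] = true
		}
	}
	for _, ep := range newEPs {
		if !ep.local {
			delete(stale, ep.ip)
		}
	}
	return stale
}

func main() {
	oldEPs := []endpoint{{ip: "10.0.0.2"}, {ip: "10.0.0.3"}}
	newEPs := []endpoint{{ip: "10.0.0.3"}}
	// Only 10.0.0.2 is stale; it would be deleted after the next sync.
	fmt.Println(staleRemoteIPs(oldEPs, newEPs))
}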