Skip to content

Commit

Permalink
Adding support for WinProxyEbpfMode in windows kubeproxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
princepereira committed Apr 13, 2024
1 parent d098af3 commit d081c20
Show file tree
Hide file tree
Showing 7 changed files with 695 additions and 155 deletions.
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -904,6 +904,12 @@ const (
// Allows kube-proxy to run in Overlay mode for Windows
WinOverlay featuregate.Feature = "WinOverlay"

// owner: @princepereira
// alpha: v1.27
//
// Allows windows kube-proxy to switch between old and new windows networking stack
WinProxyEbpfMode featuregate.Feature = "WinProxyEbpfMode"

// owner: @marosset
// kep: https://kep.k8s.io/3503
// alpha: v1.26
Expand Down Expand Up @@ -1234,6 +1240,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

WinOverlay: {Default: true, PreRelease: featuregate.Beta},

WinProxyEbpfMode: {Default: false, PreRelease: featuregate.Alpha},

WindowsHostNetwork: {Default: true, PreRelease: featuregate.Alpha},

NodeInclusionPolicyInPodTopologySpread: {Default: true, PreRelease: featuregate.Beta},
Expand Down
5 changes: 5 additions & 0 deletions pkg/proxy/winkernel/hcnutils.go
Expand Up @@ -41,6 +41,7 @@ type HcnService interface {
ListLoadBalancers() ([]hcn.HostComputeLoadBalancer, error)
GetLoadBalancerByID(loadBalancerId string) (*hcn.HostComputeLoadBalancer, error)
CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error)
UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error)
DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error
// Features functions
GetSupportedFeatures() hcn.SupportedFeatures
Expand Down Expand Up @@ -104,6 +105,10 @@ func (hcnObj hcnImpl) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc
return loadBalancer.Create()
}

func (hcnObj hcnImpl) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) {
return loadBalancer.Update(hnsLbID)
}

func (hcnObj hcnImpl) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error {
return loadBalancer.Delete()
}
Expand Down
123 changes: 103 additions & 20 deletions pkg/proxy/winkernel/hns.go
Expand Up @@ -38,8 +38,9 @@ type HostNetworkService interface {
getEndpointByName(id string) (*endpointInfo, error)
createEndpoint(ep *endpointInfo, networkName string) (*endpointInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getOrCreateLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error)
updateLoadBalancer(hnsLbID string, sourceVip, vip string, endpoints []endpointInfo, flags loadBalancerFlags, protocol, internalPort, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error)
deleteLoadBalancer(hnsID string) error
}

Expand All @@ -54,6 +55,33 @@ var (
LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16
)

func getLoadBalancerPolicyFlags(flags loadBalancerFlags) (lbPortMappingFlags hcn.LoadBalancerPortMappingFlags, lbFlags hcn.LoadBalancerFlags) {
lbPortMappingFlags = hcn.LoadBalancerPortMappingFlagsNone
if flags.isILB {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB
}
if flags.useMUX {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux
}
if flags.preserveDIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP
}
if flags.localRoutedVIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP
}
if flags.isVipExternalIP {
lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP
}
lbFlags = hcn.LoadBalancerFlagsNone
if flags.isDSR {
lbFlags |= hcn.LoadBalancerFlagsDSR
}
if flags.isIPv6 {
lbFlags |= LoadBalancerFlagsIPv6
}
return
}

func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hns.hcn.GetNetworkByName(name)
if err != nil {
Expand Down Expand Up @@ -315,7 +343,7 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
return loadBalancers, nil
}

func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
func (hns hns) getOrCreateLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
var id loadBalancerIdentifier
vips := []string{}
// Compute hash from backends (endpoint IDs)
Expand All @@ -336,32 +364,84 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags
return lb, nil
}

lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone
if flags.isILB {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB
lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags)

lbDistributionType := hcn.LoadBalancerDistributionNone

if flags.sessionAffinity {
lbDistributionType = hcn.LoadBalancerDistributionSourceIP
}
if flags.useMUX {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux

loadBalancer := &hcn.HostComputeLoadBalancer{
SourceVIP: sourceVip,
PortMappings: []hcn.LoadBalancerPortMapping{
{
Protocol: uint32(protocol),
InternalPort: internalPort,
ExternalPort: externalPort,
DistributionType: lbDistributionType,
Flags: lbPortMappingFlags,
},
},
FrontendVIPs: vips,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
Flags: lbFlags,
}
if flags.preserveDIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP

for _, ep := range endpoints {
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
}
if flags.localRoutedVIP {
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP

lb, err := hns.hcn.CreateLoadBalancer(loadBalancer)

if err != nil {
return nil, err
}
if flags.isVipExternalIP {
lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP

klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb)
lbInfo := &loadBalancerInfo{
hnsID: lb.Id,
}
// Add to map of load balancers
previousLoadBalancers[id] = lbInfo
return lbInfo, err
}

lbFlags := hcn.LoadBalancerFlagsNone
if flags.isDSR {
lbFlags |= hcn.LoadBalancerFlagsDSR
func (hns hns) updateLoadBalancer(hnsLbID string,
sourceVip,
vip string,
endpoints []endpointInfo,
flags loadBalancerFlags,
protocol,
internalPort,
externalPort uint16,
previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {

var id loadBalancerIdentifier
vips := []string{}
// Compute hash from backends (endpoint IDs)
hash, err := hashEndpoints(endpoints)
if err != nil {
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
return nil, err
}
if len(vip) > 0 {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
vips = append(vips, vip)
} else {
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
}

if flags.isIPv6 {
lbFlags |= LoadBalancerFlagsIPv6
if lb, found := previousLoadBalancers[id]; found {
klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb)
return lb, nil
}

lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags)

lbDistributionType := hcn.LoadBalancerDistributionNone

if flags.sessionAffinity {
Expand Down Expand Up @@ -391,13 +471,16 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags
loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID)
}

lb, err := hns.hcn.CreateLoadBalancer(loadBalancer)
klog.V(3).InfoS("Updating existing loadbalancer called", "hnsLbID", hnsLbID, "endpointCount", len(endpoints))

lb, err := hns.hcn.UpdateLoadBalancer(loadBalancer, hnsLbID)

if err != nil {
klog.V(2).ErrorS(err, "Error updating existing loadbalancer", "hnsLbID", hnsLbID, "error", err, "endpoints", endpoints)
return nil, err
}

klog.V(1).InfoS("Created Hns loadbalancer policy resource", "loadBalancer", lb)
klog.V(1).InfoS("Update loadbalancer is successful", "loadBalancer", lb)
lbInfo := &loadBalancerInfo{
hnsID: lb.Id,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/winkernel/hns_test.go
Expand Up @@ -364,7 +364,7 @@ func TestGetLoadBalancerExisting(t *testing.T) {
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash}
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}

lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
lb, err := hns.getOrCreateLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)

if err != nil {
t.Error(err)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestGetLoadBalancerNew(t *testing.T) {
hnsID: Endpoint.Id,
}
endpoints := []endpointInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
lb, err := hns.getOrCreateLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
if err != nil {
t.Error(err)
}
Expand Down

0 comments on commit d081c20

Please sign in to comment.