Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.0] Add servie annotation to disable lb floating ip #1981

Merged
merged 1 commit into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
k8s.io/component-base v0.21.11
k8s.io/controller-manager v0.21.11
k8s.io/klog/v2 v2.9.0
k8s.io/utils v0.0.0-20211116205334-6203023598ed
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
sigs.k8s.io/yaml v1.2.0
)

Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,9 @@ k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM=
k8s.io/klog/v2 v2.9.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20211110012726-3cc51fd1e909 h1:s77MRc/+/eQjsF89MB12JssAlsoi9mnNoaacRqibeAU=
k8s.io/kube-openapi v0.0.0-20211110012726-3cc51fd1e909/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE=
k8s.io/utils v0.0.0-20211116205334-6203023598ed h1:ck1fRPWPJWsMd8ZRFsWc6mh/zHp5fZ/shhbrgPUxDAE=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
4 changes: 4 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ const (
// is `a=b,c=d,...`. After updated, the old user-assigned tags would not be replaced by the new ones.
ServiceAnnotationAzurePIPTags = "service.beta.kubernetes.io/azure-pip-tags"

// ServiceAnnotationDisableLoadBalancerFloatingIP is the annotation used on the service to disable floating IP in load balancer rule.
// If omitted, the default value is false
ServiceAnnotationDisableLoadBalancerFloatingIP = "service.beta.kubernetes.io/azure-disable-load-balancer-floating-ip"

// ServiceAnnotationAzurePIPTags sets the additional Public IPs (split by comma) besides the service's Public IP configured on LoadBalancer.
// These additional Public IPs would be consumed by kube-proxy to configure the iptables rules on each node. Note they would not be configured
// automatically on Azure LoadBalancer. Instead, they need to be configured manually (e.g. on Azure cross-region LoadBalancer by another operator).
Expand Down
5 changes: 5 additions & 0 deletions pkg/consts/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func IsK8sServiceInternalIPv6(service *v1.Service) bool {
return IsK8sServiceUsingInternalLoadBalancer(service) && net.IsIPv6String(service.Spec.ClusterIP)
}

// IsK8sServiceDisableLoadBalancerFloatingIP return if floating IP in load balancer is disabled in kubernetes service annotations
func IsK8sServiceDisableLoadBalancerFloatingIP(service *v1.Service) bool {
return expectAttributeInSvcAnnotationBeEqualTo(service.Annotations, ServiceAnnotationDisableLoadBalancerFloatingIP, TrueAnnotationValue)
}

// GetHealthProbeConfigOfPortFromK8sSvcAnnotation get health probe configuration for port
func GetHealthProbeConfigOfPortFromK8sSvcAnnotation(annotations map[string]string, port int32, key HealthProbeParams, validators ...BusinessValidator) (*string, error) {
return GetAttributeValueInSvcAnnotation(annotations, BuildHealthProbeAnnotationKeyForPort(port, key), validators...)
Expand Down
69 changes: 62 additions & 7 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func (az *Cloud) reconcileService(ctx context.Context, clusterName string, servi
if lbStatus != nil && len(lbStatus.Ingress) > 0 {
serviceIP = &lbStatus.Ingress[0].IP
}

backendPrivateIPs := az.GetBackendPrivateIPs(clusterName, service, lb)
klog.V(2).Infof("reconcileService: reconciling security group for service %q with IP %q, wantLb = true", serviceName, logSafe(serviceIP))
if _, err := az.reconcileSecurityGroup(clusterName, service, serviceIP, true /* wantLb */); err != nil {
if _, err := az.reconcileSecurityGroup(clusterName, service, serviceIP, &backendPrivateIPs, true /* wantLb */); err != nil {
klog.Errorf("reconcileSecurityGroup(%s) failed: %#v", serviceName, err)
return nil, err
}
Expand Down Expand Up @@ -135,6 +137,43 @@ func (az *Cloud) reconcileService(ctx context.Context, clusterName string, servi
return lbStatus, nil
}

func (az *Cloud) GetBackendPrivateIPs(clusterName string, service *v1.Service, lb *network.LoadBalancer) []string {
serviceName := getServiceName(service)
lbBackendPoolName := getBackendPoolName(clusterName, service)
if lb.LoadBalancerPropertiesFormat == nil || lb.LoadBalancerPropertiesFormat.BackendAddressPools == nil {
return nil
}

backendPrivateIPs := sets.NewString()
for _, bp := range *lb.BackendAddressPools {
if strings.EqualFold(to.String(bp.Name), lbBackendPoolName) {
klog.V(10).Infof("bc.GetBackendPrivateIPs for service (%s): found wanted backendpool %s", serviceName, to.String(bp.Name))
if bp.BackendAddressPoolPropertiesFormat != nil && bp.BackendIPConfigurations != nil {
for _, backendIPConfig := range *bp.BackendIPConfigurations {
ipConfigID := to.String(backendIPConfig.ID)
nodeName, _, err := az.VMSet.GetNodeNameByIPConfigurationID(ipConfigID)
if err != nil {
klog.Errorf("bc.GetBackendPrivateIPs for service (%s): GetNodeNameByIPConfigurationID failed with error: %v", serviceName, err)
continue
}
privateIPs, err := az.VMSet.GetPrivateIPsByNodeName(nodeName)
if err != nil {
klog.Errorf("bc.GetBackendPrivateIPs for service (%s): GetPrivateIPsByNodeName(%s) failed with error: %v", serviceName, nodeName, err)
continue
}
if privateIPs != nil {
klog.V(2).Infof("bc.GetBackendPrivateIPs for service (%s): lb backendpool - found private IPs %v of node %s", serviceName, privateIPs, nodeName)
backendPrivateIPs.Insert(privateIPs...)
}
}
}
} else {
klog.V(10).Infof("bc.GetBackendPrivateIPs for service (%s): found unmanaged backendpool %s", serviceName, to.String(bp.Name))
}
}
return backendPrivateIPs.List()
}

// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
// When a client updates the internal load balancer annotation,
Expand Down Expand Up @@ -212,7 +251,7 @@ func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri
}

klog.V(2).Infof("EnsureLoadBalancerDeleted: reconciling security group for service %q with IP %q, wantLb = false", serviceName, serviceIPToCleanup)
_, err = az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, false /* wantLb */)
_, err = az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, &[]string{}, false /* wantLb */)
if err != nil {
return err
}
Expand Down Expand Up @@ -2346,6 +2385,10 @@ func (az *Cloud) getExpectedLBRules(
ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, az.getLoadBalancerResourceGroup(), *nodeEndpointHealthprobe.Name)),
}
}
if consts.IsK8sServiceDisableLoadBalancerFloatingIP(service) {
props.BackendPort = to.Int32Ptr(port.NodePort)
props.EnableFloatingIP = to.BoolPtr(false)
}
expectedRules = append(expectedRules, network.LoadBalancingRule{
Name: &lbRuleName,
LoadBalancingRulePropertiesFormat: props,
Expand Down Expand Up @@ -2428,7 +2471,7 @@ func (az *Cloud) getExpectedHAModeLoadBalancingRuleProperties(

// This reconciles the Network Security Group similar to how the LB is reconciled.
// This entails adding required, missing SecurityRules and removing stale rules.
func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, lbIP *string, wantLb bool) (*network.SecurityGroup, error) {
func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, lbIP *string, backendIPAddresses *[]string, wantLb bool) (*network.SecurityGroup, error) {
serviceName := getServiceName(service)
klog.V(5).Infof("reconcileSecurityGroup(%s): START clusterName=%q", serviceName, clusterName)

Expand All @@ -2446,6 +2489,11 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
return nil, err
}

disableFloatingIP := false
if consts.IsK8sServiceDisableLoadBalancerFloatingIP(service) {
disableFloatingIP = true
}

destinationIPAddress := ""
if wantLb && lbIP == nil {
return nil, fmt.Errorf("no load balancer IP for setting up security rules for service %s", service.Name)
Expand Down Expand Up @@ -2489,7 +2537,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
sourceAddressPrefixes = append(sourceAddressPrefixes, serviceTags...)
}

expectedSecurityRules, err := az.getExpectedSecurityRules(wantLb, ports, sourceAddressPrefixes, service, destinationIPAddresses, sourceRanges)
expectedSecurityRules, err := az.getExpectedSecurityRules(wantLb, ports, sourceAddressPrefixes, service, destinationIPAddresses, sourceRanges, *backendIPAddresses, disableFloatingIP)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2629,7 +2677,7 @@ func (az *Cloud) reconcileSecurityRules(sg network.SecurityGroup, service *v1.Se
return dirtySg, updatedRules, nil
}

func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, sourceAddressPrefixes []string, service *v1.Service, destinationIPAddresses []string, sourceRanges utilnet.IPNetSet) ([]network.SecurityRule, error) {
func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, sourceAddressPrefixes []string, service *v1.Service, destinationIPAddresses []string, sourceRanges utilnet.IPNetSet, backendIPAddresses []string, disableFloatingIP bool) ([]network.SecurityRule, error) {
expectedSecurityRules := []network.SecurityRule{}

if wantLb {
Expand All @@ -2640,6 +2688,10 @@ func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, s
if err != nil {
return nil, err
}
dstPort := port.Port
if disableFloatingIP {
dstPort = port.NodePort
}
for j := range sourceAddressPrefixes {
ix := i*len(sourceAddressPrefixes) + j
securityRuleName := az.getSecurityRuleName(service, port, sourceAddressPrefixes[j])
Expand All @@ -2648,13 +2700,16 @@ func (az *Cloud) getExpectedSecurityRules(wantLb bool, ports []v1.ServicePort, s
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(dstPort))),
SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]),
Access: network.SecurityRuleAccessAllow,
Direction: network.SecurityRuleDirectionInbound,
},
}
if len(destinationIPAddresses) == 1 {

if len(destinationIPAddresses) == 1 && disableFloatingIP {
nsgRule.DestinationAddressPrefixes = to.StringSlicePtr(backendIPAddresses)
} else if len(destinationIPAddresses) == 1 && !disableFloatingIP {
// continue to use DestinationAddressPrefix to avoid NSG updates for existing rules.
nsgRule.DestinationAddressPrefix = to.StringPtr(destinationIPAddresses[0])
} else {
Expand Down
72 changes: 70 additions & 2 deletions pkg/provider/azure_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,15 @@ func TestReconcileLoadBalancerRule(t *testing.T) {
probeProtocol: "Tcp",
expectedErr: true,
},
{
desc: "getExpectedLBRules should return correct rule when floating ip annotations are added",
service: getTestService("test1", v1.ProtocolTCP, map[string]string{consts.ServiceAnnotationDisableLoadBalancerFloatingIP: "true"}, false, 80),
loadBalancerSku: "basic",
expectedRules: []network.LoadBalancingRule{
getFloatingIPTestRule(false, false, 80),
},
expectedProbes: getDefaultTestProbes("Tcp", ""),
},
}
rules := getDefaultTestRules(true)
rules[0].IdleTimeoutInMinutes = to.Int32Ptr(5)
Expand Down Expand Up @@ -2271,6 +2280,35 @@ func getHATestRules(enableTCPReset, hasProbe bool, protocol v1.Protocol) []netwo
return expectedRules
}

func getFloatingIPTestRule(enableTCPReset, enableFloatingIP bool, port int32) network.LoadBalancingRule {
expectedRules := network.LoadBalancingRule{
Name: to.StringPtr(fmt.Sprintf("atest1-TCP-%d", port)),
LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{
Protocol: network.TransportProtocol("Tcp"),
FrontendIPConfiguration: &network.SubResource{
ID: to.StringPtr("frontendIPConfigID"),
},
BackendAddressPool: &network.SubResource{
ID: to.StringPtr("backendPoolID"),
},
LoadDistribution: "Default",
FrontendPort: to.Int32Ptr(port),
BackendPort: to.Int32Ptr(getBackendPort(port)),
EnableFloatingIP: to.BoolPtr(enableFloatingIP),
DisableOutboundSnat: to.BoolPtr(false),
IdleTimeoutInMinutes: to.Int32Ptr(4),
Probe: &network.SubResource{
ID: to.StringPtr("/subscriptions/subscription/resourceGroups/rg/providers/" +
fmt.Sprintf("Microsoft.Network/loadBalancers/lbname/probes/atest1-TCP-%d", port)),
},
},
}
if enableTCPReset {
expectedRules.EnableTCPReset = to.BoolPtr(true)
}
return expectedRules
}

func getTestLoadBalancer(name, rgName, clusterName, identifier *string, service v1.Service, lbSku string) network.LoadBalancer {
caser := cases.Title(language.English)
lb := network.LoadBalancer{
Expand Down Expand Up @@ -3096,6 +3134,36 @@ func TestReconcileSecurityGroup(t *testing.T) {
},
},
},
{
desc: "reconcileSecurityGroup shall create sgs with floating IP disabled",
service: getTestService("test1", v1.ProtocolTCP, map[string]string{consts.ServiceAnnotationDisableLoadBalancerFloatingIP: "true"}, false, 80),
existingSgs: map[string]network.SecurityGroup{"nsg": {
Name: to.StringPtr("nsg"),
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{},
}},
lbIP: to.StringPtr("1.2.3.4"),
wantLb: true,
expectedSg: &network.SecurityGroup{
Name: to.StringPtr("nsg"),
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{
SecurityRules: &[]network.SecurityRule{
{
Name: to.StringPtr("atest1-TCP-80-Internet"),
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: network.SecurityRuleProtocol("Tcp"),
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(getBackendPort(80)))),
SourceAddressPrefix: to.StringPtr("Internet"),
DestinationAddressPrefixes: to.StringSlicePtr([]string{}),
Access: network.SecurityRuleAccess("Allow"),
Priority: to.Int32Ptr(500),
Direction: network.SecurityRuleDirection("Inbound"),
},
},
},
},
},
},
}

for i, test := range testCases {
Expand All @@ -3112,7 +3180,7 @@ func TestReconcileSecurityGroup(t *testing.T) {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
}
sg, err := az.reconcileSecurityGroup("testCluster", &test.service, test.lbIP, test.wantLb)
sg, err := az.reconcileSecurityGroup("testCluster", &test.service, test.lbIP, &[]string{}, test.wantLb)
assert.Equal(t, test.expectedSg, sg, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
}
Expand Down Expand Up @@ -3168,7 +3236,7 @@ func TestReconcileSecurityGroupLoadBalancerSourceRanges(t *testing.T) {
mockSGClient := az.SecurityGroupsClient.(*mocksecuritygroupclient.MockInterface)
mockSGClient.EXPECT().Get(gomock.Any(), az.ResourceGroup, gomock.Any(), gomock.Any()).Return(existingSg, nil)
mockSGClient.EXPECT().CreateOrUpdate(gomock.Any(), az.ResourceGroup, gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
sg, err := az.reconcileSecurityGroup("testCluster", &service, lbIP, true)
sg, err := az.reconcileSecurityGroup("testCluster", &service, lbIP, &[]string{}, true)
assert.NoError(t, err)
assert.Equal(t, expectedSg, *sg)
}
Expand Down