Skip to content

Commit

Permalink
don't process unsupported loadbalancers with mixed protocols
Browse files Browse the repository at this point in the history
Mixed protocols were not supported in GCE loadbalancers for old
versions, so we should fail to avoid confusions on users.

Ref: k8s.io/cloud-provider-gcp#475
Change-Id: I5fbd5230afbc51d595cacc96b3fa86473a3eb131
  • Loading branch information
aojea committed May 25, 2023
1 parent 32ff319 commit 6300196
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 2 deletions.
6 changes: 4 additions & 2 deletions staging/src/k8s.io/api/core/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4405,6 +4405,9 @@ const (
// LoadBalancerPortsError represents the condition of the requested ports
// on the cloud load balancer instance.
LoadBalancerPortsError = "LoadBalancerPortsError"
// LoadBalancerPortsErrorReason reason in ServiceStatus condition LoadBalancerPortsError
// means the LoadBalancer was not able to be configured correctly.
LoadBalancerPortsErrorReason = "LoadBalancerMixedProtocolNotSupported"
)

// ServiceStatus represents the current status of a service.
Expand Down Expand Up @@ -6761,10 +6764,9 @@ const (
PortForwardRequestIDHeader = "requestID"
)

// These are the built-in errors for PortStatus.
const (
// MixedProtocolNotSupported error in PortStatus means that the cloud provider
// can't ensure the port on the load balancer because mixed values of protocols
// can't publish the port on the load balancer because mixed values of protocols
// on the same LoadBalancer type of Service are not supported by the cloud provider.
MixedProtocolNotSupported = "MixedProtocolNotSupported"
)
Expand Down
74 changes: 74 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/klog/v2"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
Expand Down Expand Up @@ -134,6 +137,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc
return nil, err
}

// Services with multiples protocols are not supported by this controller, warn the users and sets
// the corresponding Service Status Condition.
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
if err := checkMixedProtocol(svc.Spec.Ports); err != nil {
if hasLoadBalancerPortsError(svc) {
return nil, err
}
klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name)
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.")
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
metav1apply.Condition().
WithType(v1.LoadBalancerPortsError).
WithStatus(metav1.ConditionTrue).
WithReason(v1.LoadBalancerPortsErrorReason).
WithMessage("LoadBalancer with multiple protocols are not supported"))
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil {
return nil, errApply
}
return nil, err
}

klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme)

existingFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
Expand Down Expand Up @@ -187,6 +212,25 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc
return err
}

// Services with multiples protocols are not supported by this controller, warn the users and sets
// the corresponding Service Status Condition, but keep processing the Update to not break upgrades.
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) {
klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name)
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.")
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
metav1apply.Condition().
WithType(v1.LoadBalancerPortsError).
WithStatus(metav1.ConditionTrue).
WithReason(v1.LoadBalancerPortsErrorReason).
WithMessage("LoadBalancer with multiple protocols are not supported"))
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil {
// the error is retried by the controller loop
return errApply
}
}

klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): updating with %d nodes", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, len(nodes))

switch scheme {
Expand Down Expand Up @@ -226,3 +270,33 @@ func getSvcScheme(svc *v1.Service) cloud.LbScheme {
}
return cloud.SchemeExternal
}

// checkMixedProtocol checks if the Service Ports uses different protocols,
// per examples, TCP and UDP.
func checkMixedProtocol(ports []v1.ServicePort) error {
if len(ports) == 0 {
return nil
}

firstProtocol := ports[0].Protocol
for _, port := range ports[1:] {
if port.Protocol != firstProtocol {
return fmt.Errorf("mixed protocol is not supported for LoadBalancer")
}
}
return nil
}

// hasLoadBalancerPortsError checks if the Service has the LoadBalancerPortsError set to True
func hasLoadBalancerPortsError(service *v1.Service) bool {
if service == nil {
return false
}

for _, cond := range service.Status.Conditions {
if cond.Type == v1.LoadBalancerPortsError {
return cond.Status == metav1.ConditionTrue
}
}
return false
}
255 changes: 255 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ package gce

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -193,3 +195,256 @@ func TestProjectsBasePath(t *testing.T) {
t.Errorf("Compute projectsBasePath has changed. Got %q, want %q or %q", gce.projectsBasePath, expectProjectsBasePath, expectMtlsProjectsBasePath)
}
}

func TestEnsureLoadBalancerMixedProtocols(t *testing.T) {
t.Parallel()

vals := DefaultTestClusterValues()
gce, err := fakeGCECloud(vals)
require.NoError(t, err)

nodeNames := []string{"test-node-1"}
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
require.NoError(t, err)

apiService := fakeLoadbalancerService("")
apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{
Protocol: v1.ProtocolUDP,
Port: int32(8080),
})
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{})
require.NoError(t, err)
_, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
if err == nil {
t.Errorf("Expected error ensuring loadbalancer for Service with multiple ports")
}
if err.Error() != "mixed protocol is not supported for LoadBalancer" {
t.Fatalf("unexpected error, got: %s wanted \"mixed protocol is not supported for LoadBalancer\"", err.Error())
}
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !hasLoadBalancerPortsError(apiService) {
t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions)
}
}

func TestUpdateLoadBalancerMixedProtocols(t *testing.T) {
t.Parallel()

vals := DefaultTestClusterValues()
gce, err := fakeGCECloud(vals)
require.NoError(t, err)

nodeNames := []string{"test-node-1"}
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
require.NoError(t, err)

apiService := fakeLoadbalancerService("")
apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{
Protocol: v1.ProtocolUDP,
Port: int32(8080),
})
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{})
require.NoError(t, err)

// create an external loadbalancer to simulate an upgrade scenario where the loadbalancer exists
// before the new controller is running and later the Service is updated
_, err = createExternalLoadBalancer(gce, apiService, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
assert.NoError(t, err)

err = gce.UpdateLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !hasLoadBalancerPortsError(apiService) {
t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions)
}
}

func TestCheckMixedProtocol(t *testing.T) {
tests := []struct {
name string
annotations map[string]string
ports []v1.ServicePort
wantErr error
}{
{
name: "TCP",
annotations: make(map[string]string),
ports: []v1.ServicePort{
{
Protocol: v1.ProtocolTCP,
Port: int32(8080),
},
},
wantErr: nil,
},
{
name: "UDP",
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
ports: []v1.ServicePort{
{
Protocol: v1.ProtocolUDP,
Port: int32(8080),
},
},
wantErr: nil,
},
{
name: "TCP",
annotations: make(map[string]string),
ports: []v1.ServicePort{
{
Name: "port80",
Protocol: v1.ProtocolTCP,
Port: int32(80),
},
{
Name: "port8080",
Protocol: v1.ProtocolTCP,
Port: int32(8080),
},
},
wantErr: nil,
},
{
name: "UDP",
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
ports: []v1.ServicePort{
{
Name: "port80",
Protocol: v1.ProtocolUDP,
Port: int32(80),
},
{
Name: "port8080",
Protocol: v1.ProtocolUDP,
Port: int32(8080),
},
},
wantErr: nil,
},
{
name: "TCP and UDP",
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
ports: []v1.ServicePort{
{
Protocol: v1.ProtocolUDP,
Port: int32(53),
},
{
Protocol: v1.ProtocolTCP,
Port: int32(53),
},
},
wantErr: fmt.Errorf("mixed protocol is not supported for LoadBalancer"),
},
}
for _, test := range tests {
tt := test
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := checkMixedProtocol(tt.ports)
if tt.wantErr != nil {
assert.EqualError(t, err, tt.wantErr.Error())
} else {
assert.Equal(t, err, nil)
}
})
}
}

func Test_hasLoadBalancerPortsError(t *testing.T) {
tests := []struct {
name string
service *v1.Service
want bool
}{
{
name: "no status",
service: &v1.Service{},
},
{
name: "condition set to true",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
Spec: v1.ServiceSpec{
ClusterIPs: []string{"1.2.3.4"},
Type: "LoadBalancer",
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
Conditions: []metav1.Condition{
{
Type: v1.LoadBalancerPortsError,
Status: metav1.ConditionTrue,
},
},
},
},
want: true,
},
{
name: "condition set false",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
Spec: v1.ServiceSpec{
ClusterIPs: []string{"1.2.3.4"},
Type: "LoadBalancer",
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
Conditions: []metav1.Condition{
{
Type: v1.LoadBalancerPortsError,
Status: metav1.ConditionFalse,
},
},
},
},
},
{
name: "multiple conditions unrelated",
service: &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
Spec: v1.ServiceSpec{
ClusterIPs: []string{"1.2.3.4"},
Type: "LoadBalancer",
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
Conditions: []metav1.Condition{
{
Type: "condition1",
Status: metav1.ConditionFalse,
},
{
Type: "condition2",
Status: metav1.ConditionTrue,
},
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := hasLoadBalancerPortsError(tt.service); got != tt.want {
t.Errorf("hasLoadBalancerPortsError() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 6300196

Please sign in to comment.