diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 257fde1abde5..bfb3b1d971a2 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -4405,6 +4405,9 @@ const ( // LoadBalancerPortsError represents the condition of the requested ports // on the cloud load balancer instance. LoadBalancerPortsError = "LoadBalancerPortsError" + // LoadBalancerPortsErrorReason reason in ServiceStatus condition LoadBalancerPortsError + // means the LoadBalancer was not able to be configured correctly. + LoadBalancerPortsErrorReason = "LoadBalancerMixedProtocolNotSupported" ) // ServiceStatus represents the current status of a service. @@ -6761,6 +6764,13 @@ const ( PortForwardRequestIDHeader = "requestID" ) +const ( + // MixedProtocolNotSupported error in PortStatus means that the cloud provider + // can't publish the port on the load balancer because mixed values of protocols + // on the same LoadBalancer type of Service are not supported by the cloud provider. + MixedProtocolNotSupported = "MixedProtocolNotSupported" +) + // PortStatus represents the error condition of a service port type PortStatus struct { diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go index d9056205352c..969eaf3fc5b8 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go @@ -27,6 +27,9 @@ import ( "strings" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1apply "k8s.io/client-go/applyconfigurations/core/v1" + metav1apply "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/klog/v2" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" @@ -134,6 +137,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc return nil, err } + // Services with multiples protocols are not supported by this controller, warn the users and sets + // the corresponding Service Status Condition. + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb + if err := checkMixedProtocol(svc.Spec.Ports); err != nil { + if hasLoadBalancerPortsError(svc) { + return nil, err + } + klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). + WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil { + return nil, errApply + } + return nil, err + } + klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme) existingFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region) @@ -187,6 +212,25 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc return err } + // Services with multiples protocols are not supported by this controller, warn the users and sets + // the corresponding Service Status Condition, but keep processing the Update to not break upgrades. + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb + if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { + klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). + WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil { + // the error is retried by the controller loop + return errApply + } + } + klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): updating with %d nodes", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, len(nodes)) switch scheme { @@ -226,3 +270,33 @@ func getSvcScheme(svc *v1.Service) cloud.LbScheme { } return cloud.SchemeExternal } + +// checkMixedProtocol checks if the Service Ports uses different protocols, +// per examples, TCP and UDP. +func checkMixedProtocol(ports []v1.ServicePort) error { + if len(ports) == 0 { + return nil + } + + firstProtocol := ports[0].Protocol + for _, port := range ports[1:] { + if port.Protocol != firstProtocol { + return fmt.Errorf("mixed protocol is not supported for LoadBalancer") + } + } + return nil +} + +// hasLoadBalancerPortsError checks if the Service has the LoadBalancerPortsError set to True +func hasLoadBalancerPortsError(service *v1.Service) bool { + if service == nil { + return false + } + + for _, cond := range service.Status.Conditions { + if cond.Type == v1.LoadBalancerPortsError { + return cond.Status == metav1.ConditionTrue + } + } + return false +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go index 3fc89668966b..0737df352647 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go @@ -21,10 +21,12 @@ package gce import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -193,3 +195,256 @@ func TestProjectsBasePath(t *testing.T) { t.Errorf("Compute projectsBasePath has changed. Got %q, want %q or %q", gce.projectsBasePath, expectProjectsBasePath, expectMtlsProjectsBasePath) } } + +func TestEnsureLoadBalancerMixedProtocols(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + gce, err := fakeGCECloud(vals) + require.NoError(t, err) + + nodeNames := []string{"test-node-1"} + nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) + require.NoError(t, err) + + apiService := fakeLoadbalancerService("") + apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{ + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }) + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{}) + require.NoError(t, err) + _, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes) + if err == nil { + t.Errorf("Expected error ensuring loadbalancer for Service with multiple ports") + } + if err.Error() != "mixed protocol is not supported for LoadBalancer" { + t.Fatalf("unexpected error, got: %s wanted \"mixed protocol is not supported for LoadBalancer\"", err.Error()) + } + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !hasLoadBalancerPortsError(apiService) { + t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) + } +} + +func TestUpdateLoadBalancerMixedProtocols(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + gce, err := fakeGCECloud(vals) + require.NoError(t, err) + + nodeNames := []string{"test-node-1"} + nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) + require.NoError(t, err) + + apiService := fakeLoadbalancerService("") + apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{ + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }) + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{}) + require.NoError(t, err) + + // create an external loadbalancer to simulate an upgrade scenario where the loadbalancer exists + // before the new controller is running and later the Service is updated + _, err = createExternalLoadBalancer(gce, apiService, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName) + assert.NoError(t, err) + + err = gce.UpdateLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !hasLoadBalancerPortsError(apiService) { + t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) + } +} + +func TestCheckMixedProtocol(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + ports []v1.ServicePort + wantErr error + }{ + { + name: "TCP", + annotations: make(map[string]string), + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "UDP", + annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "TCP", + annotations: make(map[string]string), + ports: []v1.ServicePort{ + { + Name: "port80", + Protocol: v1.ProtocolTCP, + Port: int32(80), + }, + { + Name: "port8080", + Protocol: v1.ProtocolTCP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "UDP", + annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Name: "port80", + Protocol: v1.ProtocolUDP, + Port: int32(80), + }, + { + Name: "port8080", + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "TCP and UDP", + annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolUDP, + Port: int32(53), + }, + { + Protocol: v1.ProtocolTCP, + Port: int32(53), + }, + }, + wantErr: fmt.Errorf("mixed protocol is not supported for LoadBalancer"), + }, + } + for _, test := range tests { + tt := test + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + err := checkMixedProtocol(tt.ports) + if tt.wantErr != nil { + assert.EqualError(t, err, tt.wantErr.Error()) + } else { + assert.Equal(t, err, nil) + } + }) + } +} + +func Test_hasLoadBalancerPortsError(t *testing.T) { + tests := []struct { + name string + service *v1.Service + want bool + }{ + { + name: "no status", + service: &v1.Service{}, + }, + { + name: "condition set to true", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: []metav1.Condition{ + { + Type: v1.LoadBalancerPortsError, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + want: true, + }, + { + name: "condition set false", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: []metav1.Condition{ + { + Type: v1.LoadBalancerPortsError, + Status: metav1.ConditionFalse, + }, + }, + }, + }, + }, + { + name: "multiple conditions unrelated", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: []metav1.Condition{ + { + Type: "condition1", + Status: metav1.ConditionFalse, + }, + { + Type: "condition2", + Status: metav1.ConditionTrue, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := hasLoadBalancerPortsError(tt.service); got != tt.want { + t.Errorf("hasLoadBalancerPortsError() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go index 035ff25bbebe..4daff14d07bd 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" servicehelper "k8s.io/cloud-provider/service/helpers" netutils "k8s.io/utils/net" ) @@ -57,6 +58,7 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) { gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{}) gce.nodeInformerSynced = func() bool { return true } gce.client = fake.NewSimpleClientset() + gce.eventRecorder = &record.FakeRecorder{} mockGCE := gce.c.(*cloud.MockGCE) mockGCE.MockTargetPools.AddInstanceHook = mock.AddInstanceHook