From 8964b6285473303e983b9d13183bb2834d6736ca Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Tue, 28 Feb 2023 18:06:30 +0000 Subject: [PATCH 1/3] make MixedProtocolNotSupported public Change-Id: Ib9f5ea8e36c831cd0e9649aa998c96f61d56122d --- staging/src/k8s.io/api/core/v1/types.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 754a23613db5..cfcfcab89ec4 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -6624,6 +6624,14 @@ const ( PortForwardRequestIDHeader = "requestID" ) +// These are the built-in errors for PortStatus. +const ( + // MixedProtocolNotSupported error in PortStatus means that the cloud provider + // can't ensure the port on the load balancer because mixed values of protocols + // on the same LoadBalancer type of Service are not supported by the cloud provider. + MixedProtocolNotSupported = "MixedProtocolNotSupported" +) + // PortStatus represents the error condition of a service port type PortStatus struct { From f86b911ab0ead1ef5773c21803643dcefc05e130 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Wed, 22 Feb 2023 15:05:57 +0000 Subject: [PATCH 2/3] don't process unsupported loadbalancers with mixed protocols Mixed protocols were not supported in GCE loadbalancers for old versions, so we should fail to avoid confusions on users. 
Ref: k8s.io/cloud-provider-gcp#475 Change-Id: I5fbd5230afbc51d595cacc96b3fa86473a3eb131 --- staging/src/k8s.io/api/core/v1/types.go | 6 +- .../gce/gce_loadbalancer.go | 74 +++++ .../gce/gce_loadbalancer_test.go | 255 ++++++++++++++++++ .../legacy-cloud-providers/gce/gce_util.go | 2 + 4 files changed, 335 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index cfcfcab89ec4..535921b27014 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -4266,6 +4266,9 @@ const ( // LoadBalancerPortsError represents the condition of the requested ports // on the cloud load balancer instance. LoadBalancerPortsError = "LoadBalancerPortsError" + // LoadBalancerPortsErrorReason reason in ServiceStatus condition LoadBalancerPortsError + // means the LoadBalancer was not able to be configured correctly. + LoadBalancerPortsErrorReason = "LoadBalancerMixedProtocolNotSupported" ) // ServiceStatus represents the current status of a service. @@ -6624,10 +6627,9 @@ const ( PortForwardRequestIDHeader = "requestID" ) -// These are the built-in errors for PortStatus. const ( // MixedProtocolNotSupported error in PortStatus means that the cloud provider - // can't ensure the port on the load balancer because mixed values of protocols + // can't publish the port on the load balancer because mixed values of protocols // on the same LoadBalancer type of Service are not supported by the cloud provider. 
MixedProtocolNotSupported = "MixedProtocolNotSupported" ) diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go index d9056205352c..969eaf3fc5b8 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer.go @@ -27,6 +27,9 @@ import ( "strings" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1apply "k8s.io/client-go/applyconfigurations/core/v1" + metav1apply "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/klog/v2" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" @@ -134,6 +137,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc return nil, err } + // Services with multiple protocols are not supported by this controller; warn the users and set + // the corresponding Service Status Condition. + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb + if err := checkMixedProtocol(svc.Spec.Ports); err != nil { + if hasLoadBalancerPortsError(svc) { + return nil, err + } + klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). 
+ WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil { + return nil, errApply + } + return nil, err + } + klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme) existingFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region) @@ -187,6 +212,25 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc return err } + // Services with multiple protocols are not supported by this controller; warn the users and set + // the corresponding Service Status Condition, but keep processing the Update to not break upgrades. + // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb + if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) { + klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name) + g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.") + svcApplyStatus := corev1apply.ServiceStatus().WithConditions( + metav1apply.Condition(). + WithType(v1.LoadBalancerPortsError). + WithStatus(metav1.ConditionTrue). + WithReason(v1.LoadBalancerPortsErrorReason). 
+ WithMessage("LoadBalancer with multiple protocols are not supported")) + svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus) + if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil { + // the error is retried by the controller loop + return errApply + } + } + klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): updating with %d nodes", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, len(nodes)) switch scheme { @@ -226,3 +270,33 @@ func getSvcScheme(svc *v1.Service) cloud.LbScheme { } return cloud.SchemeExternal } + +// checkMixedProtocol checks if the Service Ports use different protocols, +// for example, TCP and UDP. +func checkMixedProtocol(ports []v1.ServicePort) error { + if len(ports) == 0 { + return nil + } + + firstProtocol := ports[0].Protocol + for _, port := range ports[1:] { + if port.Protocol != firstProtocol { + return fmt.Errorf("mixed protocol is not supported for LoadBalancer") + } + } + return nil +} + +// hasLoadBalancerPortsError checks if the Service has the LoadBalancerPortsError condition set to True +func hasLoadBalancerPortsError(service *v1.Service) bool { + if service == nil { + return false + } + + for _, cond := range service.Status.Conditions { + if cond.Type == v1.LoadBalancerPortsError { + return cond.Status == metav1.ConditionTrue + } + } + return false +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go index 3fc89668966b..0737df352647 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go @@ -21,10 +21,12 @@ package gce import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 
"k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -193,3 +195,256 @@ func TestProjectsBasePath(t *testing.T) { t.Errorf("Compute projectsBasePath has changed. Got %q, want %q or %q", gce.projectsBasePath, expectProjectsBasePath, expectMtlsProjectsBasePath) } } + +func TestEnsureLoadBalancerMixedProtocols(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + gce, err := fakeGCECloud(vals) + require.NoError(t, err) + + nodeNames := []string{"test-node-1"} + nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) + require.NoError(t, err) + + apiService := fakeLoadbalancerService("") + apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{ + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }) + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{}) + require.NoError(t, err) + _, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes) + if err == nil { + t.Errorf("Expected error ensuring loadbalancer for Service with multiple ports") + } + if err.Error() != "mixed protocol is not supported for LoadBalancer" { + t.Fatalf("unexpected error, got: %s wanted \"mixed protocol is not supported for LoadBalancer\"", err.Error()) + } + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !hasLoadBalancerPortsError(apiService) { + t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) + } +} + +func TestUpdateLoadBalancerMixedProtocols(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + gce, err := fakeGCECloud(vals) + require.NoError(t, err) + + nodeNames := []string{"test-node-1"} + nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) + require.NoError(t, err) + + apiService := 
fakeLoadbalancerService("") + apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{ + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }) + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{}) + require.NoError(t, err) + + // create an external loadbalancer to simulate an upgrade scenario where the loadbalancer exists + // before the new controller is running and later the Service is updated + _, err = createExternalLoadBalancer(gce, apiService, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName) + assert.NoError(t, err) + + err = gce.UpdateLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !hasLoadBalancerPortsError(apiService) { + t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) + } +} + +func TestCheckMixedProtocol(t *testing.T) { + tests := []struct { + name string + annotations map[string]string + ports []v1.ServicePort + wantErr error + }{ + { + name: "TCP", + annotations: make(map[string]string), + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "UDP", + annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "TCP", + annotations: make(map[string]string), + ports: []v1.ServicePort{ + { + Name: "port80", + Protocol: v1.ProtocolTCP, + Port: int32(80), + }, + { + Name: "port8080", + Protocol: v1.ProtocolTCP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "UDP", + annotations: 
map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Name: "port80", + Protocol: v1.ProtocolUDP, + Port: int32(80), + }, + { + Name: "port8080", + Protocol: v1.ProtocolUDP, + Port: int32(8080), + }, + }, + wantErr: nil, + }, + { + name: "TCP and UDP", + annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"}, + ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolUDP, + Port: int32(53), + }, + { + Protocol: v1.ProtocolTCP, + Port: int32(53), + }, + }, + wantErr: fmt.Errorf("mixed protocol is not supported for LoadBalancer"), + }, + } + for _, test := range tests { + tt := test + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + err := checkMixedProtocol(tt.ports) + if tt.wantErr != nil { + assert.EqualError(t, err, tt.wantErr.Error()) + } else { + assert.Equal(t, err, nil) + } + }) + } +} + +func Test_hasLoadBalancerPortsError(t *testing.T) { + tests := []struct { + name string + service *v1.Service + want bool + }{ + { + name: "no status", + service: &v1.Service{}, + }, + { + name: "condition set to true", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: []metav1.Condition{ + { + Type: v1.LoadBalancerPortsError, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + want: true, + }, + { + name: "condition set false", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: 
[]metav1.Condition{ + { + Type: v1.LoadBalancerPortsError, + Status: metav1.ConditionFalse, + }, + }, + }, + }, + }, + { + name: "multiple conditions unrelated", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "service1"}, + Spec: v1.ServiceSpec{ + ClusterIPs: []string{"1.2.3.4"}, + Type: "LoadBalancer", + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}}, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}}, + Conditions: []metav1.Condition{ + { + Type: "condition1", + Status: metav1.ConditionFalse, + }, + { + Type: "condition2", + Status: metav1.ConditionTrue, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := hasLoadBalancerPortsError(tt.service); got != tt.want { + t.Errorf("hasLoadBalancerPortsError() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go index 035ff25bbebe..4daff14d07bd 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" servicehelper "k8s.io/cloud-provider/service/helpers" netutils "k8s.io/utils/net" ) @@ -57,6 +58,7 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) { gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{}) gce.nodeInformerSynced = func() bool { return true } gce.client = fake.NewSimpleClientset() + gce.eventRecorder = &record.FakeRecorder{} mockGCE := gce.c.(*cloud.MockGCE) mockGCE.MockTargetPools.AddInstanceHook = mock.AddInstanceHook From a423a5c9f4fc0c039ca45193f0e2183855e9b503 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 23 Mar 2023 22:50:59 +0000 Subject: 
[PATCH 3/3] test server side apply patch Change-Id: I739995f3cec6836ef721cd72608f66b1e0893a7d --- .../gce/gce_loadbalancer_test.go | 49 ++++++++++++++++--- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go index 0737df352647..f1d44370cd26 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/gce/gce_loadbalancer_test.go @@ -28,6 +28,10 @@ import ( "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clienttesting "k8s.io/client-go/testing" ) func TestGetLoadBalancer(t *testing.T) { @@ -202,6 +206,23 @@ func TestEnsureLoadBalancerMixedProtocols(t *testing.T) { vals := DefaultTestClusterValues() gce, err := fakeGCECloud(vals) require.NoError(t, err) + fakeClient := gce.client.(*fake.Clientset) + fakeClient.PrependReactor("patch", "services", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "status" { + return true, &v1.Service{}, fmt.Errorf("unexpected action %s", action) + } + patch := action.(clienttesting.PatchAction) + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(patch.GetPatch(), nil, nil) + if err != nil { + return true, &v1.Service{}, fmt.Errorf("unable to decode patch") + } + service := obj.(*v1.Service) + if hasLoadBalancerPortsError(service) { + return true, service, nil + } + return true, service, fmt.Errorf("unexpected object %v", service) + }) nodeNames := []string{"test-node-1"} nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) @@ -221,13 +242,10 @@ func TestEnsureLoadBalancerMixedProtocols(t *testing.T) { if err.Error() != "mixed protocol is not supported for 
LoadBalancer" { t.Fatalf("unexpected error, got: %s wanted \"mixed protocol is not supported for LoadBalancer\"", err.Error()) } - apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + _, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !hasLoadBalancerPortsError(apiService) { - t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) - } } func TestUpdateLoadBalancerMixedProtocols(t *testing.T) { @@ -236,6 +254,23 @@ func TestUpdateLoadBalancerMixedProtocols(t *testing.T) { vals := DefaultTestClusterValues() gce, err := fakeGCECloud(vals) require.NoError(t, err) + fakeClient := gce.client.(*fake.Clientset) + fakeClient.PrependReactor("patch", "services", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "status" { + return true, &v1.Service{}, fmt.Errorf("unexpected action %s", action) + } + patch := action.(clienttesting.PatchAction) + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(patch.GetPatch(), nil, nil) + if err != nil { + return true, &v1.Service{}, fmt.Errorf("unable to decode patch") + } + service := obj.(*v1.Service) + if hasLoadBalancerPortsError(service) { + return true, service, nil + } + return true, service, fmt.Errorf("unexpected object %v", service) + }) nodeNames := []string{"test-node-1"} nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) @@ -258,13 +293,11 @@ func TestUpdateLoadBalancerMixedProtocols(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{}) + _, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, 
metav1.GetOptions{}) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !hasLoadBalancerPortsError(apiService) { - t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions) - } + } func TestCheckMixedProtocol(t *testing.T) {