diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 2bd3d529de..23e9d76852 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -86,6 +86,7 @@ var ( EnableL7Ilb bool EnableCSM bool CSMServiceNEGSkipNamespaces []string + EnableNonGCPMode bool LeaderElection LeaderElectionConfiguration }{} @@ -202,6 +203,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 `Optional, whether or not to enable L7-ILB.`) flag.BoolVar(&F.EnableCSM, "enable-csm", false, "Enable CSM(Istio) support") flag.StringSliceVar(&F.CSMServiceNEGSkipNamespaces, "csm-service-skip-namespaces", []string{}, "Only for CSM mode, skip the NEG creation for Services in the given namespaces.") + flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.") } type RateLimitSpecs struct { diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index cc28093275..ccc9041d28 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/neg/readiness" negsyncer "k8s.io/ingress-gce/pkg/neg/syncers" negtypes "k8s.io/ingress-gce/pkg/neg/types" @@ -113,9 +114,15 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg syncerKey := getSyncerKey(namespace, name, svcPort, portInfo) syncer, ok := manager.syncerMap[syncerKey] if !ok { + networkEndpointType := negtypes.VMNetworkEndpointType + if flags.F.EnableNonGCPMode { + networkEndpointType = negtypes.NonGCPPrivateEndpointType + } + syncer = negsyncer.NewTransactionSyncer( syncerKey, portInfo.NegName, + networkEndpointType, manager.recorder, manager.cloud, manager.zoneGetter, diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 92d188bf36..fba869fd7a 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -314,19 +314,22 @@ func TestGarbageCollectionNEG(t *testing.T) { t.Fatalf("Failed to ensure syncer: %v", err) } - negName := manager.namer.NEG("test", "test", 80) - manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ - Name: negName, - }, negtypes.TestZone1) - - if err := manager.GC(); err != nil { - t.Fatalf("Failed to GC: %v", err) - } + for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VMNetworkEndpointType, negtypes.NonGCPPrivateEndpointType} { + negName := manager.namer.NEG("test", "test", 80) + manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ + Name: negName, + NetworkEndpointType: string(networkEndpointType), + }, negtypes.TestZone1) + + if err := manager.GC(); err != nil { + t.Fatalf("Failed to GC: %v", err) + } - negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1) - for _, neg := range negs { - if neg.Name == negName { - t.Errorf("Expect NEG %q to be GCed.", negName) + negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1) + for _, neg := range negs { + if neg.Name == negName { + t.Errorf("Expect NEG %q to be GCed.", negName) + } } } diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index fe291f54d9..22e27610fa 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -36,6 +36,8 @@ type transactionSyncer struct { // metadata negtypes.NegSyncerKey negName string + // The type of the network endpoints in this NEG. + networkEndpointType negtypes.NetworkEndpointType // syncer provides syncer life cycle interfaces syncer negtypes.NegSyncer @@ -65,20 +67,21 @@ type transactionSyncer struct { reflector readiness.Reflector } -func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer { +func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, networkEndpointType negtypes.NetworkEndpointType, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer { // TransactionSyncer implements the syncer core ts := &transactionSyncer{ - NegSyncerKey: negSyncerKey, - negName: networkEndpointGroupName, - needInit: true, - transactions: NewTransactionTable(), - podLister: podLister, - serviceLister: serviceLister, - endpointLister: endpointLister, - recorder: recorder, - cloud: cloud, - zoneGetter: zoneGetter, - reflector: reflector, + NegSyncerKey: negSyncerKey, + negName: networkEndpointGroupName, + networkEndpointType: networkEndpointType, + needInit: true, + transactions: NewTransactionTable(), + podLister: podLister, + serviceLister: serviceLister, + endpointLister: endpointLister, + recorder: recorder, + cloud: cloud, + zoneGetter: zoneGetter, + reflector: reflector, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts) @@ -130,7 +133,7 @@ func (s *transactionSyncer) syncInternal() error { return nil } - targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels) + targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels, s.networkEndpointType) if err != nil { return err } @@ -178,7 +181,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { var errList []error for _, zone := range zones { - if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.cloud, s.serviceLister, s.recorder); err != nil { + if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.networkEndpointType, s.cloud, s.serviceLister, s.recorder); err != nil { errList = append(errList, err) } } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index c4f8ce0c82..7a86ad2b0f 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1091,6 +1091,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud) (negty negsyncer := NewTransactionSyncer(svcPort, testNegName, + negtypes.VMNetworkEndpointType, record.NewFakeRecorder(100), fakeGCE, negtypes.NewFakeZoneGetter(), diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index f74881e8f7..9efc5bfdc7 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -39,11 +39,10 @@ const ( MAX_NETWORK_ENDPOINTS_PER_BATCH = 500 // For each NEG, only retries 15 times to process it. // This is a convention in kube-controller-manager. - maxRetries = 15 - minRetryDelay = 5 * time.Second - maxRetryDelay = 600 * time.Second - separator = "||" - negIPPortNetworkEndpointType = "GCE_VM_IP_PORT" + maxRetries = 15 + minRetryDelay = 5 * time.Second + maxRetryDelay = 600 * time.Second + separator = "||" ) // encodeEndpoint encodes ip and instance into a single string @@ -113,7 +112,7 @@ func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Serv } // ensureNetworkEndpointGroup ensures corresponding NEG is configured correctly in the specified zone. -func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error { +func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error { neg, err := cloud.GetNetworkEndpointGroup(negName, zone) if err != nil { // Most likely to be caused by non-existed NEG @@ -123,8 +122,11 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService needToCreate := false if neg == nil { needToCreate = true - } else if !utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) || - !utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL()) { + } else if networkEndpointType != negtypes.NonGCPPrivateEndpointType && + // Only perform the following checks when the NEGs are not Non-GCP NEGs. + // Non-GCP NEGs do not have associated network and subnetwork. + (!utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) || + !utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL())) { needToCreate = true klog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", negName, zone) err = cloud.DeleteNetworkEndpointGroup(negName, zone) @@ -141,11 +143,18 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService if needToCreate { klog.V(2).Infof("Creating NEG %q for %s in %q.", negName, negServicePortName, zone) + var subnetwork string + switch networkEndpointType { + case negtypes.NonGCPPrivateEndpointType: + subnetwork = "" + default: + subnetwork = cloud.SubnetworkURL() + } err = cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ Name: negName, - NetworkEndpointType: negIPPortNetworkEndpointType, + NetworkEndpointType: string(networkEndpointType), Network: cloud.NetworkURL(), - Subnetwork: cloud.SubnetworkURL(), + Subnetwork: subnetwork, }, zone) if err != nil { return err @@ -161,7 +170,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService } // toZoneNetworkEndpointMap translates addresses in endpoints object and Istio:DestinationRule subset into zone and endpoints map -func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) { +func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) { zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} networkEndpointPodMap := negtypes.EndpointPodMap{} if endpoints == nil { @@ -227,6 +236,10 @@ func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.Zo if includeAllEndpoints || shouldPodBeInNeg(podLister, address.TargetRef.Namespace, address.TargetRef.Name) { networkEndpoint := negtypes.NetworkEndpoint{IP: address.IP, Port: matchPort, Node: *address.NodeName} + if networkEndpointType == negtypes.NonGCPPrivateEndpointType { + // Non-GCP network endpoints don't have associated nodes. + networkEndpoint.Node = "" + } zoneNetworkEndpointMap[zone].Insert(networkEndpoint) networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: address.TargetRef.Namespace, Name: address.TargetRef.Name} } diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 3f40182b01..6554dd2ebf 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -23,6 +23,8 @@ import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -293,6 +295,85 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) { } } +func TestEnsureNetworkEndpointGroup(t *testing.T) { + var ( + testZone = "test-zone" + testNamedPort = "named-port" + testServiceName = "test-svc" + testServiceNameSpace = "test-ns" + testNetwork = cloud.ResourcePath("network", &meta.Key{Zone: testZone, Name: "test-network"}) + testSubnetwork = cloud.ResourcePath("subnetwork", &meta.Key{Zone: testZone, Name: "test-subnetwork"}) + ) + + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) + + testCases := []struct { + description string + negName string + enableNonGCPMode bool + networkEndpointType negtypes.NetworkEndpointType + expectedSubnetwork string + }{ + { + description: "Create NEG of type GCE_VM_IP_PORT", + negName: "gcp-neg", + enableNonGCPMode: false, + networkEndpointType: negtypes.VMNetworkEndpointType, + expectedSubnetwork: testSubnetwork, + }, + { + description: "Create NEG of type NON_GCP_PRIVATE_IP_PORT", + negName: "non-gcp-neg", + enableNonGCPMode: true, + networkEndpointType: negtypes.NonGCPPrivateEndpointType, + expectedSubnetwork: "", + }, + } + for _, tc := range testCases { + ensureNetworkEndpointGroup( + testServiceNameSpace, + testServiceName, + tc.negName, + testZone, + testNamedPort, + tc.networkEndpointType, + fakeCloud, + nil, + nil, + ) + + neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, testZone) + if err != nil { + t.Errorf("Failed to retrieve NEG %q: %v", tc.negName, err) + } + + if neg.NetworkEndpointType != string(tc.networkEndpointType) { + t.Errorf("Unexpected NetworkEndpointType, expecting %q but got %q", tc.networkEndpointType, neg.NetworkEndpointType) + } + + if neg.Subnetwork != tc.expectedSubnetwork { + t.Errorf("Unexpected Subnetwork, expecting %q but got %q", tc.expectedSubnetwork, neg.Subnetwork) + } + + // Call ensureNetworkEndpointGroup with the same NEG. + err = ensureNetworkEndpointGroup( + testServiceNameSpace, + testServiceName, + tc.negName, + testZone, + testNamedPort, + tc.networkEndpointType, + fakeCloud, + nil, + nil, + ) + + if err != nil { + t.Errorf("Unexpected error when called with duplicated NEG: %v", err) + } + } +} + func TestToZoneNetworkEndpointMapUtil(t *testing.T) { t.Parallel() _, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues()))) @@ -326,16 +407,18 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { zoneGetter := negtypes.NewFakeZoneGetter() testCases := []struct { - desc string - targetPort string - endpointSets map[string]negtypes.NetworkEndpointSet - expectMap negtypes.EndpointPodMap + desc string + targetPort string + endpointSets map[string]negtypes.NetworkEndpointSet + expectMap negtypes.EndpointPodMap + networkEndpointType negtypes.NetworkEndpointType }{ { - desc: "non exist target port", - targetPort: "8888", - endpointSets: map[string]negtypes.NetworkEndpointSet{}, - expectMap: negtypes.EndpointPodMap{}, + desc: "non exist target port", + targetPort: "8888", + endpointSets: map[string]negtypes.NetworkEndpointSet{}, + expectMap: negtypes.EndpointPodMap{}, + networkEndpointType: negtypes.VMNetworkEndpointType, }, { desc: "target port number", @@ -356,6 +439,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, }, + networkEndpointType: negtypes.VMNetworkEndpointType, }, { desc: "named target port", @@ -376,11 +460,33 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod10"}, networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod11"}, }, + networkEndpointType: negtypes.VMNetworkEndpointType, + }, + { + desc: "Non-GCP network endpoints", + targetPort: "80", + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.1.1||||80"), + networkEndpointFromEncodedEndpoint("10.100.1.2||||80"), + networkEndpointFromEncodedEndpoint("10.100.2.1||||80"), + networkEndpointFromEncodedEndpoint("10.100.1.3||||80")), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet( + networkEndpointFromEncodedEndpoint("10.100.3.1||||80")), + }, + expectMap: negtypes.EndpointPodMap{ + networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"}, + networkEndpointFromEncodedEndpoint("10.100.1.2||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"}, + networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"}, + networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"}, + networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"}, + }, + networkEndpointType: negtypes.NonGCPPrivateEndpointType, }, } for _, tc := range testCases { - retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "") + retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "", tc.networkEndpointType) if err != nil { t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) } diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index bde3503612..0857cccc3f 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -29,6 +29,13 @@ import ( "k8s.io/ingress-gce/pkg/annotations" ) +type NetworkEndpointType string + +const ( + VMNetworkEndpointType = NetworkEndpointType("GCE_VM_IP_PORT") + NonGCPPrivateEndpointType = NetworkEndpointType("NON_GCP_PRIVATE_IP_PORT") +) + // SvcPortMap is a map of ServicePort:TargetPort type SvcPortMap map[int32]string