Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a flag for Non-GCP mode NEG controller. #920

Merged
merged 1 commit into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ var (
EnableL7Ilb bool
EnableCSM bool
CSMServiceNEGSkipNamespaces []string
EnableNonGCPMode bool

LeaderElection LeaderElectionConfiguration
}{}
Expand Down Expand Up @@ -202,6 +203,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
`Optional, whether or not to enable L7-ILB.`)
flag.BoolVar(&F.EnableCSM, "enable-csm", false, "Enable CSM(Istio) support")
flag.StringSliceVar(&F.CSMServiceNEGSkipNamespaces, "csm-service-skip-namespaces", []string{}, "Only for CSM mode, skip the NEG creation for Services in the given namespaces.")
flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.")
}

type RateLimitSpecs struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/neg/readiness"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
Expand Down Expand Up @@ -113,9 +114,15 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
syncerKey := getSyncerKey(namespace, name, svcPort, portInfo)
syncer, ok := manager.syncerMap[syncerKey]
if !ok {
networkEndpointType := negtypes.VMNetworkEndpointType
cxhiano marked this conversation as resolved.
Show resolved Hide resolved
if flags.F.EnableNonGCPMode {
networkEndpointType = negtypes.NonGCPPrivateEndpointType
}

syncer = negsyncer.NewTransactionSyncer(
syncerKey,
portInfo.NegName,
networkEndpointType,
manager.recorder,
manager.cloud,
manager.zoneGetter,
Expand Down
27 changes: 15 additions & 12 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,22 @@ func TestGarbageCollectionNEG(t *testing.T) {
t.Fatalf("Failed to ensure syncer: %v", err)
}

negName := manager.namer.NEG("test", "test", 80)
manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
}, negtypes.TestZone1)

if err := manager.GC(); err != nil {
t.Fatalf("Failed to GC: %v", err)
}
for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VMNetworkEndpointType, negtypes.NonGCPPrivateEndpointType} {
negName := manager.namer.NEG("test", "test", 80)
manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
NetworkEndpointType: string(networkEndpointType),
}, negtypes.TestZone1)

if err := manager.GC(); err != nil {
t.Fatalf("Failed to GC: %v", err)
}

negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1)
for _, neg := range negs {
if neg.Name == negName {
t.Errorf("Expect NEG %q to be GCed.", negName)
negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1)
for _, neg := range negs {
if neg.Name == negName {
t.Errorf("Expect NEG %q to be GCed.", negName)
}
}
}

Expand Down
31 changes: 17 additions & 14 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type transactionSyncer struct {
// metadata
negtypes.NegSyncerKey
negName string
// The type of the network endpoints in this NEG.
networkEndpointType negtypes.NetworkEndpointType

// syncer provides syncer life cycle interfaces
syncer negtypes.NegSyncer
Expand Down Expand Up @@ -65,20 +67,21 @@ type transactionSyncer struct {
reflector readiness.Reflector
}

func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer {
func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, networkEndpointType negtypes.NetworkEndpointType, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer {
// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
needInit: true,
transactions: NewTransactionTable(),
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
reflector: reflector,
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
networkEndpointType: networkEndpointType,
needInit: true,
transactions: NewTransactionTable(),
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
reflector: reflector,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts)
Expand Down Expand Up @@ -130,7 +133,7 @@ func (s *transactionSyncer) syncInternal() error {
return nil
}

targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels)
targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels, s.networkEndpointType)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,7 +181,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {

var errList []error
for _, zone := range zones {
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.cloud, s.serviceLister, s.recorder); err != nil {
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.networkEndpointType, s.cloud, s.serviceLister, s.recorder); err != nil {
errList = append(errList, err)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud) (negty

negsyncer := NewTransactionSyncer(svcPort,
testNegName,
negtypes.VMNetworkEndpointType,
record.NewFakeRecorder(100),
fakeGCE,
negtypes.NewFakeZoneGetter(),
Expand Down
35 changes: 24 additions & 11 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ const (
MAX_NETWORK_ENDPOINTS_PER_BATCH = 500
// For each NEG, only retries 15 times to process it.
// This is a convention in kube-controller-manager.
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
separator = "||"
negIPPortNetworkEndpointType = "GCE_VM_IP_PORT"
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
separator = "||"
)

// encodeEndpoint encodes ip and instance into a single string
Expand Down Expand Up @@ -113,7 +112,7 @@ func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Serv
}

// ensureNetworkEndpointGroup ensures corresponding NEG is configured correctly in the specified zone.
func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error {
func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider 2 things:

  1. refactor the code to structure like this:
    A. construct expected NEG resource
    B. Check if the existing NEG is the same as expected
    C. Delete/Create/No-Op

  2. Or add a TODO on top and leave it later.

neg, err := cloud.GetNetworkEndpointGroup(negName, zone)
if err != nil {
// Most likely to be caused by non-existed NEG
Expand All @@ -123,8 +122,11 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
needToCreate := false
if neg == nil {
needToCreate = true
} else if !utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL()) {
} else if networkEndpointType != negtypes.NonGCPPrivateEndpointType &&
// Only perform the following checks when the NEGs are not Non-GCP NEGs.
// Non-GCP NEGs do not have associated network and subnetwork.
(!utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL())) {
needToCreate = true
klog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", negName, zone)
err = cloud.DeleteNetworkEndpointGroup(negName, zone)
Expand All @@ -141,11 +143,18 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService

if needToCreate {
klog.V(2).Infof("Creating NEG %q for %s in %q.", negName, negServicePortName, zone)
var subnetwork string
switch networkEndpointType {
case negtypes.NonGCPPrivateEndpointType:
subnetwork = ""
default:
subnetwork = cloud.SubnetworkURL()
}
err = cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
NetworkEndpointType: negIPPortNetworkEndpointType,
NetworkEndpointType: string(networkEndpointType),
Network: cloud.NetworkURL(),
Subnetwork: cloud.SubnetworkURL(),
Subnetwork: subnetwork,
}, zone)
if err != nil {
return err
Expand All @@ -161,7 +170,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
}

// toZoneNetworkEndpointMap translates addresses in endpoints object and Istio:DestinationRule subset into zone and endpoints map
func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
if endpoints == nil {
Expand Down Expand Up @@ -227,6 +236,10 @@ func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.Zo

if includeAllEndpoints || shouldPodBeInNeg(podLister, address.TargetRef.Namespace, address.TargetRef.Name) {
networkEndpoint := negtypes.NetworkEndpoint{IP: address.IP, Port: matchPort, Node: *address.NodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: address.TargetRef.Namespace, Name: address.TargetRef.Name}
}
Expand Down
124 changes: 115 additions & 9 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"fmt"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -293,6 +295,85 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) {
}
}

func TestEnsureNetworkEndpointGroup(t *testing.T) {
var (
testZone = "test-zone"
testNamedPort = "named-port"
testServiceName = "test-svc"
testServiceNameSpace = "test-ns"
testNetwork = cloud.ResourcePath("network", &meta.Key{Zone: testZone, Name: "test-network"})
testSubnetwork = cloud.ResourcePath("subnetwork", &meta.Key{Zone: testZone, Name: "test-subnetwork"})
)

fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork)

testCases := []struct {
description string
negName string
enableNonGCPMode bool
networkEndpointType negtypes.NetworkEndpointType
expectedSubnetwork string
}{
{
description: "Create NEG of type GCE_VM_IP_PORT",
negName: "gcp-neg",
enableNonGCPMode: false,
networkEndpointType: negtypes.VMNetworkEndpointType,
expectedSubnetwork: testSubnetwork,
},
{
description: "Create NEG of type NON_GCP_PRIVATE_IP_PORT",
negName: "non-gcp-neg",
enableNonGCPMode: true,
networkEndpointType: negtypes.NonGCPPrivateEndpointType,
expectedSubnetwork: "",
},
}
for _, tc := range testCases {
ensureNetworkEndpointGroup(
testServiceNameSpace,
testServiceName,
tc.negName,
testZone,
testNamedPort,
tc.networkEndpointType,
fakeCloud,
nil,
nil,
)

neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, testZone)
if err != nil {
t.Errorf("Failed to retrieve NEG %q: %v", tc.negName, err)
}

if neg.NetworkEndpointType != string(tc.networkEndpointType) {
t.Errorf("Unexpected NetworkEndpointType, expecting %q but got %q", tc.networkEndpointType, neg.NetworkEndpointType)
}

if neg.Subnetwork != tc.expectedSubnetwork {
t.Errorf("Unexpected Subnetwork, expecting %q but got %q", tc.expectedSubnetwork, neg.Subnetwork)
}

// Call ensureNetworkEndpointGroup with the same NEG.
err = ensureNetworkEndpointGroup(
testServiceNameSpace,
testServiceName,
tc.negName,
testZone,
testNamedPort,
tc.networkEndpointType,
fakeCloud,
nil,
nil,
)

if err != nil {
t.Errorf("Unexpected error when called with duplicated NEG: %v", err)
}
}
}

func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
t.Parallel()
_, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())))
Expand Down Expand Up @@ -326,16 +407,18 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {

zoneGetter := negtypes.NewFakeZoneGetter()
testCases := []struct {
desc string
targetPort string
endpointSets map[string]negtypes.NetworkEndpointSet
expectMap negtypes.EndpointPodMap
desc string
targetPort string
endpointSets map[string]negtypes.NetworkEndpointSet
expectMap negtypes.EndpointPodMap
networkEndpointType negtypes.NetworkEndpointType
}{
{
desc: "non exist target port",
targetPort: "8888",
endpointSets: map[string]negtypes.NetworkEndpointSet{},
expectMap: negtypes.EndpointPodMap{},
desc: "non exist target port",
targetPort: "8888",
endpointSets: map[string]negtypes.NetworkEndpointSet{},
expectMap: negtypes.EndpointPodMap{},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "target port number",
Expand All @@ -356,6 +439,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"},
networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"},
},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "named target port",
Expand All @@ -376,11 +460,33 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod10"},
networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod11"},
},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "Non-GCP network endpoints",
targetPort: "80",
endpointSets: map[string]negtypes.NetworkEndpointSet{
negtypes.TestZone1: negtypes.NewNetworkEndpointSet(
networkEndpointFromEncodedEndpoint("10.100.1.1||||80"),
networkEndpointFromEncodedEndpoint("10.100.1.2||||80"),
networkEndpointFromEncodedEndpoint("10.100.2.1||||80"),
networkEndpointFromEncodedEndpoint("10.100.1.3||||80")),
negtypes.TestZone2: negtypes.NewNetworkEndpointSet(
networkEndpointFromEncodedEndpoint("10.100.3.1||||80")),
},
expectMap: negtypes.EndpointPodMap{
networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"},
networkEndpointFromEncodedEndpoint("10.100.1.2||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"},
networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"},
networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"},
networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"},
},
networkEndpointType: negtypes.NonGCPPrivateEndpointType,
},
}

for _, tc := range testCases {
retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "")
retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "", tc.networkEndpointType)
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
}
Expand Down
Loading