Skip to content

Commit

Permalink
UPSTREAM: 84466: gce: ensureInternalInstanceGroups: reuse instance-gr…
Browse files Browse the repository at this point in the history
…oups for internal load balancers

UPSTREAM: 84466:  legacy-cloud-providers/gce/gce_fake.go: NewFakeGCECloud: make sure that the secondary zone is also part of managedZones

UPSTREAM: 84466:  gce: ensureInternalInstanceGroups: reuse instance-groups for internal load balancers

UPSTREAM: 84466: gce: add ExternalInstanceGroupsPrefix to filter instance groups that will be re-used for ILB backend

UPSTREAM: 84466: gce: skip ensureInstanceGroup for a zone that has no remaining nodes for k8s managed IG

OpenShift-Rebase-Source: a58245a
  • Loading branch information
abhinavdahiya authored and bertinatto committed Apr 11, 2023
1 parent 3a2691b commit 32dbe82
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 43 deletions.
69 changes: 39 additions & 30 deletions staging/src/k8s.io/legacy-cloud-providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ type Cloud struct {
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
manager diskServiceManager

externalInstanceGroupsPrefix string // If non-"", finds prefixed instance groups for ILB.

// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
Expand Down Expand Up @@ -193,6 +196,9 @@ type ConfigGlobal struct {
NodeInstancePrefix string `gcfg:"node-instance-prefix"`
Regional bool `gcfg:"regional"`
Multizone bool `gcfg:"multizone"`
// ExternalInstanceGroupsPrefix is the prefix that will be used to filter instance groups
// that be backend for ILB containing cluster nodes if not-empty.
ExternalInstanceGroupsPrefix string `gcfg:"external-instance-groups-prefix"`
// APIEndpoint is the GCE compute API endpoint to use. If this is blank,
// then the default endpoint is used.
APIEndpoint string `gcfg:"api-endpoint"`
Expand Down Expand Up @@ -233,12 +239,13 @@ type CloudConfig struct {
SubnetworkName string
SubnetworkURL string
// DEPRECATED: Do not rely on this value as it may be incorrect.
SecondaryRangeName string
NodeTags []string
NodeInstancePrefix string
TokenSource oauth2.TokenSource
UseMetadataServer bool
AlphaFeatureGate *AlphaFeatureGate
SecondaryRangeName string
NodeTags []string
NodeInstancePrefix string
ExternalInstanceGroupsPrefix string
TokenSource oauth2.TokenSource
UseMetadataServer bool
AlphaFeatureGate *AlphaFeatureGate
}

func init() {
Expand Down Expand Up @@ -328,6 +335,7 @@ func generateCloudConfig(configFile *ConfigFile) (cloudConfig *CloudConfig, err

cloudConfig.NodeTags = configFile.Global.NodeTags
cloudConfig.NodeInstancePrefix = configFile.Global.NodeInstancePrefix
cloudConfig.ExternalInstanceGroupsPrefix = configFile.Global.ExternalInstanceGroupsPrefix
cloudConfig.AlphaFeatureGate = NewAlphaFeatureGate(configFile.Global.AlphaFeatures)
}

Expand Down Expand Up @@ -507,30 +515,31 @@ func CreateGCECloud(config *CloudConfig) (*Cloud, error) {
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(5, 5) // 5 qps, 5 burst.

gce := &Cloud{
service: service,
serviceAlpha: serviceAlpha,
serviceBeta: serviceBeta,
containerService: containerService,
tpuService: tpuService,
projectID: projID,
networkProjectID: netProjID,
onXPN: onXPN,
region: config.Region,
regional: config.Regional,
localZone: config.Zone,
managedZones: config.ManagedZones,
networkURL: networkURL,
unsafeIsLegacyNetwork: isLegacyNetwork,
unsafeSubnetworkURL: subnetURL,
secondaryRangeName: config.SecondaryRangeName,
nodeTags: config.NodeTags,
nodeInstancePrefix: config.NodeInstancePrefix,
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
nodeZones: map[string]sets.String{},
metricsCollector: newLoadBalancerMetrics(),
projectsBasePath: getProjectsBasePath(service.BasePath),
service: service,
serviceAlpha: serviceAlpha,
serviceBeta: serviceBeta,
containerService: containerService,
tpuService: tpuService,
projectID: projID,
networkProjectID: netProjID,
onXPN: onXPN,
region: config.Region,
regional: config.Regional,
localZone: config.Zone,
managedZones: config.ManagedZones,
networkURL: networkURL,
unsafeIsLegacyNetwork: isLegacyNetwork,
unsafeSubnetworkURL: subnetURL,
secondaryRangeName: config.SecondaryRangeName,
nodeTags: config.NodeTags,
nodeInstancePrefix: config.NodeInstancePrefix,
externalInstanceGroupsPrefix: config.ExternalInstanceGroupsPrefix,
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
nodeZones: map[string]sets.String{},
metricsCollector: newLoadBalancerMetrics(),
projectsBasePath: getProjectsBasePath(service.BasePath),
}

gce.manager = &gceServiceManager{gce}
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/legacy-cloud-providers/gce/gce_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewFakeGCECloud(vals TestClusterValues) *Cloud {
gce := &Cloud{
region: vals.Region,
service: service,
managedZones: []string{vals.ZoneName},
managedZones: []string{vals.ZoneName, vals.SecondaryZoneName},
projectID: vals.ProjectID,
networkProjectID: vals.ProjectID,
ClusterID: fakeClusterID(vals.ClusterID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package gce

import (
"fmt"

compute "google.golang.org/api/compute/v1"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
Expand Down Expand Up @@ -52,14 +54,29 @@ func (g *Cloud) DeleteInstanceGroup(name string, zone string) error {

// FilterInstanceGroupsByName lists all InstanceGroups in the project and
// zone that match the name regexp.
func (g *Cloud) FilterInstanceGroupsByNamePrefix(namePrefix, zone string) ([]*compute.InstanceGroup, error) {
func (g *Cloud) FilterInstanceGroupsByName(namePrefix, zone string) ([]*compute.InstanceGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("filter", zone)
v, err := g.c.InstanceGroups().List(ctx, zone, filter.Regexp("name", namePrefix+".*"))
return v, mc.Observe(err)
}

// ListInstanceGroupsWithPrefix lists all InstanceGroups in the project and
// zone with given prefix.
func (g *Cloud) ListInstanceGroupsWithPrefix(zone string, prefix string) ([]*compute.InstanceGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()

mc := newInstanceGroupMetricContext("list", zone)
f := filter.None
if prefix != "" {
f = filter.Regexp("name", fmt.Sprintf("%s.*", prefix))
}
v, err := g.c.InstanceGroups().List(ctx, zone, f)
return v, mc.Observe(err)
}

// ListInstanceGroups lists all InstanceGroups in the project and
// zone.
func (g *Cloud) ListInstanceGroups(zone string) ([]*compute.InstanceGroup, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,17 +553,14 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
return hc, nil
}

func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node) (string, error) {
func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []string) (string, error) {
klog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes", name, zone, len(nodes))
ig, err := g.GetInstanceGroup(name, zone)
if err != nil && !isNotFound(err) {
return "", err
}

kubeNodes := sets.NewString()
for _, n := range nodes {
kubeNodes.Insert(n.Name)
}
kubeNodes := sets.NewString(nodes...)

// Individual InstanceGroup has a limit for 1000 instances in it.
// As a result, it's not possible to add more to it.
Expand Down Expand Up @@ -629,27 +626,77 @@ func (g *Cloud) ensureInternalInstanceGroups(name string, nodes []*v1.Node) ([]s
zonedNodes := splitNodesByZone(nodes)
klog.V(2).Infof("ensureInternalInstanceGroups(%v): %d nodes over %d zones in region %v", name, len(nodes), len(zonedNodes), g.region)
var igLinks []string
for zone, nodes := range zonedNodes {
if g.AlphaFeatureGate.Enabled(AlphaFeatureSkipIGsManagement) {
igs, err := g.FilterInstanceGroupsByNamePrefix(name, zone)
gceZonedNodes := map[string][]string{}

if g.AlphaFeatureGate.Enabled(AlphaFeatureSkipIGsManagement) {
for zone := range zonedNodes {
igs, err := g.FilterInstanceGroupsByName(name, zone)
if err != nil {
return nil, err
}
for _, ig := range igs {
igLinks = append(igLinks, ig.SelfLink)
}
} else {
igLink, err := g.ensureInternalInstanceGroup(name, zone, nodes)
}

return igLinks, nil
}

for zone, zNodes := range zonedNodes {
hosts, err := g.getFoundInstanceByNames(nodeNames(zNodes))
if err != nil {
return nil, err
}
names := sets.NewString()
for _, h := range hosts {
names.Insert(h.Name)
}
skip := sets.NewString()

igs, err := g.candidateExternalInstanceGroups(zone)
if err != nil {
return nil, err
}
for _, ig := range igs {
if strings.EqualFold(ig.Name, name) {
continue
}
instances, err := g.ListInstancesInInstanceGroup(ig.Name, zone, allInstances)
if err != nil {
return nil, err
}
igLinks = append(igLinks, igLink)
groupInstances := sets.NewString()
for _, ins := range instances {
parts := strings.Split(ins.Instance, "/")
groupInstances.Insert(parts[len(parts)-1])
}
if names.HasAll(groupInstances.UnsortedList()...) {
igLinks = append(igLinks, ig.SelfLink)
skip.Insert(groupInstances.UnsortedList()...)
}
}
if remaining := names.Difference(skip).UnsortedList(); len(remaining) > 0 {
gceZonedNodes[zone] = remaining
}
}
for zone, gceNodes := range gceZonedNodes {
igLink, err := g.ensureInternalInstanceGroup(name, zone, gceNodes)
if err != nil {
return []string{}, err
}
igLinks = append(igLinks, igLink)
}

return igLinks, nil
}

func (g *Cloud) candidateExternalInstanceGroups(zone string) ([]*compute.InstanceGroup, error) {
if g.externalInstanceGroupsPrefix == "" {
return nil, nil
}
return g.ListInstanceGroupsWithPrefix(zone, g.externalInstanceGroupsPrefix)
}

func (g *Cloud) ensureInternalInstanceGroupsDeleted(name string) error {
// List of nodes isn't available here - fetch all zones in region and try deleting this cluster's ig
zones, err := g.ListZonesInRegion(g.region)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,75 @@ func TestEnsureLoadBalancerDeletedSucceedsOnXPN(t *testing.T) {
checkEvent(t, recorder, FirewallChangeMsg, true)
}

func TestEnsureInternalInstanceGroupsReuseGroups(t *testing.T) {
vals := DefaultTestClusterValues()
gce, err := fakeGCECloud(vals)
require.NoError(t, err)
gce.externalInstanceGroupsPrefix = "pre-existing"

igName := makeInstanceGroupName(vals.ClusterID)
nodesA, err := createAndInsertNodes(gce, []string{"test-node-1", "test-node-2"}, vals.ZoneName)
require.NoError(t, err)
nodesB, err := createAndInsertNodes(gce, []string{"test-node-3"}, vals.SecondaryZoneName)
require.NoError(t, err)

preIGName := "pre-existing-ig"
err = gce.CreateInstanceGroup(&compute.InstanceGroup{Name: preIGName}, vals.ZoneName)
require.NoError(t, err)
err = gce.CreateInstanceGroup(&compute.InstanceGroup{Name: preIGName}, vals.SecondaryZoneName)
require.NoError(t, err)
err = gce.AddInstancesToInstanceGroup(preIGName, vals.ZoneName, gce.ToInstanceReferences(vals.ZoneName, []string{"test-node-1"}))
require.NoError(t, err)
err = gce.AddInstancesToInstanceGroup(preIGName, vals.SecondaryZoneName, gce.ToInstanceReferences(vals.SecondaryZoneName, []string{"test-node-3"}))
require.NoError(t, err)

anotherPreIGName := "another-existing-ig"
err = gce.CreateInstanceGroup(&compute.InstanceGroup{Name: anotherPreIGName}, vals.ZoneName)
require.NoError(t, err)
err = gce.AddInstancesToInstanceGroup(anotherPreIGName, vals.ZoneName, gce.ToInstanceReferences(vals.ZoneName, []string{"test-node-2"}))
require.NoError(t, err)

svc := fakeLoadbalancerService(string(LBTypeInternal))
svc, err = gce.client.CoreV1().Services(svc.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = gce.ensureInternalLoadBalancer(
vals.ClusterName, vals.ClusterID,
svc,
nil,
append(nodesA, nodesB...),
)
assert.NoError(t, err)

backendServiceName := makeBackendServiceName(gce.GetLoadBalancerName(context.TODO(), "", svc), vals.ClusterID, shareBackendService(svc), cloud.SchemeInternal, "TCP", svc.Spec.SessionAffinity)
bs, err := gce.GetRegionBackendService(backendServiceName, gce.region)
require.NoError(t, err)
assert.Equal(t, 3, len(bs.Backends), "Want three backends referencing three instances groups")

igRef := func(zone, name string) string {
return fmt.Sprintf("zones/%s/instanceGroups/%s", zone, name)
}
for _, name := range []string{igRef(vals.ZoneName, preIGName), igRef(vals.SecondaryZoneName, preIGName), igRef(vals.ZoneName, igName)} {
var found bool
for _, be := range bs.Backends {
if strings.Contains(be.Group, name) {
found = true
break
}
}
assert.True(t, found, "Expected list of backends to have group %q", name)
}

// Expect initial zone to have test-node-2
instances, err := gce.ListInstancesInInstanceGroup(igName, vals.ZoneName, "ALL")
require.NoError(t, err)
assert.Equal(t, 1, len(instances))
assert.Contains(
t,
instances[0].Instance,
fmt.Sprintf("projects/%s/zones/%s/instances/%s", vals.ProjectID, vals.ZoneName, "test-node-2"),
)
}

func TestEnsureInternalInstanceGroupsDeleted(t *testing.T) {
vals := DefaultTestClusterValues()
gce, err := fakeGCECloud(vals)
Expand Down

0 comments on commit 32dbe82

Please sign in to comment.