Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit number of instances in single update to GCE target pool #87881

Merged
merged 1 commit into from Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions staging/src/k8s.io/legacy-cloud-providers/gce/gce.go
Expand Up @@ -68,8 +68,9 @@ const (
// AffinityTypeClientIP - affinity based on Client IP.
gceAffinityTypeClientIP = "CLIENT_IP"

operationPollInterval = time.Second
maxTargetPoolCreateInstances = 200
operationPollInterval = time.Second
maxTargetPoolCreateInstances = 200
maxInstancesPerTargetPoolUpdate = 1000

// HTTP Load Balancer parameters
// Configure 8 second period for external health checks.
Expand Down
Expand Up @@ -591,16 +591,32 @@ func (g *Cloud) updateTargetPool(loadBalancerName string, hosts []*gceInstance)
toRemove = append(toRemove, &compute.InstanceReference{Instance: link})
}

if len(toAdd) > 0 {
if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd); err != nil {
for len(toAdd) > 0 {
// Do not add more than maxInstancesPerTargetPoolUpdate in a single call.
instancesCount := len(toAdd)
if instancesCount > maxInstancesPerTargetPoolUpdate {
instancesCount = maxInstancesPerTargetPoolUpdate
}
// The operation to add 1000 instances is fairly long (may take minutes), so
// we don't need to worry about saturating QPS limits.
if err := g.AddInstancesToTargetPool(loadBalancerName, g.region, toAdd[:instancesCount]); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this operation blocks long enough to avoid QPS issues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - it's a pretty long operation (for 1k instances it can take up to minutes) - added a comment.

return err
}
toAdd = toAdd[instancesCount:]
}

if len(toRemove) > 0 {
if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove); err != nil {
for len(toRemove) > 0 {
// Do not remove more than maxInstancesPerTargetPoolUpdate in a single call.
instancesCount := len(toRemove)
if instancesCount > maxInstancesPerTargetPoolUpdate {
instancesCount = maxInstancesPerTargetPoolUpdate
}
// The operation to remove 1000 instances is fairly long (may take minutes), so
// we don't need to worry about saturating QPS limits.
if err := g.RemoveInstancesFromTargetPool(loadBalancerName, g.region, toRemove[:instancesCount]); err != nil {
return err
}
toRemove = toRemove[instancesCount:]
}

// Try to verify that the correct number of nodes are now in the target pool.
Expand Down
Expand Up @@ -568,6 +568,53 @@ func TestForwardingRuleNeedsUpdate(t *testing.T) {
}
}

// TestTargetPoolAddsAndRemoveInstancesInBatches checks how many add-instance
// and remove-instance requests are issued against the mocked target pool API
// when an external load balancer is updated with a node set larger than
// maxInstancesPerTargetPoolUpdate.
//
// Growing the pool by 2*maxInstancesPerTargetPoolUpdate+2 nodes is expected to
// produce exactly three add calls, and shrinking it back to the single
// original node is expected to produce exactly three remove calls.
func TestTargetPoolAddsAndRemoveInstancesInBatches(t *testing.T) {
	t.Parallel()

	vals := DefaultTestClusterValues()
	// Build the fake cloud from the same values that are used for the
	// load balancer and node calls below.
	gce, err := fakeGCECloud(vals)
	require.NoError(t, err)

	// Count the requests that reach the mocked target pool service.
	addInstanceCalls := 0
	addInstanceHook := func(req *compute.TargetPoolsAddInstanceRequest) {
		addInstanceCalls++
	}
	removeInstanceCalls := 0
	removeInstanceHook := func(req *compute.TargetPoolsRemoveInstanceRequest) {
		removeInstanceCalls++
	}

	// Setup failures abort the test immediately: continuing without the
	// hooks or the load balancer would only yield misleading count mismatches.
	err = registerTargetPoolAddInstanceHook(gce, addInstanceHook)
	require.NoError(t, err)
	err = registerTargetPoolRemoveInstanceHook(gce, removeInstanceHook)
	require.NoError(t, err)

	svc := fakeLoadbalancerService("")
	nodeName := "default-node"
	_, err = createExternalLoadBalancer(gce, svc, []string{nodeName}, vals.ClusterName, vals.ClusterID, vals.ZoneName)
	require.NoError(t, err)

	// Insert large number of nodes to test batching.
	additionalNodeCount := 2*maxInstancesPerTargetPoolUpdate + 2
	additionalNodeNames := make([]string, 0, additionalNodeCount)
	for i := 0; i < additionalNodeCount; i++ {
		additionalNodeNames = append(additionalNodeNames, fmt.Sprintf("node-%d", i))
	}
	allNodes, err := createAndInsertNodes(gce, append([]string{nodeName}, additionalNodeNames...), vals.ZoneName)
	require.NoError(t, err)
	err = gce.updateExternalLoadBalancer("", svc, allNodes)
	assert.NoError(t, err)

	assert.Equal(t, 3, addInstanceCalls)

	// Remove large number of nodes to test batching.
	allNodes, err = createAndInsertNodes(gce, []string{nodeName}, vals.ZoneName)
	require.NoError(t, err)
	err = gce.updateExternalLoadBalancer("", svc, allNodes)
	assert.NoError(t, err)

	assert.Equal(t, 3, removeInstanceCalls)
}

func TestTargetPoolNeedsRecreation(t *testing.T) {
t.Parallel()

Expand Down
29 changes: 29 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/gce/gce_util.go
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package gce

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -81,6 +82,34 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) {
return gce, nil
}

// registerTargetPoolAddInstanceHook replaces the AddInstanceHook of the mock
// target pools service behind gce with a wrapper that first invokes callback
// with the request and then delegates to the hook that was installed before.
//
// It returns an error when gce.c is not a *cloud.MockGCE.
func registerTargetPoolAddInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsAddInstanceRequest)) error {
	mockGCE, ok := gce.c.(*cloud.MockGCE)
	if !ok {
		return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce)
	}
	existingHandler := mockGCE.MockTargetPools.AddInstanceHook
	hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsAddInstanceRequest, m *cloud.MockTargetPools) error {
		callback(req)
		if existingHandler == nil {
			// No hook was installed before this one, so there is nothing to
			// delegate to; calling a nil func value would panic.
			// NOTE(review): returning nil presumably matches what the mock
			// does when no hook is set — confirm against the mock package.
			return nil
		}
		return existingHandler(ctx, key, req, m)
	}
	mockGCE.MockTargetPools.AddInstanceHook = hook
	return nil
}

// registerTargetPoolRemoveInstanceHook replaces the RemoveInstanceHook of the
// mock target pools service behind gce with a wrapper that first invokes
// callback with the request and then delegates to the hook that was installed
// before.
//
// It returns an error when gce.c is not a *cloud.MockGCE.
func registerTargetPoolRemoveInstanceHook(gce *Cloud, callback func(*compute.TargetPoolsRemoveInstanceRequest)) error {
	mockGCE, ok := gce.c.(*cloud.MockGCE)
	if !ok {
		return fmt.Errorf("couldn't cast cloud to mockGCE: %#v", gce)
	}
	existingHandler := mockGCE.MockTargetPools.RemoveInstanceHook
	hook := func(ctx context.Context, key *meta.Key, req *compute.TargetPoolsRemoveInstanceRequest, m *cloud.MockTargetPools) error {
		callback(req)
		if existingHandler == nil {
			// No hook was installed before this one, so there is nothing to
			// delegate to; calling a nil func value would panic.
			// NOTE(review): returning nil presumably matches what the mock
			// does when no hook is set — confirm against the mock package.
			return nil
		}
		return existingHandler(ctx, key, req, m)
	}
	mockGCE.MockTargetPools.RemoveInstanceHook = hook
	return nil
}

type gceInstance struct {
Zone string
Name string
Expand Down