Unrevert #14608 and decrease the latency of GCE load balancer deletions #14964

Merged
merged 4 commits into from
Oct 5, 2015
147 changes: 126 additions & 21 deletions pkg/cloudprovider/providers/gce/gce.go
@@ -28,6 +28,7 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

@@ -349,6 +350,27 @@ func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}

func (gce *GCECloud) getAddress(name, region string) (string, bool, error) {
address, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
if err == nil {
return address.Address, true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return "", false, nil
}
return "", false, err
}

func ownsAddress(ip net.IP, addrs []*compute.Address) bool {
ipStr := ip.String()
for _, addr := range addrs {
if addr.Address == ipStr {
return true
}
}
return false
}

// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are.
@@ -357,7 +379,44 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
return nil, fmt.Errorf("Cannot EnsureTCPLoadBalancer() with no hosts")
}

glog.V(2).Infof("Checking if load balancer already exists: %s", name)
if loadBalancerIP == nil {
glog.V(2).Infof("Checking if the static IP address already exists: %s", name)
address, exists, err := gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error looking for gce address: %v", err)
}
if !exists {
// Note, though static addresses that _aren't_ in use cost money, ones that _are_ in use don't.
// However, quota is limited to only 7 addresses per region by default.
op, err := gce.service.Addresses.Insert(gce.projectID, region, &compute.Address{Name: name}).Do()
if err != nil {
return nil, fmt.Errorf("error creating gce static IP address: %v", err)
}
if err := gce.waitForRegionOp(op, region); err != nil {
return nil, fmt.Errorf("error waiting for gce static IP address creation to complete: %v", err)
}
address, exists, err = gce.getAddress(name, region)
if err != nil {
return nil, fmt.Errorf("error re-getting gce static IP address: %v", err)
}
if !exists {
return nil, fmt.Errorf("failed to re-get gce static IP address for %s", name)
}
}
if loadBalancerIP = net.ParseIP(address); loadBalancerIP == nil {
return nil, fmt.Errorf("error parsing gce static IP address: %s", address)
}
} else {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return nil, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
if !ownsAddress(loadBalancerIP, addresses.Items) {
return nil, fmt.Errorf("this gce project doesn't own the IP address: %s", loadBalancerIP.String())
}
Member

Just to be sure, can one static IP be matched to multiple forwarding rules? I assume the answer is yes.

Member

To answer my own question: no, one static IP can only be bound to one forwarding rule.

This doesn't seem to cause any problems, because GCE's API will fail the forwarding rule creation anyway. I'm considering whether we can maintain a list of static IPs that are already bound to a forwarding rule, so that we can return an error before making a remote call.

Contributor Author

Correct - each static IP can only be bound to one forwarding rule. However, because we name the address and forwarding rule the same (based on the service UID), we shouldn't hit a case where a particular static IP is already bound to a different forwarding rule.

Member

I disagree. loadBalancerIP is parsed from service.Spec.LoadBalancerIP, so it can be any value specified by the user, including a static IP that is already bound to a different forwarding rule.
For example, if a user calls EnsureTCPLoadBalancer with a new name but an existing loadBalancerIP, it will hit the case described above.

Member

Anyway, this may not be a problem. In such a case, the service controller will keep failing to create the load balancer, which is reasonable behavior. I'm just suggesting that we could fail such requests before making a remote call, to save a round trip.

Contributor Author

Yes, you're absolutely right, I forgot we had added that option.

Forwarding rule creation will fail with the error that the IP address is already in use. Relying on that seems fine since it should fail quickly, but we could short-circuit things by using the attributes of the address returned from the list request. The attributes include whether it's in use and by what.

Member

Good to know the attributes already contain the information. It sounds like a good idea. I will not block the PR on this possible improvement. I will LGTM it when you delete line 403.
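
The short-circuit discussed in this thread could look roughly like the sketch below. It is not part of this PR. The `address` struct is a hypothetical local stand-in for the few `compute.Address` fields involved (assumed here to be `Address`, `Status`, and `Users`), and `checkAddress` and the `forwardingRules/<name>` suffix match are illustrative assumptions, not the provider's actual logic.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// address is a hypothetical stand-in for the compute.Address fields that
// matter for this check; the real struct carries many more fields.
type address struct {
	Address string   // the IP address in dotted form
	Status  string   // assumed values: "RESERVED" or "IN_USE"
	Users   []string // URLs of the resources currently using the address
}

// checkAddress returns nil if ip is owned by the project and is either unused
// or already used by the forwarding rule named ruleName. Otherwise it returns
// an error without needing a further remote call.
func checkAddress(ip net.IP, ruleName string, addrs []*address) error {
	ipStr := ip.String()
	for _, addr := range addrs {
		if addr.Address != ipStr {
			continue
		}
		if addr.Status != "IN_USE" {
			return nil
		}
		for _, user := range addr.Users {
			if strings.HasSuffix(user, "/forwardingRules/"+ruleName) {
				return nil
			}
		}
		return fmt.Errorf("IP address %s is already in use by %v", ipStr, addr.Users)
	}
	return fmt.Errorf("this project doesn't own the IP address %s", ipStr)
}

func main() {
	addrs := []*address{
		{Address: "2.3.4.5", Status: "RESERVED"},
		{Address: "2.3.4.6", Status: "IN_USE", Users: []string{"regions/us-central1/forwardingRules/other"}},
	}
	for _, ip := range []string{"2.3.4.5", "2.3.4.6", "1.2.3.4"} {
		fmt.Printf("%s: %v\n", ip, checkAddress(net.ParseIP(ip), "mine", addrs))
	}
}
```

Relying on the forwarding rule insert to fail remains the simpler option; a check like this only saves the extra round trip.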

}

glog.V(2).Infof("Checking if load balancer already exists: %s", name)
_, exists, err := gce.GetTCPLoadBalancer(name, region)
if err != nil {
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
@@ -395,13 +454,11 @@ func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, loadBalancerIP n
}
req := &compute.ForwardingRule{
Name: name,
IPAddress: loadBalancerIP.String(),
Member

We can delete L403, as loadBalancerIP won't be nil now.

Contributor Author

Indeed, good catch. I should have caught that in the initial review.

IPProtocol: "TCP",
PortRange: fmt.Sprintf("%d-%d", minPort, maxPort),
Target: gce.targetPoolURL(name, region),
}
if loadBalancerIP != nil {
req.IPAddress = loadBalancerIP.String()
}

op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
@@ -556,45 +613,93 @@ func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string)

// EnsureTCPLoadBalancerDeleted is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancerDeleted.
func (gce *GCECloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
err := errors.AggregateGoroutines(
func() error { return gce.deleteFirewall(name, region) },
func() error {
if err := gce.deleteForwardingRule(name, region); err != nil {
return err
}
// The forwarding rule must be deleted before either the target pool or
// static IP address can, unfortunately.
err := errors.AggregateGoroutines(
func() error { return gce.deleteTargetPool(name, region) },
func() error { return gce.deleteStaticIP(name, region) },
)
if err != nil {
return err
}
return nil
},
)
if err != nil {
return errors.Flatten(err)
}
return nil
}
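
The dependency structure of the deletion above can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not the provider code: `runParallel` is a simplified substitute for `errors.AggregateGoroutines`, and `recorder`, `deleteAll`, and `indexOf` are hypothetical names that only record the order in which steps run.

```go
package main

import (
	"fmt"
	"sync"
)

// runParallel is a simplified stand-in for errors.AggregateGoroutines: it runs
// every function concurrently, waits for all of them, and returns the non-nil
// errors.
func runParallel(funcs ...func() error) []error {
	errChan := make(chan error, len(funcs))
	for _, f := range funcs {
		go func(f func() error) { errChan <- f() }(f)
	}
	var errs []error
	for range funcs {
		if err := <-errChan; err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

// recorder is a hypothetical helper that records the order in which steps run.
type recorder struct {
	mu    sync.Mutex
	steps []string
}

// step returns a function that records name and always succeeds.
func (r *recorder) step(name string) func() error {
	return func() error {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.steps = append(r.steps, name)
		return nil
	}
}

// deleteAll mirrors the shape of the deletion: the firewall is independent,
// while the target pool and static IP wait for the forwarding rule.
func deleteAll(r *recorder) []error {
	return runParallel(
		r.step("firewall"),
		func() error {
			if err := r.step("forwarding rule")(); err != nil {
				return err
			}
			if errs := runParallel(r.step("target pool"), r.step("static IP")); len(errs) > 0 {
				return fmt.Errorf("cleanup after forwarding rule failed: %v", errs)
			}
			return nil
		},
	)
}

// indexOf returns the position of name in steps, or -1 if it is absent.
func indexOf(steps []string, name string) int {
	for i, s := range steps {
		if s == name {
			return i
		}
	}
	return -1
}

func main() {
	r := &recorder{}
	errs := deleteAll(r)
	// The exact order varies between runs, but "forwarding rule" always
	// precedes "target pool" and "static IP".
	fmt.Println(len(errs), r.steps)
}
```

With this shape, the total latency is roughly the slower of the firewall deletion and the forwarding rule deletion plus the slower of its two dependents, rather than the sum of all four.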

func (gce *GCECloud) deleteForwardingRule(name, region string) error {
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete target pool.", name)
glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete Forwarding Rules %s: got error %s.", name, err.Error())
glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error())
return err
} else {
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningf("Failed waiting for Forwarding Rule %s to be deleted: got error %s.", name, err.Error())
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error())
return err
}
}
op, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
return nil
}

func (gce *GCECloud) deleteTargetPool(name, region string) error {
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted.", name)
return nil
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete Target Pool %s, got error %s.", name, err.Error())
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
return err
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error())
return err
}
}
err = gce.waitForRegionOp(op, region)
if err != nil {
glog.Warningf("Failed waiting for Target Pool %s to be deleted: got error %s.", name, err.Error())
}
return nil
}

func (gce *GCECloud) deleteFirewall(name, region string) error {
fwName := makeFirewallName(name)
op, err = gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Firewall doesn't exist, moving on to deleting target pool.")
glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", fwName)
} else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
return err
} else {
if err = gce.waitForGlobalOp(op); err != nil {
if err := gce.waitForGlobalOp(op); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
return err
}
}
return err
return nil
}

func (gce *GCECloud) deleteStaticIP(name, region string) error {
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Static IP address %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
return err
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err)
return err
}
}
return nil
}
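
Each of the delete helpers above treats "not found" as success, which is what makes the overall deletion safe to retry. A minimal sketch of that pattern follows, assuming a hypothetical in-memory `fakeCloud` and an `errNotFound` sentinel in place of the real API's HTTP 404 check:

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel standing in for an HTTP 404 from the
// cloud API.
var errNotFound = errors.New("not found")

// fakeCloud is a hypothetical in-memory stand-in for the cloud API.
type fakeCloud struct {
	resources map[string]bool
	down      bool // when true, every call fails with a non-404 error
}

// remove deletes the named resource, failing if it is absent or the backend
// is down.
func (c *fakeCloud) remove(name string) error {
	if c.down {
		return errors.New("backend unavailable")
	}
	if !c.resources[name] {
		return errNotFound
	}
	delete(c.resources, name)
	return nil
}

// ensureDeleted succeeds whenever the resource is gone afterwards, whether
// this call removed it or it was already missing. Other errors are returned
// so the caller can retry.
func ensureDeleted(c *fakeCloud, name string) error {
	err := c.remove(name)
	if errors.Is(err, errNotFound) {
		return nil
	}
	return err
}

func main() {
	c := &fakeCloud{resources: map[string]bool{"lb": true}}
	fmt.Println(ensureDeleted(c, "lb")) // first call removes the resource
	fmt.Println(ensureDeleted(c, "lb")) // second call finds nothing and still succeeds
}
```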

// UrlMap management
59 changes: 59 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_test.go
@@ -17,9 +17,68 @@ limitations under the License.
package gce_cloud

import (
"net"
"testing"

compute "google.golang.org/api/compute/v1"
)

func TestOwnsAddress(t *testing.T) {
tests := []struct {
ip net.IP
addrs []*compute.Address
expectOwn bool
}{
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{},
expectOwn: false,
},
{
ip: net.ParseIP("1.2.3.4"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: false,
},
{
ip: net.ParseIP("2.3.4.5"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.6"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
{
ip: net.ParseIP("2.3.4.7"),
addrs: []*compute.Address{
{Address: "2.3.4.5"},
{Address: "2.3.4.6"},
{Address: "2.3.4.7"},
},
expectOwn: true,
},
}
for _, test := range tests {
own := ownsAddress(test.ip, test.addrs)
if own != test.expectOwn {
t.Errorf("expected: %v, got %v for %v", test.expectOwn, own, test)
}
}
}

func TestGetRegion(t *testing.T) {
gce := &GCECloud{
zone: "us-central1-b",
17 changes: 17 additions & 0 deletions pkg/util/errors/errors.go
@@ -131,3 +131,20 @@ func Flatten(agg Aggregate) Aggregate {
}
return NewAggregate(result)
}

// AggregateGoroutines runs the provided functions in parallel, stuffing all
// non-nil errors into the returned Aggregate.
// Returns nil if all the functions complete successfully.
func AggregateGoroutines(funcs ...func() error) Aggregate {
errChan := make(chan error, len(funcs))
for _, f := range funcs {
go func(f func() error) { errChan <- f() }(f)
}
errs := make([]error, 0)
for i := 0; i < cap(errChan); i++ {
if err := <-errChan; err != nil {
errs = append(errs, err)
}
}
return NewAggregate(errs)
}
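
For comparison only, a similar helper can be sketched with the standard library on Go 1.20 or later, using `sync.WaitGroup` and `errors.Join` instead of a buffered channel and the `Aggregate` type. This is a different technique from the one in this PR; `joinGoroutines`, `failWith`, and `succeed` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// joinGoroutines is a hypothetical variant of AggregateGoroutines. It runs the
// functions concurrently and returns their non-nil errors joined in argument
// order, or nil if every function succeeds.
func joinGoroutines(funcs ...func() error) error {
	errs := make([]error, len(funcs))
	var wg sync.WaitGroup
	for i, f := range funcs {
		wg.Add(1)
		go func(i int, f func() error) {
			defer wg.Done()
			errs[i] = f() // each goroutine writes only its own slot
		}(i, f)
	}
	wg.Wait()
	return errors.Join(errs...) // Join drops nil entries and returns nil if none remain
}

// failWith returns a function that always fails with msg.
func failWith(msg string) func() error {
	return func() error { return errors.New(msg) }
}

// succeed always returns nil.
func succeed() error { return nil }

func main() {
	err := joinGoroutines(failWith("1"), succeed, failWith("1234"))
	fmt.Println(err)
}
```

Because each result lands in a fixed slot, the error order is deterministic here, unlike the channel-based version, whose test has to compare against a set.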
63 changes: 63 additions & 0 deletions pkg/util/errors/errors_test.go
@@ -221,3 +221,66 @@ func TestFlatten(t *testing.T) {
}
}
}

func TestAggregateGoroutines(t *testing.T) {
testCases := []struct {
errs []error
expected map[string]bool // can't compare directly to Aggregate due to non-deterministic ordering
}{
{
[]error{},
nil,
},
{
[]error{nil},
nil,
},
{
[]error{nil, nil},
nil,
},
{
[]error{fmt.Errorf("1")},
map[string]bool{"1": true},
},
{
[]error{fmt.Errorf("1"), nil},
map[string]bool{"1": true},
},
{
[]error{fmt.Errorf("1"), fmt.Errorf("267")},
map[string]bool{"1": true, "267": true},
},
{
[]error{fmt.Errorf("1"), nil, fmt.Errorf("1234")},
map[string]bool{"1": true, "1234": true},
},
{
[]error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")},
map[string]bool{"1": true, "1234": true, "22": true},
},
}
for i, testCase := range testCases {
funcs := make([]func() error, len(testCase.errs))
for i := range testCase.errs {
err := testCase.errs[i]
funcs[i] = func() error { return err }
}
agg := AggregateGoroutines(funcs...)
if agg == nil {
if len(testCase.expected) > 0 {
t.Errorf("%d: expected %v, got nil", i, testCase.expected)
}
continue
}
if len(agg.Errors()) != len(testCase.expected) {
t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg)
continue
}
for _, err := range agg.Errors() {
if !testCase.expected[err.Error()] {
t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err)
}
}
}
}