Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #58368 upstream release 1.8 #60327

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 73 additions & 64 deletions pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}

affinityType := apiService.Spec.SessionAffinity

serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
loadBalancerName, gce.region, requestedIP, portStr, hostNames, serviceName, apiService.Annotations)
Expand Down Expand Up @@ -194,7 +192,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
}
}

tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType)
tpExists, tpNeedsRecreation, err := gce.targetPoolNeedsRecreation(loadBalancerName, gce.region, apiService.Spec.SessionAffinity)
if err != nil {
return nil, err
}
Expand All @@ -213,24 +211,24 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path)
if hcLocalTrafficExisting == nil {
// This logic exists to detect a transition from non-OnlyLocal to OnlyLocal service
// turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the
// target pool to use local traffic health check.
glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
if supportsNodesHealthCheck {
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
tpNeedsUpdate = true
tpNeedsRecreation = true
}
hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
} else {
glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name)
if hcLocalTrafficExisting != nil {
// This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service
// and turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// and turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the
// target pool to use nodes health check.
glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName)
hcToDelete = hcLocalTrafficExisting
tpNeedsUpdate = true
tpNeedsRecreation = true
}
if supportsNodesHealthCheck {
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
Expand All @@ -243,7 +241,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
// can't delete a target pool that's currently in use by a forwarding rule.
// Thus, we have to tear down the forwarding rule if either it or the target
// pool needs to be updated.
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsRecreation) {
// Begin critical section. If we have to delete the forwarding rule,
// and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later.
Expand All @@ -253,49 +251,13 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
// Pass healthchecks to DeleteExternalTargetPoolAndChecks to cleanup health checks after cleaning up the target pool itself.
var hcNames []string
if hcToDelete != nil {
hcNames = append(hcNames, hcToDelete.Name)
}
if err := gce.DeleteExternalTargetPoolAndChecks(apiService, loadBalancerName, gce.region, clusterID, hcNames...); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
}

// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
createInstances := hosts
if len(hosts) > maxTargetPoolCreateInstances {
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(apiService, loadBalancerName, serviceName.String(), ipAddressToUse, gce.region, clusterID, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if hcToCreate != nil {
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name)
}
if len(hosts) <= maxTargetPoolCreateInstances {
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
} else {
glog.Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)

created := sets.NewString()
for _, host := range createInstances {
created.Insert(host.makeComparableHostPath())
}
if err := gce.updateTargetPool(loadBalancerName, created, hosts); err != nil {
return nil, fmt.Errorf("failed to update target pool %s: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): updated target pool (with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)
}
if err := gce.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil {
return nil, err
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s (tier: %s)", loadBalancerName, serviceName, ipAddressToUse, netTier)

if tpNeedsRecreation || fwdRuleNeedsUpdate {
glog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule, IP %s (tier: %s).", lbRefStr, ipAddressToUse, netTier)
if err := createForwardingRule(gce, loadBalancerName, serviceName.String(), gce.region, ipAddressToUse, gce.targetPoolURL(loadBalancerName), ports, netTier); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
}
Expand All @@ -321,16 +283,7 @@ func (gce *GCECloud) updateExternalLoadBalancer(clusterName string, service *v1.
}

loadBalancerName := cloudprovider.GetLoadBalancerName(service)
pool, err := gce.service.TargetPools.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err != nil {
return err
}
existing := sets.NewString()
for _, instance := range pool.Instances {
existing.Insert(hostURLToComparablePath(instance))
}

return gce.updateTargetPool(loadBalancerName, existing, hosts)
return gce.updateTargetPool(loadBalancerName, hosts)
}

// ensureExternalLoadBalancerDeleted is the external implementation of LoadBalancer.EnsureLoadBalancerDeleted
Expand Down Expand Up @@ -499,7 +452,54 @@ func verifyUserRequestedIP(s CloudAddressService, region, requestedIP, fwdRuleIP
return false, fmt.Errorf("requested ip %q is neither static nor assigned to the LB", requestedIP)
}

func (gce *GCECloud) createTargetPool(svc *v1.Service, name, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
// ensureTargetPoolAndHealthCheck reconciles the target pool backing the load
// balancer named loadBalancerName for the service svc.
//
// When tpNeedsRecreation is set, an already existing pool (tpExists) is deleted
// first, passing the name of hcToDelete (if any) along to the delete call, and
// the pool is then created with hcToCreate. At most
// maxTargetPoolCreateInstances hosts are passed to the create call; if there
// are more, updateTargetPool is called afterwards with the full host list.
//
// When no recreation is needed but the pool exists, only the host list is
// refreshed through updateTargetPool. When the pool neither exists nor needs
// recreation, nothing is done.
//
// The first error encountered is returned, wrapped with the load balancer
// reference; nil is returned on success.
func (gce *GCECloud) ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation bool, svc *v1.Service, loadBalancerName, clusterID, ipAddressToUse string, hosts []*gceInstance, hcToCreate, hcToDelete *compute.HttpHealthCheck) error {
	nsName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
	lbRef := fmt.Sprintf("%v(%v)", loadBalancerName, nsName)

	if !tpNeedsRecreation {
		if !tpExists {
			return nil
		}
		// Keep the hosts in sync even when nothing else about the target pool
		// has to change.
		if err := gce.updateTargetPool(loadBalancerName, hosts); err != nil {
			return fmt.Errorf("failed to update target pool for load balancer (%s): %v", lbRef, err)
		}
		glog.Infof("ensureTargetPoolAndHealthCheck(%s): Updated target pool (with %d hosts).", lbRef, len(hosts))
		return nil
	}

	// From here on the target pool has to be (re)built.
	if tpExists {
		// Hand the health check over to DeleteExternalTargetPoolAndChecks so
		// that it gets cleaned up once the target pool itself is gone.
		var staleChecks []string
		if hcToDelete != nil {
			staleChecks = append(staleChecks, hcToDelete.Name)
		}
		err := gce.DeleteExternalTargetPoolAndChecks(svc, loadBalancerName, gce.region, clusterID, staleChecks...)
		if err != nil {
			return fmt.Errorf("failed to delete existing target pool for load balancer (%s) update: %v", lbRef, err)
		}
		glog.Infof("ensureTargetPoolAndHealthCheck(%s): Deleted target pool.", lbRef)
	}

	// With any old resources out of the way, build the pool back up (or for
	// the first time if it is new). Only the first
	// maxTargetPoolCreateInstances hosts go into the create call.
	overflow := len(hosts) - maxTargetPoolCreateInstances
	initialHosts := hosts
	if overflow > 0 {
		initialHosts = hosts[:maxTargetPoolCreateInstances]
	}
	err := gce.createTargetPoolAndHealthCheck(svc, loadBalancerName, nsName.String(), ipAddressToUse, gce.region, clusterID, initialHosts, hcToCreate)
	if err != nil {
		return fmt.Errorf("failed to create target pool for load balancer (%s): %v", lbRef, err)
	}
	if hcToCreate != nil {
		glog.Infof("ensureTargetPoolAndHealthCheck(%s): Created health checks %v.", lbRef, hcToCreate.Name)
	}

	if overflow <= 0 {
		glog.Infof("ensureTargetPoolAndHealthCheck(%s): Created target pool.", lbRef)
		return nil
	}

	// The hosts that did not fit into the create call are added by a
	// follow-up update with the complete host list.
	glog.Infof("ensureTargetPoolAndHealthCheck(%s): Created initial target pool (now updating the remaining %d hosts).", lbRef, overflow)
	if err := gce.updateTargetPool(loadBalancerName, hosts); err != nil {
		return fmt.Errorf("failed to update target pool for load balancer (%s): %v", lbRef, err)
	}
	glog.Infof("ensureTargetPoolAndHealthCheck(%s): Updated target pool (with %d hosts).", lbRef, overflow)
	return nil
}

func (gce *GCECloud) createTargetPoolAndHealthCheck(svc *v1.Service, name, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hc *compute.HttpHealthCheck) error {
// health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete
// associated checks on teardown, and ensure checks on setup.
Expand Down Expand Up @@ -533,7 +533,7 @@ func (gce *GCECloud) createTargetPool(svc *v1.Service, name, serviceName, ipAddr
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
SessionAffinity: translateAffinityType(svc.Spec.SessionAffinity),
HealthChecks: hcLinks,
}

Expand All @@ -543,7 +543,16 @@ func (gce *GCECloud) createTargetPool(svc *v1.Service, name, serviceName, ipAddr
return nil
}

func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.String, hosts []*gceInstance) error {
func (gce *GCECloud) updateTargetPool(loadBalancerName string, hosts []*gceInstance) error {
pool, err := gce.GetTargetPool(loadBalancerName, gce.region)
if err != nil {
return err
}
existing := sets.NewString()
for _, instance := range pool.Instances {
existing.Insert(hostURLToComparablePath(instance))
}

var toAdd []*compute.InstanceReference
var toRemove []*compute.InstanceReference
for _, host := range hosts {
Expand Down Expand Up @@ -677,8 +686,8 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancer

// Doesn't check whether the hosts have changed, since host updating is handled
// separately.
func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsUpdate bool, err error) {
tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
func (gce *GCECloud) targetPoolNeedsRecreation(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsRecreation bool, err error) {
tp, err := gce.GetTargetPool(name, region)
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
Expand Down