Skip to content

Commit

Permalink
Gracefully handle permission issues when handling firewall rules
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Sardo committed Aug 29, 2017
1 parent 0d17e9d commit f9eed82
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 37 deletions.
14 changes: 14 additions & 0 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ import (

"cloud.google.com/go/compute/metadata"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -99,7 +104,10 @@ type GCECloud struct {
serviceAlpha *computealpha.Service
containerService *container.Service
cloudkmsService *cloudkms.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
Expand Down Expand Up @@ -511,6 +519,12 @@ func tryConvertToProjectNames(configProject, configNetworkProject string, servic
// This must be called before utilizing the funcs of gce.ClusterID
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
gce.clientBuilder = clientBuilder
gce.client = clientBuilder.ClientOrDie("cloud-provider")

gce.eventBroadcaster = record.NewBroadcaster()
gce.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(gce.client.Core().RESTClient()).Events("")})
gce.eventRecorder = gce.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "gce-cloudprovider"})

go gce.watchClusterID()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/providers/gce/gce_clusterid.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ClusterID struct {
func (gce *GCECloud) watchClusterID() {
gce.ClusterID = ClusterID{
cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName),
client: gce.clientBuilder.ClientOrDie("cloud-provider"),
client: gce.client,
}

mapEventHandler := cache.ResourceEventHandlerFuncs{
Expand Down
57 changes: 36 additions & 21 deletions pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
// without needing to be deleted and recreated.
if firewallExists {
glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName)
if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.updateFirewall(apiService, makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName)
if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.createFirewall(apiService, makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
Expand Down Expand Up @@ -260,7 +260,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddressToUse, gce.region, clusterID, createInstances, affinityType, hcToCreate); err != nil {
if err := gce.createTargetPool(apiService, loadBalancerName, serviceName.String(), ipAddressToUse, gce.region, clusterID, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if hcToCreate != nil {
Expand Down Expand Up @@ -342,7 +342,14 @@ func (gce *GCECloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID st
}

errs := utilerrors.AggregateGoroutines(
func() error { return ignoreNotFound(gce.DeleteFirewall(makeFirewallName(loadBalancerName))) },
func() error {
err := ignoreNotFound(gce.DeleteFirewall(makeFirewallName(loadBalancerName)))
if isForbidden(err) && gce.OnXPN() {
glog.V(4).Infof("ensureExternalLoadBalancerDeleted(%v): do not have permission to delete firewall rule (on XPN).", loadBalancerName)
return nil
}
return err
},
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
Expand Down Expand Up @@ -459,7 +466,7 @@ func verifyUserRequestedIP(s CloudAddressService, region, requestedIP, fwdRuleIP
return false, fmt.Errorf("requested ip %q is neither static nor assigned to the LB", requestedIP)
}

func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
func (gce *GCECloud) createTargetPool(svc *v1.Service, name, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
// health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete
// associated checks on teardown, and ensure checks on setup.
Expand All @@ -472,10 +479,9 @@ func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region, clus
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
if !gce.OnXPN() {
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, region, clusterID, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}

if err := gce.ensureHttpHealthCheckFirewall(svc, serviceName, ipAddress, region, clusterID, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}
var err error
if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
Expand Down Expand Up @@ -724,11 +730,6 @@ func translateAffinityType(affinityType v1.ServiceAffinity) string {
}

func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []v1.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
if gce.OnXPN() {
glog.V(2).Infoln("firewallNeedsUpdate: Cluster is on XPN network - skipping firewall creation")
return false, false, nil
}

fw, err := gce.service.Firewalls.Get(gce.NetworkProjectID(), makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
Expand Down Expand Up @@ -766,7 +767,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
return true, false, nil
}

func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
func (gce *GCECloud) ensureHttpHealthCheckFirewall(svc *v1.Service, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
// Prepare the firewall params for creating / checking.
desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
if !isNodesHealthCheck {
Expand All @@ -782,7 +783,7 @@ func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, regio
return fmt.Errorf("error getting firewall for health checks: %v", err)
}
glog.Infof("Creating firewall %v for health checks.", fwName)
if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.createFirewall(svc, fwName, region, desc, sourceRanges, ports, hosts); err != nil {
return err
}
glog.Infof("Created firewall %v for health checks.", fwName)
Expand All @@ -795,7 +796,7 @@ func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, regio
!equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.updateFirewall(svc, fwName, region, desc, sourceRanges, ports, hosts); err != nil {
glog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
return err
}
Expand Down Expand Up @@ -824,24 +825,38 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
return nil
}

func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
func (gce *GCECloud) createFirewall(svc *v1.Service, name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
if err = gce.CreateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
if err = gce.CreateFirewall(firewall); err != nil {
if isHTTPErrorCode(err, http.StatusConflict) {
return nil
} else if isForbidden(err) && gce.OnXPN() {
glog.V(4).Infof("createFirewall(%v): do not have permission to create firewall rule (on XPN). Raising event.", firewall.Name)
gce.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudCreateCmd(firewall, gce.NetworkProjectID()))
return nil
}
return err
}
return nil
}

func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
func (gce *GCECloud) updateFirewall(svc *v1.Service, name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}

if err = gce.UpdateFirewall(firewall); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
if err = gce.UpdateFirewall(firewall); err != nil {
if isHTTPErrorCode(err, http.StatusConflict) {
return nil
} else if isForbidden(err) && gce.OnXPN() {
glog.V(4).Infof("updateFirewall(%v): do not have permission to update firewall rule (on XPN). Raising event.", firewall.Name)
gce.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudUpdateCmd(firewall, gce.NetworkProjectID()))
return nil
}
return err
}
return nil
Expand Down
42 changes: 27 additions & 15 deletions pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (
allInstances = "ALL"
)

type lbBalancingMode string

func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
Expand Down Expand Up @@ -80,12 +78,8 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s
}

// Ensure firewall rules if necessary
if gce.OnXPN() {
glog.V(2).Infof("ensureInternalLoadBalancer: cluster is on a cross-project network (XPN) network project %v, compute project %v - skipping firewall creation", gce.networkProjectID, gce.projectID)
} else {
if err = gce.ensureInternalFirewalls(loadBalancerName, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
return nil, err
}
if err = gce.ensureInternalFirewalls(loadBalancerName, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
return nil, err
}

expectedFwdRule := &compute.ForwardingRule{
Expand Down Expand Up @@ -211,7 +205,11 @@ func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID st

glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for traffic", loadBalancerName)
if err := gce.DeleteFirewall(loadBalancerName); err != nil {
return err
if isForbidden(err) && gce.OnXPN() {
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): could not delete traffic firewall on XPN cluster. Ignoring.", loadBalancerName)
} else {
return err
}
}

hcName := makeHealthCheckName(loadBalancerName, clusterID, sharedHealthCheck)
Expand Down Expand Up @@ -261,13 +259,17 @@ func (gce *GCECloud) teardownInternalHealthCheckAndFirewall(hcName string) error

hcFirewallName := makeHealthCheckFirewallNameFromHC(hcName)
if err := gce.DeleteFirewall(hcFirewallName); err != nil && !isNotFound(err) {
return fmt.Errorf("failed to delete health check firewall: %v, err: %v", hcFirewallName, err)
if isForbidden(err) && gce.OnXPN() {
glog.V(2).Infof("teardownInternalHealthCheckAndFirewall(%v) could not delete health check firewall on XPN cluster. Ignoring.", hcName)
} else {
return fmt.Errorf("failed to delete health check firewall: %v, err: %v", hcFirewallName, err)
}
}
glog.V(2).Infof("teardownInternalHealthCheckAndFirewall(%v): health check firewall deleted", hcFirewallName)
return nil
}

func (gce *GCECloud) ensureInternalFirewall(fwName, fwDesc string, sourceRanges []string, ports []string, protocol v1.Protocol, nodes []*v1.Node) error {
func (gce *GCECloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc string, sourceRanges []string, ports []string, protocol v1.Protocol, nodes []*v1.Node) error {
glog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
targetTags, err := gce.GetNodeTags(nodeNames(nodes))
if err != nil {
Expand Down Expand Up @@ -295,15 +297,25 @@ func (gce *GCECloud) ensureInternalFirewall(fwName, fwDesc string, sourceRanges

if existingFirewall == nil {
glog.V(2).Infof("ensureInternalFirewall(%v): creating firewall", fwName)
return gce.CreateFirewall(expectedFirewall)
err = gce.CreateFirewall(expectedFirewall)
if err != nil && isForbidden(err) && gce.OnXPN() {
glog.V(2).Infof("ensureInternalFirewall(%v): do not have permission to create firewall rule (on XPN). Raising event.", fwName)
gce.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudCreateCmd(expectedFirewall, gce.NetworkProjectID()))
}
return err
}

if firewallRuleEqual(expectedFirewall, existingFirewall) {
return nil
}

glog.V(2).Infof("ensureInternalFirewall(%v): updating firewall", fwName)
return gce.UpdateFirewall(expectedFirewall)
err = gce.UpdateFirewall(expectedFirewall)
if err != nil && isForbidden(err) && gce.OnXPN() {
glog.V(2).Infof("ensureInternalFirewall(%v): do not have permission to update firewall rule (on XPN). Raising event.", fwName)
gce.raiseFirewallChangeNeededEvent(svc, FirewallToGCloudUpdateCmd(expectedFirewall, gce.NetworkProjectID()))
}
return err
}

func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error {
Expand All @@ -314,15 +326,15 @@ func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, clusterID string,
if err != nil {
return err
}
err = gce.ensureInternalFirewall(loadBalancerName, fwDesc, sourceRanges.StringSlice(), ports, protocol, nodes)
err = gce.ensureInternalFirewall(svc, loadBalancerName, fwDesc, sourceRanges.StringSlice(), ports, protocol, nodes)
if err != nil {
return err
}

// Second firewall is for health checking nodes / services
fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck)
hcSrcRanges := LoadBalancerSrcRanges()
return gce.ensureInternalFirewall(fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes)
return gce.ensureInternalFirewall(svc, fwHCName, "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes)
}

func (gce *GCECloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {
Expand Down
46 changes: 46 additions & 0 deletions pkg/cloudprovider/providers/gce/gce_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package gce

import (
"bytes"
"errors"
"fmt"
"net/http"
"regexp"
"strings"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

Expand Down Expand Up @@ -58,6 +60,46 @@ func getProjectAndZone() (string, string, error) {
return projectID, zone, nil
}

func (gce *GCECloud) raiseFirewallChangeNeededEvent(svc *v1.Service, cmd string) {
msg := fmt.Sprintf("Firewall change required by network admin: `%v`", cmd)
gce.eventRecorder.Event(svc, v1.EventTypeNormal, "LoadBalancerManualChange", msg)
}

// FirewallToGCloudCreateCmd generates a gcloud command to create a firewall with specified params
func FirewallToGCloudCreateCmd(fw *compute.Firewall, projectID string) string {
args := firewallToGcloudArgs(fw, projectID)
return fmt.Sprintf("gcloud compute firewall-rules create %v --network %v %v", fw.Name, fw.Network, args)
}

// FirewallToGCloudCreateCmd generates a gcloud command to update a firewall to specified params
func FirewallToGCloudUpdateCmd(fw *compute.Firewall, projectID string) string {
args := firewallToGcloudArgs(fw, projectID)
return fmt.Sprintf("gcloud compute firewall-rules update %v %v", fw.Name, args)
}

// FirewallToGCloudCreateCmd generates a gcloud command to update a firewall to specified params
func FirewallToGCloudDeleteCmd(fw *compute.Firewall, projectID string) string {
args := firewallToGcloudArgs(fw, projectID)
return fmt.Sprintf("gcloud compute firewall-rules update %v %v", fw.Name, args)
}

func firewallToGcloudArgs(fw *compute.Firewall, projectID string) string {
var allowedBuffer bytes.Buffer
for i, a := range fw.Allowed {
for x, p := range a.Ports {
sep := ","
if i == len(fw.Allowed)-1 && x == len(a.Ports)-1 {
sep = ""
}
allowedBuffer.WriteString(fmt.Sprintf("%v:%v%v", a.IPProtocol, p, sep))
}
}
allow := allowedBuffer.String()
srcRngs := strings.Join(fw.SourceRanges, ",")
targets := strings.Join(fw.TargetTags, ",")
return fmt.Sprintf("--description %q --allow %v --source-ranges %v --target-tags %v --project %v", fw.Description, allow, srcRngs, targets, projectID)
}

// Take a GCE instance 'hostname' and break it down to something that can be fed
// to the GCE API client library. Basically this means reducing 'kubernetes-
// node-2.c.my-proj.internal' to 'kubernetes-node-2' if necessary.
Expand Down Expand Up @@ -150,6 +192,10 @@ func isNotFoundOrInUse(err error) bool {
return isNotFound(err) || isInUsedByError(err)
}

func isForbidden(err error) bool {
return isHTTPErrorCode(err, http.StatusForbidden)
}

func makeGoogleAPINotFoundError(message string) error {
return &googleapi.Error{Code: http.StatusNotFound, Message: message}
}

0 comments on commit f9eed82

Please sign in to comment.