Skip to content

Commit

Permalink
Merge pull request #52322 from davidz627/multizoneWrongZone
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fixes issue where PVCs using `standard` StorageClass create PDs in disks in wrong zone in multi-zone GKE clusters

Fixes #50115

Changed GetAllZones to only get zones with nodes that are currently running (renamed to GetAllCurrentZones). Added E2E test to confirm this behavior.
  • Loading branch information
Kubernetes Submit Queue committed Nov 21, 2017
2 parents 44f24d2 + e5aec86 commit 80e1c79
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 112 deletions.
4 changes: 4 additions & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
}
}

if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(sharedInformers)
}

ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
],
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/cloudprovider/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/controller"
)

Expand All @@ -49,6 +50,11 @@ type Interface interface {
HasClusterID() bool
}

type InformerUser interface {
// SetInformers sets the informer on the cloud object.
SetInformers(informerFactory informers.SharedInformerFactory)
}

// Clusters is an abstract, pluggable interface for clusters of containers.
type Clusters interface {
// ListClusters lists the names of the available clusters.
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/providers/gce/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand Down
99 changes: 87 additions & 12 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/version"

"github.com/golang/glog"
Expand Down Expand Up @@ -99,18 +102,21 @@ type GCECloud struct {
// for the cloudprovider to start watching the configmap.
ClusterID ClusterID

service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
// managedZones will be set to the 1 zone if running a single zone cluster
// it will be set to ALL zones in region for any multi-zone cluster
// Use GetAllCurrentZones to get only zones that contain nodes
managedZones []string
networkURL string
isLegacyNetwork bool
subnetworkURL string
Expand All @@ -125,6 +131,12 @@ type GCECloud struct {
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
manager diskServiceManager
// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
nodeInformerSynced cache.InformerSynced
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
Expand Down Expand Up @@ -470,6 +482,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
nodeZones: map[string]sets.String{},
}

gce.manager = &gceServiceManager{gce}
Expand Down Expand Up @@ -582,6 +595,68 @@ func (gce *GCECloud) IsLegacyNetwork() bool {
return gce.isLegacyNetwork
}

func (gce *GCECloud) SetInformers(informerFactory informers.SharedInformerFactory) {
glog.Infof("Setting up informers for GCECloud")
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
gce.updateNodeZones(nil, node)
},
UpdateFunc: func(prev, obj interface{}) {
prevNode := prev.(*v1.Node)
newNode := obj.(*v1.Node)
if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
return
}
gce.updateNodeZones(prevNode, newNode)
},
DeleteFunc: func(obj interface{}) {
node, isNode := obj.(*v1.Node)
// We can get DeletedFinalStateUnknown instead of *v1.Node here
// and we need to handle that correctly.
if !isNode {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Received unexpected object: %v", obj)
return
}
node, ok = deletedState.Obj.(*v1.Node)
if !ok {
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
gce.updateNodeZones(node, nil)
},
})
gce.nodeInformerSynced = nodeInformer.HasSynced
}

func (gce *GCECloud) updateNodeZones(prevNode, newNode *v1.Node) {
gce.nodeZonesLock.Lock()
defer gce.nodeZonesLock.Unlock()
if prevNode != nil {
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
gce.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
if gce.nodeZones[prevZone].Len() == 0 {
gce.nodeZones[prevZone] = nil
}
}
}
if newNode != nil {
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
if gce.nodeZones[newZone] == nil {
gce.nodeZones[newZone] = sets.NewString()
}
gce.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
}
}
}

// Known-useless DNS search path.
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)

Expand Down
42 changes: 17 additions & 25 deletions pkg/cloudprovider/providers/gce/gce_disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,14 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam
// JSON in Description field.
func (gce *GCECloud) CreateDisk(
name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {

// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
return fmt.Errorf("kubernetes does not manage zone %q", zone)
// Do not allow creation of PDs in zones that are do not have nodes. Such PDs
// are not currently usable.
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}
if !curZones.Has(zone) {
return fmt.Errorf("kubernetes does not have a node in zone %q", zone)
}

tagsStr, err := gce.encodeDiskTags(tags)
Expand Down Expand Up @@ -733,17 +736,16 @@ func (gce *GCECloud) CreateDisk(
func (gce *GCECloud) CreateRegionalDisk(
name string, diskType string, replicaZones sets.String, sizeGb int64, tags map[string]string) error {

// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
unmanagedZones := []string{}
for _, zone := range replicaZones.UnsortedList() {
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
unmanagedZones = append(unmanagedZones, zone)
}
// Do not allow creation of PDs in zones that are do not have nodes. Such PDs
// are not currently usable. This functionality should be reverted to checking
// against managed zones if we want users to be able to create RegionalDisks
// in zones where there are no nodes
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}

if len(unmanagedZones) > 0 {
return fmt.Errorf("kubernetes does not manage specified zones: %q. Managed Zones: %q", unmanagedZones, gce.managedZones)
if !curZones.IsSuperset(replicaZones) {
return fmt.Errorf("kubernetes does not have nodes in specified zones: %q. Zones that contain nodes: %q", replicaZones.Difference(curZones), curZones)
}

tagsStr, err := gce.encodeDiskTags(tags)
Expand Down Expand Up @@ -776,16 +778,6 @@ func (gce *GCECloud) CreateRegionalDisk(
return err
}

func (gce *GCECloud) verifyZoneIsManaged(zone string) bool {
for _, managedZone := range gce.managedZones {
if zone == managedZone {
return true
}
}

return false
}

func getDiskType(diskType string) (string, error) {
switch diskType {
case DiskTypeSSD, DiskTypeStandard:
Expand Down
Loading

0 comments on commit 80e1c79

Please sign in to comment.