Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #52322 upstream release 1.8 #56517

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
}
}

if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(sharedInformers)
}

ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
],
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/cloudprovider/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/controller"
)

Expand All @@ -49,6 +50,11 @@ type Interface interface {
HasClusterID() bool
}

type InformerUser interface {
// SetInformers sets the informer on the cloud object.
SetInformers(informerFactory informers.SharedInformerFactory)
}

// Clusters is an abstract, pluggable interface for clusters of containers.
type Clusters interface {
// ListClusters lists the names of the available clusters.
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/providers/gce/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options/encryptionconfig:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/value/encrypt/envelope:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand Down
101 changes: 88 additions & 13 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"

"github.com/golang/glog"
"golang.org/x/oauth2"
Expand Down Expand Up @@ -100,19 +103,22 @@ type GCECloud struct {
// for the cloudprovider to start watching the configmap.
ClusterID ClusterID

service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
cloudkmsService *cloudkms.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
service *compute.Service
serviceBeta *computebeta.Service
serviceAlpha *computealpha.Service
containerService *container.Service
cloudkmsService *cloudkms.Service
client clientset.Interface
clientBuilder controller.ControllerClientBuilder
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
projectID string
region string
localZone string // The zone in which we are running
// managedZones will be set to the 1 zone if running a single zone cluster
// it will be set to ALL zones in region for any multi-zone cluster
// Use GetAllCurrentZones to get only zones that contain nodes
managedZones []string
networkURL string
isLegacyNetwork bool
subnetworkURL string
Expand All @@ -127,6 +133,12 @@ type GCECloud struct {
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
manager ServiceManager
// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
nodeInformerSynced cache.InformerSynced
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
Expand Down Expand Up @@ -536,6 +548,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
nodeZones: map[string]sets.String{},
}

gce.manager = &GCEServiceManager{gce}
Expand Down Expand Up @@ -656,6 +669,68 @@ func (gce *GCECloud) IsLegacyNetwork() bool {
return gce.isLegacyNetwork
}

func (gce *GCECloud) SetInformers(informerFactory informers.SharedInformerFactory) {
glog.Infof("Setting up informers for GCECloud")
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
gce.updateNodeZones(nil, node)
},
UpdateFunc: func(prev, obj interface{}) {
prevNode := prev.(*v1.Node)
newNode := obj.(*v1.Node)
if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
return
}
gce.updateNodeZones(prevNode, newNode)
},
DeleteFunc: func(obj interface{}) {
node, isNode := obj.(*v1.Node)
// We can get DeletedFinalStateUnknown instead of *v1.Node here
// and we need to handle that correctly.
if !isNode {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Received unexpected object: %v", obj)
return
}
node, ok = deletedState.Obj.(*v1.Node)
if !ok {
glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
return
}
}
gce.updateNodeZones(node, nil)
},
})
gce.nodeInformerSynced = nodeInformer.HasSynced
}

func (gce *GCECloud) updateNodeZones(prevNode, newNode *v1.Node) {
gce.nodeZonesLock.Lock()
defer gce.nodeZonesLock.Unlock()
if prevNode != nil {
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
gce.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
if gce.nodeZones[prevZone].Len() == 0 {
gce.nodeZones[prevZone] = nil
}
}
}
if newNode != nil {
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok {
if gce.nodeZones[newZone] == nil {
gce.nodeZones[newZone] = sets.NewString()
}
gce.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
}
}
}

// Known-useless DNS search path.
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)

Expand Down
42 changes: 17 additions & 25 deletions pkg/cloudprovider/providers/gce/gce_disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,14 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam
// JSON in Description field.
func (gce *GCECloud) CreateDisk(
name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {

// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
return fmt.Errorf("kubernetes does not manage zone %q", zone)
// Do not allow creation of PDs in zones that are do not have nodes. Such PDs
// are not currently usable.
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}
if !curZones.Has(zone) {
return fmt.Errorf("kubernetes does not have a node in zone %q", zone)
}

tagsStr, err := gce.encodeDiskTags(tags)
Expand Down Expand Up @@ -316,17 +319,16 @@ func (gce *GCECloud) CreateDisk(
func (gce *GCECloud) CreateRegionalDisk(
name string, diskType string, replicaZones sets.String, sizeGb int64, tags map[string]string) error {

// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
unmanagedZones := []string{}
for _, zone := range replicaZones.UnsortedList() {
if isManaged := gce.verifyZoneIsManaged(zone); !isManaged {
unmanagedZones = append(unmanagedZones, zone)
}
// Do not allow creation of PDs in zones that are do not have nodes. Such PDs
// are not currently usable. This functionality should be reverted to checking
// against managed zones if we want users to be able to create RegionalDisks
// in zones where there are no nodes
curZones, err := gce.GetAllCurrentZones()
if err != nil {
return err
}

if len(unmanagedZones) > 0 {
return fmt.Errorf("kubernetes does not manage specified zones: %q. Managed Zones: %q", unmanagedZones, gce.managedZones)
if !curZones.IsSuperset(replicaZones) {
return fmt.Errorf("kubernetes does not have nodes in specified zones: %q. Zones that contain nodes: %q", replicaZones.Difference(curZones), curZones)
}

tagsStr, err := gce.encodeDiskTags(tags)
Expand Down Expand Up @@ -359,16 +361,6 @@ func (gce *GCECloud) CreateRegionalDisk(
return err
}

func (gce *GCECloud) verifyZoneIsManaged(zone string) bool {
for _, managedZone := range gce.managedZones {
if zone == managedZone {
return true
}
}

return false
}

func getDiskType(diskType string) (string, error) {
switch diskType {
case DiskTypeSSD, DiskTypeStandard:
Expand Down