Changed GetAllZones to only get zones with nodes that are currently
running (renamed to GetAllCurrentZones). Added E2E test to confirm this
behavior.
davidz627 committed Sep 12, 2017
1 parent 507af4b commit 547f0ec
Showing 6 changed files with 134 additions and 39 deletions.
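
In short: GetAllZones, which listed GCE instances in every managed zone, is replaced by GetAllCurrentZones, which derives the zone set from the failure-domain labels on the cluster's Node objects. A minimal caller-level sketch of the renamed method, illustrative only (printNodeZones and the package name are not part of this commit):

package zonesexample

import (
	"fmt"

	gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)

// printNodeZones shows the intended call pattern: GetAllCurrentZones
// reads node labels through the clientset, so SetClientSet must have
// been called on the GCECloud first.
func printNodeZones(cloud *gce.GCECloud) error {
	zones, err := cloud.GetAllCurrentZones()
	if err != nil {
		return err
	}
	// sets.String.List() returns the members sorted.
	for _, z := range zones.List() {
		fmt.Println("zone with running nodes:", z)
	}
	return nil
}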
81 changes: 45 additions & 36 deletions pkg/cloudprovider/providers/gce/gce_instances.go
@@ -19,6 +19,7 @@ package gce
 import (
 	"errors"
 	"fmt"
+	"log"
 	"net"
 	"net/http"
 	"strconv"
@@ -32,9 +33,11 @@ import (
 	compute "google.golang.org/api/compute/v1"

 	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 )
@@ -250,50 +253,56 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
 	})
 }

-// GetAllZones returns all the zones in which nodes are running
-func (gce *GCECloud) GetAllZones() (sets.String, error) {
-	// Fast-path for non-multizone
-	if len(gce.managedZones) == 1 {
-		return sets.NewString(gce.managedZones...), nil
-	}
-
-	// TODO: Caching, but this is currently only called when we are creating a volume,
-	// which is a relatively infrequent operation, and this is only # zones API calls
+// GetAllCurrentZones returns all the zones in which nodes are currently running
+func (gce *GCECloud) GetAllCurrentZones() (sets.String, error) {
 	zones := sets.NewString()
+	client := gce.client
+	if client == nil {
+		return nil, fmt.Errorf("Client not set on GCECloud object")
+	}
+	nodeList, err := client.Core().Nodes().List(metav1.ListOptions{})
+	if err != nil {
+		log.Fatalf("Failed to list nodes: %v", err)
+		return nil, err
+	}

-	// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
-	for _, zone := range gce.managedZones {
-		mc := newInstancesMetricContext("list", zone)
-		// We only retrieve one page in each zone - we only care about existence
-		listCall := gce.service.Instances.List(gce.projectID, zone)
-
-		// No filter: We assume that a zone is either used or unused
-		// We could only consider running nodes (like we do in List above),
-		// but probably if instances are starting we still want to consider them.
-		// I think we should wait until we have a reason to make the
-		// call one way or the other; we generally can't guarantee correct
-		// volume spreading if the set of zones is changing
-		// (and volume spreading is currently only a heuristic).
-		// Long term we want to replace GetAllZones (which primarily supports volume
-		// spreading) with a scheduler policy that is able to see the global state of
-		// volumes and the health of zones.
-
-		// Just a minimal set of fields - we only care about existence
-		listCall = listCall.Fields("items(name)")
-		res, err := listCall.Do()
-		if err != nil {
-			return nil, mc.Observe(err)
-		}
-		mc.Observe(nil)
-
-		if len(res.Items) != 0 {
+	for _, node := range nodeList.Items {
+		labels := node.ObjectMeta.Labels
+		zone, ok := labels[kubeletapis.LabelZoneFailureDomain]
+		if ok {
 			zones.Insert(zone)
+		} else {
+			return nil, fmt.Errorf("Could not find zone for node %v", node.ObjectMeta.Name)
 		}
 	}

 	return zones, nil
 }

+// GetAllManagedZones gets all the zones managed by k8s
+func (gce *GCECloud) GetAllManagedZones() (sets.String, error) {
+	if gce.managedZones != nil {
+		return sets.NewString(gce.managedZones...), nil
+	}
+	return nil, fmt.Errorf("Could not get k8s managed zones, it was nil")
+}
+
+// SetClientSet sets the ClientSet on the GCECloud object for e2e tests
+func (gce *GCECloud) SetClientSet(c clientset.Interface) {
+	gce.client = c
+	return
+}
+
+// InsertInstance creates a new instance on GCP
+func (gce *GCECloud) InsertInstance(project string, zone string, rb *compute.Instance) (*compute.Operation, error) {
+	return gce.service.Instances.Insert(project, zone, rb).Do()
+}
+
+// DeleteInstance deletes an instance by name on GCP
+func (gce *GCECloud) DeleteInstance(project, zone, instance string) (*compute.Operation, error) {
+	return gce.service.Instances.Delete(project, zone, instance).Do()
+}
+
 // Implementation of Instances.CurrentNodeName
 func (gce *GCECloud) CurrentNodeName(hostname string) (types.NodeName, error) {
 	return types.NodeName(hostname), nil
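
The accessors above expose deliberately different views: GetAllManagedZones reflects static cloud configuration, while GetAllCurrentZones reflects live node placement. A sketch of combining the two, assuming the gce and sets imports shown in the diff above (nodelessZones is a hypothetical helper, not part of the commit):

// nodelessZones returns the zones Kubernetes manages that currently
// have no running nodes, using the set algebra that sets.String
// provides (Difference, Has, etc.).
func nodelessZones(cloud *gce.GCECloud) (sets.String, error) {
	managed, err := cloud.GetAllManagedZones()
	if err != nil {
		return nil, err
	}
	current, err := cloud.GetAllCurrentZones()
	if err != nil {
		return nil, err
	}
	return managed.Difference(current), nil
}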
2 changes: 1 addition & 1 deletion pkg/volume/gce_pd/gce_util.go
@@ -153,7 +153,7 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin
 		// 000 - neither "zone", "zones", or "replica-zones" specified
 		// Pick a zone randomly selected from all active zones where
 		// Kubernetes cluster has a node.
-		zones, err = cloud.GetAllZones()
+		zones, err = cloud.GetAllCurrentZones()
 		if err != nil {
 			glog.V(2).Infof("error getting zone information from GCE: %v", err)
 			return "", 0, nil, "", err
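
With this change the provisioner's fallback path draws only from zones that actually contain nodes. A rough sketch of the "pick a zone randomly" step described in the comment above, assuming fmt, math/rand, and k8s.io/apimachinery/pkg/util/sets are imported (pickRandomZone is hypothetical; the real CreateVolume applies its own zone-selection logic):

// pickRandomZone chooses one zone from the active set; callers are
// assumed to have seeded math/rand.
func pickRandomZone(zones sets.String) (string, error) {
	if zones.Len() == 0 {
		return "", fmt.Errorf("no active zones with nodes")
	}
	names := zones.List()
	return names[rand.Intn(len(names))], nil
}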
1 change: 1 addition & 0 deletions test/e2e/BUILD
@@ -73,6 +73,7 @@ go_library(
         "//vendor/github.com/onsi/ginkgo/config:go_default_library",
         "//vendor/github.com/onsi/ginkgo/reporters:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
+        "//vendor/google.golang.org/api/compute/v1:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/api/rbac/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
9 changes: 8 additions & 1 deletion test/e2e/e2e.go
@@ -101,10 +101,17 @@ func setupProviderConfig() error {
 		return fmt.Errorf("Error building GCE/GKE provider: %v", err)
 	}
+
+	cs, err := framework.LoadClientset()
+	if err != nil {
+		glog.Fatal("Error loading client: ", err)
+	}
+	gceCloud.SetClientSet(cs)
+
 	cloudConfig.Provider = gceCloud

 	// Arbitrarily pick one of the zones we have nodes in
 	if cloudConfig.Zone == "" && framework.TestContext.CloudConfig.MultiZone {
-		zones, err := gceCloud.GetAllZones()
+		zones, err := gceCloud.GetAllCurrentZones()
 		if err != nil {
 			return err
 		}
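
This wiring matters because GetAllCurrentZones fails fast when no clientset has been injected; a sketch of the guard, assuming the gceCloud built in setupProviderConfig above:

// Before SetClientSet is called, zone lookup returns an error
// ("Client not set on GCECloud object") rather than panicking.
if _, err := gceCloud.GetAllCurrentZones(); err != nil {
	glog.Warningf("zone lookup unavailable until SetClientSet is called: %v", err)
}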
2 changes: 1 addition & 1 deletion test/e2e/storage/volume_provisioning.go
@@ -422,7 +422,7 @@ var _ = SIGDescribe("Dynamic Provisioning", func() {
 			Expect(err).NotTo(HaveOccurred())

 			// Get all k8s managed zones
-			managedZones, err = gceCloud.GetAllZones()
+			managedZones, err = gceCloud.GetAllManagedZones()
 			Expect(err).NotTo(HaveOccurred())

 			// Get a list of all zones in the project
78 changes: 78 additions & 0 deletions test/e2e/ubernetes_lite.go
@@ -22,6 +22,7 @@ import (

 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	compute "google.golang.org/api/compute/v1"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -61,8 +62,85 @@ var _ = framework.KubeDescribe("Multi-AZ Clusters", func() {
 	It("should schedule pods in the same zones as statically provisioned PVs", func() {
 		PodsUseStaticPVsOrFail(f, (2*zoneCount)+1, image)
 	})
+
+	It("should only be allowed to provision PDs in zones where nodes exist", func() {
+		OnlyAllowNodeZones(f, zoneCount, image)
+	})
 })

+// OnlyAllowNodeZones tests that GetAllCurrentZones returns only zones with Nodes
+func OnlyAllowNodeZones(f *framework.Framework, zoneCount int, image string) {
+	gceCloud, err := framework.GetGCECloud()
+	Expect(err).NotTo(HaveOccurred())
+
+	// Get all the zones that the nodes are in
+	expectedZones, err := gceCloud.GetAllCurrentZones()
+	Expect(err).NotTo(HaveOccurred())
+	framework.Logf("Expected zones: %v\n", expectedZones)
+	// Get all the zones in this current region
+	region := gceCloud.Region()
+	allZonesInRegion, err := gceCloud.ListZonesInRegion(region)
+	Expect(err).NotTo(HaveOccurred())
+	var extraZone string
+	for _, zone := range allZonesInRegion {
+		if !expectedZones.Has(zone.Name) {
+			extraZone = zone.Name
+			break
+		}
+	}
+
+	// If there are no zones left in which to create an extra instance, the test cannot proceed, so stop right here
+	Expect(extraZone).NotTo(Equal(""), fmt.Sprintf("No extra zones available in region %s", region))
+	By(fmt.Sprintf("starting a compute instance in unused zone: %v\n", extraZone))
+	// Create a compute instance in an unused zone
+	project := framework.TestContext.CloudConfig.ProjectID
+	zone := extraZone
+	name := "compute-" + string(uuid.NewUUID())
+	imageURL := "https://www.googleapis.com/compute/v1/projects/debian-cloud/global/images/debian-7-wheezy-v20140606"

+	rb := &compute.Instance{
+		MachineType: "zones/" + zone + "/machineTypes/f1-micro",
+		Disks: []*compute.AttachedDisk{
+			{
+				AutoDelete: true,
+				Boot:       true,
+				Type:       "PERSISTENT",
+				InitializeParams: &compute.AttachedDiskInitializeParams{
+					DiskName:    "my-root-pd",
+					SourceImage: imageURL,
+				},
+			},
+		},
+		NetworkInterfaces: []*compute.NetworkInterface{
+			{
+				AccessConfigs: []*compute.AccessConfig{
+					{
+						Type: "ONE_TO_ONE_NAT",
+						Name: "External NAT",
+					},
+				},
+				Network: "/global/networks/default",
+			},
+		},
+		Name: name,
+	}
+	resp, err := gceCloud.InsertInstance(project, zone, rb)
+	Expect(err).NotTo(HaveOccurred())
+	framework.Logf("Compute creation response: %v\n", resp)
+	defer func() {
+		// Teardown of the compute instance
+		framework.Logf("Deleting compute resource: %v", name)
+		resp, err = gceCloud.DeleteInstance(project, zone, name)
+		framework.Logf("Compute deletion response: %v\n", resp)
+	}()
+	// Check GetAllCurrentZones: the new instance is a bare GCE VM, not a Kubernetes node, so the zone set should be unchanged
+	gotZones, err := gceCloud.GetAllCurrentZones()
+	Expect(err).NotTo(HaveOccurred())
+
+	Expect(gotZones.Equal(expectedZones)).To(BeTrue(), fmt.Sprintf("Expected zones: %v, got zones: %v", expectedZones, gotZones))
+
+}
+
 // Check that the pods comprising a service get spread evenly across available zones
 func SpreadServiceOrFail(f *framework.Framework, replicaCount int, image string) {
 	// First create the service
