Add exponential backoff to openstack loadbalancer functions #45330

Merged · 1 commit · May 9, 2017
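
This PR replaces the OpenStack provider's hand-rolled one-second polling loops with wait.ExponentialBackoff from k8s.io/apimachinery/pkg/util/wait, in both the loadbalancer code and the WaitForVolumeStatus test helper. Beyond adding backoff, the switch retires a latent timekeeping bug: the old loops measured elapsed time with time.Now().Second(), which returns the wall clock's seconds-of-minute field (0-59) and wraps every minute, so the timeout comparisons could misfire. The getMonitorByPoolID helper is dropped along the way.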
2 changes: 2 additions & 0 deletions pkg/cloudprovider/providers/openstack/BUILD
@@ -56,6 +56,7 @@ go_library(
         "//vendor/gopkg.in/gcfg.v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/client-go/util/cert:go_default_library",
     ],
 )
@@ -78,6 +79,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
     ],
 )
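
Both targets pick up //vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library, mirroring the new wait import in the source and test files below. Kubernetes BUILD files are generated, so a hunk like this typically comes from rerunning the Bazel update script (hack/update-bazel.sh at the time) rather than from hand-editing.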
117 changes: 56 additions & 61 deletions pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go
@@ -39,15 +39,34 @@ import (
     neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
     "github.com/gophercloud/gophercloud/pagination"
 
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/kubernetes/pkg/api/v1"
     "k8s.io/kubernetes/pkg/api/v1/service"
     "k8s.io/kubernetes/pkg/cloudprovider"
 )
 
-// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
-// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
-const loadbalancerActiveTimeoutSeconds = 120
-const loadbalancerDeleteTimeoutSeconds = 30
+const (
+    // loadbalancerActive* is the exponential backoff configuration for
+    // waiting until a loadbalancer's provisioning status becomes ACTIVE.
+    // Starting at 1 second and multiplying by 1.2 on each of at most 19
+    // steps, it times out after about 128s, which roughly corresponds to
+    // the previous fixed 120s timeout.
+    loadbalancerActiveInitDelay = 1 * time.Second
+    loadbalancerActiveFactor    = 1.2
+    loadbalancerActiveSteps     = 19
+
+    // loadbalancerDelete* is the exponential backoff configuration for
+    // waiting until a delete operation completes. Starting at 1 second and
+    // multiplying by 1.2 on each of at most 13 steps, it times out after
+    // about 40s, replacing the previous fixed 30s timeout.
+    loadbalancerDeleteInitDelay = 1 * time.Second
+    loadbalancerDeleteFactor    = 1.2
+    loadbalancerDeleteSteps     = 13
+
+    activeStatus = "ACTIVE"
+    errorStatus  = "ERROR"
+)
 
 // LoadBalancer implementation for LBaaS v1
 type LbaasV1 struct {
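
The step counts above encode the old timeouts: wait.ExponentialBackoff (at least in the 1.7-era implementation) evaluates the condition once per step and sleeps between evaluations, so a configuration with Steps checks performs Steps-1 sleeps, for a worst-case wait of Duration * (Factor^(Steps-1) - 1) / (Factor - 1). A standalone sketch, not part of the PR, that reproduces the arithmetic:

package main

import (
    "fmt"
    "math"
    "time"
)

// worstCase returns the total time wait.ExponentialBackoff can spend
// sleeping: the condition runs once per step, with a sleep before every
// step except the first, i.e. steps-1 sleeps of geometrically growing length.
func worstCase(initial time.Duration, factor float64, steps int) time.Duration {
    seconds := initial.Seconds() * (math.Pow(factor, float64(steps-1)) - 1) / (factor - 1)
    return time.Duration(seconds * float64(time.Second))
}

func main() {
    fmt.Println(worstCase(1*time.Second, 1.2, 19)) // ~2m8s, replacing the old 120s timeout
    fmt.Println(worstCase(1*time.Second, 1.2, 13)) // ~39.6s, replacing the old 30s timeout
}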
@@ -337,44 +356,6 @@ func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools
     return members, nil
 }
 
-// Each pool has exactly one or zero monitors. ListOpts does not seem to filter anything.
-func getMonitorByPoolID(client *gophercloud.ServiceClient, id string) (*v2monitors.Monitor, error) {
-    var monitorList []v2monitors.Monitor
-    err := v2monitors.List(client, v2monitors.ListOpts{PoolID: id}).EachPage(func(page pagination.Page) (bool, error) {
-        monitorsList, err := v2monitors.ExtractMonitors(page)
-        if err != nil {
-            return false, err
-        }
-
-        for _, monitor := range monitorsList {
-            // bugfix, filter by poolid
-            for _, pool := range monitor.Pools {
-                if pool.ID == id {
-                    monitorList = append(monitorList, monitor)
-                }
-            }
-        }
-        if len(monitorList) > 1 {
-            return false, ErrMultipleResults
-        }
-        return true, nil
-    })
-    if err != nil {
-        if isNotFound(err) {
-            return nil, ErrNotFound
-        }
-        return nil, err
-    }
-
-    if len(monitorList) == 0 {
-        return nil, ErrNotFound
-    } else if len(monitorList) > 1 {
-        return nil, ErrMultipleResults
-    }
-
-    return &monitorList[0], nil
-}
-
 // Check if a member exists for node
 func memberExists(members []v2pools.Member, addr string, port int) bool {
     for _, member := range members {
@@ -436,45 +417,59 @@ func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpt
 }
 
 func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
-    start := time.Now().Second()
-    for {
+    backoff := wait.Backoff{
+        Duration: loadbalancerActiveInitDelay,
+        Factor:   loadbalancerActiveFactor,
+        Steps:    loadbalancerActiveSteps,
+    }
+
+    var provisioningStatus string
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
         if err != nil {
-            return "", err
+            return false, err
         }
-        if loadbalancer.ProvisioningStatus == "ACTIVE" {
-            return "ACTIVE", nil
-        } else if loadbalancer.ProvisioningStatus == "ERROR" {
-            return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
+        provisioningStatus = loadbalancer.ProvisioningStatus
+        if loadbalancer.ProvisioningStatus == activeStatus {
+            return true, nil
+        } else if loadbalancer.ProvisioningStatus == errorStatus {
+            return true, fmt.Errorf("Loadbalancer has gone into ERROR state")
+        } else {
+            return false, nil
         }
-
-        time.Sleep(1 * time.Second)
-
-        if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
-            return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within the allotted time")
-        }
-    }
+    })
+
+    if err == wait.ErrWaitTimeout {
+        err = fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within the allotted time")
+    }
+    return provisioningStatus, err
 }
 
 func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
-    start := time.Now().Second()
-    for {
+    backoff := wait.Backoff{
+        Duration: loadbalancerDeleteInitDelay,
+        Factor:   loadbalancerDeleteFactor,
+        Steps:    loadbalancerDeleteSteps,
+    }
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         _, err := loadbalancers.Get(client, loadbalancerID).Extract()
         if err != nil {
             if err == ErrNotFound {
-                return nil
+                return true, nil
             } else {
-                return err
+                return false, err
             }
+        } else {
+            return false, nil
         }
-
-        time.Sleep(1 * time.Second)
-
-        if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
-            return fmt.Errorf("Loadbalancer failed to delete within the allotted time")
-        }
-    }
+    })
+
+    if err == wait.ErrWaitTimeout {
+        err = fmt.Errorf("Loadbalancer failed to delete within the allotted time")
+    }
+
+    return err
 }
 
 func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
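
The refactor leans on the contract of wait.ExponentialBackoff: the condition function returns (true, nil) to stop successfully, (false, nil) to retry after the next sleep, and (false, err) to abort immediately; once the steps are exhausted the call returns wait.ErrWaitTimeout. A minimal standalone sketch of the pattern (the isReady poll is hypothetical, not part of this PR):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// isReady is a hypothetical stand-in for one poll of an external resource.
func isReady() (bool, error) { return false, nil }

func main() {
    backoff := wait.Backoff{Duration: 1 * time.Second, Factor: 1.2, Steps: 5}
    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
        ready, err := isReady()
        if err != nil {
            // A non-nil error aborts the remaining retries immediately.
            return false, err
        }
        // (false, nil) means "not done yet": sleep, then poll again.
        return ready, nil
    })
    if err == wait.ErrWaitTimeout {
        // All steps were exhausted without the condition returning true.
        err = fmt.Errorf("resource never became ready")
    }
    fmt.Println(err)
}

Note how waitLoadbalancerActiveProvisioningStatus keeps provisioningStatus in a variable captured by the closure: ExponentialBackoff itself only returns an error, so the last observed status has to be carried out separately for the caller.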
64 changes: 40 additions & 24 deletions pkg/cloudprovider/providers/openstack/openstack_test.go
@@ -30,39 +30,55 @@ import (
 
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/rand"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/kubernetes/pkg/api/v1"
 )
 
-const volumeAvailableStatus = "available"
-const volumeInUseStatus = "in-use"
-const volumeCreateTimeoutSeconds = 30
-const testClusterName = "testCluster"
-
-func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string, timeoutSeconds int) {
-    timeout := timeoutSeconds
-    start := time.Now().Second()
-    for {
-        time.Sleep(1 * time.Second)
-
-        if timeout >= 0 && time.Now().Second()-start >= timeout {
-            t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
-                volumeName,
-                status,
-                timeout)
-            return
-        }
+const (
+    volumeAvailableStatus = "available"
+    volumeInUseStatus     = "in-use"
+    testClusterName       = "testCluster"
+
+    volumeStatusTimeoutSeconds = 30
+    // volumeStatus* is the exponential backoff configuration for waiting
+    // for a volume to reach the specified status. Starting at 1 second and
+    // multiplying by 1.2 on each of at most 13 steps, it times out after
+    // about 40s, replacing the previous fixed 30s timeout.
+    volumeStatusInitDelay = 1 * time.Second
+    volumeStatusFactor    = 1.2
+    volumeStatusSteps     = 13
+)
 
+func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string) {
+    backoff := wait.Backoff{
+        Duration: volumeStatusInitDelay,
+        Factor:   volumeStatusFactor,
+        Steps:    volumeStatusSteps,
+    }
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         getVol, err := os.getVolume(volumeName)
         if err != nil {
-            t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
+            return false, err
         }
         if getVol.Status == status {
             t.Logf("Volume (%s) status changed to %s after %v seconds\n",
                 volumeName,
                 status,
-                timeout)
-            return
+                volumeStatusTimeoutSeconds)
+            return true, nil
+        } else {
+            return false, nil
         }
-    }
+    })
+    if err == wait.ErrWaitTimeout {
+        t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
+            volumeName,
+            status,
+            volumeStatusTimeoutSeconds)
+        return
+    }
+    if err != nil {
+        t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
+    }
 }
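
Note the error-handling change here: the old loop called t.Fatalf inside the polling body, whereas the new condition function returns the error and lets the code after wait.ExponentialBackoff distinguish a timeout (wait.ErrWaitTimeout, merely logged) from a hard Cinder API failure (any other error, which fails the test).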

@@ -360,15 +376,15 @@ func TestVolumes(t *testing.T) {
     }
     t.Logf("Volume (%s) created\n", vol)
 
-    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)
 
     diskId, err := os.AttachDisk(os.localInstanceID, vol)
     if err != nil {
         t.Fatalf("Cannot AttachDisk Cinder volume %s: %v", vol, err)
     }
     t.Logf("Volume (%s) attached, disk ID: %s\n", vol, diskId)
 
-    WaitForVolumeStatus(t, os, vol, volumeInUseStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeInUseStatus)
 
     devicePath := os.GetDevicePath(diskId)
     if !strings.HasPrefix(devicePath, "/dev/disk/by-id/") {
@@ -382,7 +398,7 @@ func TestVolumes(t *testing.T) {
     }
     t.Logf("Volume (%s) detached\n", vol)
 
-    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)
 
     err = os.DeleteVolume(vol)
     if err != nil {