Add context (operation ID) to package retry #8205

Merged
merged 1 commit on Aug 14, 2018
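This change threads a context (typically a trace.Operation) through pkg/retry so the retry loop logs through the operation rather than the package-level logrus logger, and its messages carry the caller's operation ID. The call-site change is mechanical; a sketch based on the sites touched in this diff:

// before
err := retry.Do(operation, engerr.IsConflictError)

// after: the operation (or any context.Context) becomes the first argument
err := retry.Do(op, operation, engerr.IsConflictError)

Call sites with no operation in scope, such as the unit tests, pass context.Background() instead.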
18 changes: 9 additions & 9 deletions lib/apiservers/engine/backends/container.go
@@ -259,7 +259,7 @@ func (c *ContainerBackend) ContainerExecCreate(name string, config *types.ExecCo
backoffConf.InitialInterval = 500 * time.Millisecond
backoffConf.MaxElapsedTime = 20 * time.Minute

if err := retry.DoWithConfig(operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
if err := retry.DoWithConfig(op, operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return "", err
}
@@ -547,7 +547,7 @@ func (c *ContainerBackend) ContainerExecStart(ctx context.Context, eid string, s
return err
}

if err := retry.DoWithConfig(operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
if err := retry.DoWithConfig(ctx, operation, engerr.IsLockTimeoutOrConflictError, backoffConf); err != nil {
op.Errorf("Failed to start Exec task for container(%s) due to error (%s)", id, err)
return err
}
@@ -866,14 +866,14 @@ func (c *ContainerBackend) ContainerRestart(name string, seconds *int) error {
operation := func() error {
return c.containerProxy.Stop(op, vc, name, seconds, false)
}
if err := retry.Do(operation, engerr.IsConflictError); err != nil {
if err := retry.Do(op, operation, engerr.IsConflictError); err != nil {
return engerr.InternalServerError(fmt.Sprintf("Stop failed with: %s", err))
}

operation = func() error {
return c.containerStart(op, name, nil, true)
}
if err := retry.Do(operation, engerr.IsConflictError); err != nil {
if err := retry.Do(op, operation, engerr.IsConflictError); err != nil {
return engerr.InternalServerError(fmt.Sprintf("Start failed with: %s", err))
}

@@ -945,7 +945,7 @@ func (c *ContainerBackend) ContainerRm(name string, config *types.ContainerRmCon
return c.containerProxy.Remove(op, vc, config)
}

return retry.Do(operation, engerr.IsConflictError)
return retry.Do(op, operation, engerr.IsConflictError)
}

return c.containerProxy.Remove(op, vc, config)
@@ -1010,7 +1010,7 @@ func (c *ContainerBackend) ContainerStart(name string, hostConfig *containertype
operation := func() error {
return c.containerStart(op, name, hostConfig, true)
}
if err := retry.Do(operation, engerr.IsConflictError); err != nil {
if err := retry.Do(op, operation, engerr.IsConflictError); err != nil {
op.Debugf("Container start failed due to error - %s", err.Error())
return err
}
@@ -1236,7 +1236,7 @@ func (c *ContainerBackend) ContainerStop(name string, seconds *int) error {

config := retry.NewBackoffConfig()
config.MaxElapsedTime = maxElapsedTime
if err := retry.DoWithConfig(operation, engerr.IsConflictError, config); err != nil {
if err := retry.DoWithConfig(op, operation, engerr.IsConflictError, config); err != nil {
return err
}

@@ -1656,7 +1656,7 @@ func (c *ContainerBackend) ContainerAttach(name string, ca *backend.ContainerAtt
operation := func() error {
return c.containerAttach(op, name, ca)
}
if err := retry.Do(operation, engerr.IsConflictError); err != nil {
if err := retry.Do(op, operation, engerr.IsConflictError); err != nil {
return err
}
return nil
@@ -1778,7 +1778,7 @@ func (c *ContainerBackend) ContainerRename(oldName, newName string) error {
return c.containerProxy.Rename(op, vc, newName)
}

if err := retry.Do(renameOp, engerr.IsConflictError); err != nil {
if err := retry.Do(op, renameOp, engerr.IsConflictError); err != nil {
log.Errorf("Rename error: %s", err)
cache.ContainerCache().ReleaseName(newName)
return err
2 changes: 1 addition & 1 deletion lib/apiservers/engine/backends/convert/stats_test.go
@@ -367,7 +367,7 @@ func success(converter *ContainerStats) bool {
return false
}
// use the retry package and keep retrying until we've hit the limit
if err := retry.Do(op, wait); err != nil {
if err := retry.Do(context.Background(), op, wait); err != nil {
return false
}
return true
2 changes: 1 addition & 1 deletion lib/apiservers/engine/backends/network.go
@@ -333,7 +333,7 @@ func (n *NetworkBackend) ConnectContainerToNetwork(containerName, networkName st

config := retry.NewBackoffConfig()
config.MaxElapsedTime = maxElapsedTime
err := retry.DoWithConfig(operation, isCommitConflictError, config)
err := retry.DoWithConfig(op, operation, isCommitConflictError, config)
if err != nil {
switch err := err.(type) {
case *containers.CommitNotFound:
4 changes: 2 additions & 2 deletions lib/install/management/appliance.go
@@ -245,15 +245,15 @@ func (d *Dispatcher) deleteVM(vm *vm.VirtualMachine, force bool) error {
}

// Only retry VM destroy on ConcurrentAccess error
err = retry.Do(func() error {
err = retry.Do(d.op, func() error {
_, err := vm.DeleteExceptDisks(d.op)
return err
}, retryErrHandler)

if err != nil {
d.op.Warnf("Destroy VM %s failed with %s, unregister the VM instead", vm.Reference(), err)

err = retry.Do(func() error {
err = retry.Do(d.op, func() error {
return vm.Unregister(d.op)
}, retryErrHandler)

2 changes: 1 addition & 1 deletion lib/install/management/create.go
@@ -188,7 +188,7 @@ func (d *Dispatcher) uploadISOs(files map[string]string) error {
return true
}

uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf)
uploadErr := retry.DoWithConfig(d.op, operationForRetry, uploadRetryDecider, backoffConf)
if uploadErr != nil {
finalMessage := fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr)
if d.force {
2 changes: 1 addition & 1 deletion lib/install/management/finder.go
@@ -308,7 +308,7 @@ func (d *Dispatcher) listResourcePools(searchPath string) ([]*object.ResourcePoo
// under some circumstances, such as when there is concurrent vic-machine delete operation running in the background,
// listing resource pools might fail because some VCH pool is being destroyed at the same time.
// If that happens, we retry and list pools again
err = retry.Do(func() error {
err = retry.Do(d.op, func() error {
pools, err = d.session.Finder.ResourcePoolList(d.op, path.Join(searchPath, "..."))
if _, ok := err.(*find.NotFoundError); ok {
return nil
2 changes: 1 addition & 1 deletion lib/portlayer/exec/commit.go
@@ -327,7 +327,7 @@ func reloadConfig(op trace.Operation, h *Handle, c *Container) error {
return nil
}

err := retry.Do(retryFunc, isIntermittentFailure)
err := retry.Do(op, retryFunc, isIntermittentFailure)
if err != nil {
op.Debugf("Failed an exec operation with err: %s", err)
return err
2 changes: 1 addition & 1 deletion lib/portlayer/logging/logging.go
@@ -94,7 +94,7 @@ func eventCallback(ie events.Event) {
return nil
}

if err := retry.Do(operation, exec.IsConcurrentAccessError); err != nil {
if err := retry.Do(op, operation, exec.IsConcurrentAccessError); err != nil {
op.Errorf("Multiple attempts failed to commit handle after getting %s event for container %s: %s", ie, ie.Reference(), err)
}
}
2 changes: 1 addition & 1 deletion lib/portlayer/portlayer.go
@@ -196,7 +196,7 @@ func TakeCareOfSerialPorts(sess *session.Session) {
return nil
}

if err := retry.Do(operation, exec.IsConcurrentAccessError); err != nil {
if err := retry.Do(op, operation, exec.IsConcurrentAccessError); err != nil {
op.Errorf("Multiple attempts failed for committing the handle with %s", err)
}
}
2 changes: 1 addition & 1 deletion lib/portlayer/storage/image_cache.go
@@ -187,7 +187,7 @@ func (c *NameLookupCache) CreateImageStore(op trace.Operation, storeName string)
config.MaxElapsedTime = time.Minute * 3

// attempt to get the image store
err = retry.DoWithConfig(getStore, isRetry, config)
err = retry.DoWithConfig(op, getStore, isRetry, config)
if err == nil {
// no error means that the image store exists and we can
// safely return
2 changes: 1 addition & 1 deletion lib/portlayer/storage/vsphere/toolbox_datasink.go
@@ -73,7 +73,7 @@ func (t *ToolboxDataSink) Import(op trace.Operation, spec *archive.FilterSpec, d
return client.Upload(op, buf, target, p, &types.GuestPosixFileAttributes{}, true)
}

err = retry.DoWithConfig(retryFunc, isInvalidStateError, toolboxRetryConf)
err = retry.DoWithConfig(op, retryFunc, isInvalidStateError, toolboxRetryConf)

if err != nil {
op.Debugf("Upload error: %s", err.Error())
4 changes: 2 additions & 2 deletions lib/portlayer/storage/vsphere/toolbox_datasource.go
@@ -66,7 +66,7 @@ func (t *ToolboxDataSource) Export(op trace.Operation, spec *archive.FilterSpec,
return retryErr
}

err = retry.DoWithConfig(retryFunc, isInvalidStateError, toolboxRetryConf)
err = retry.DoWithConfig(op, retryFunc, isInvalidStateError, toolboxRetryConf)
if err != nil {
op.Errorf("Download error: %s", err.Error())
return nil, err
@@ -121,7 +121,7 @@ func (t *ToolboxDataSource) Stat(op trace.Operation, spec *archive.FilterSpec) (
return retryErr
}

err = retry.DoWithConfig(retryFunc, isInvalidStateError, toolboxRetryConf)
err = retry.DoWithConfig(op, retryFunc, isInvalidStateError, toolboxRetryConf)
if err != nil {
op.Errorf("Download error: %s", err.Error())
return nil, err
2 changes: 1 addition & 1 deletion lib/portlayer/task/wait.go
@@ -93,7 +93,7 @@ func Wait(op *trace.Operation, h interface{}, id string) error {
return tasks.IsTransientError(*op, err)
}

if err := retry.DoWithConfig(operation, retryDecider, backoffConf); err != nil {
if err := retry.DoWithConfig(op, operation, retryDecider, backoffConf); err != nil {
op.Errorf("Unable to wait for task exec task completion: task(%s) : %s", id, err)
return err
}
17 changes: 9 additions & 8 deletions pkg/retry/retry.go
@@ -15,9 +15,9 @@
package retry

import (
"context"
"time"

log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"

"github.com/vmware/vic/pkg/trace"
@@ -56,14 +56,15 @@ func NewBackoffConfig() *BackoffConfig {

// Do retries the given function until defaultMaxInterval time passes, while sleeping some time between unsuccessful attempts
// if retryOnError returns true, continue retry, otherwise, return error
func Do(operation func() error, retryOnError func(err error) bool) error {
func Do(ctx context.Context, operation func() error, retryOnError func(err error) bool) error {
conf := NewBackoffConfig()
return DoWithConfig(operation, retryOnError, conf)
return DoWithConfig(ctx, operation, retryOnError, conf)
}

// DoWithConfig will attempt an operation while retrying using an exponential back off based on the config supplied by the caller. The retry decider is the supplied function retryOnError
func DoWithConfig(operation func() error, retryOnError func(err error) bool, config *BackoffConfig) error {
defer trace.End(trace.Begin(""))
func DoWithConfig(ctx context.Context, operation func() error, retryOnError func(err error) bool, config *BackoffConfig) error {
op := trace.FromContext(ctx, "DoWithConfig")
defer trace.End(trace.Begin("", op))

var err error
var next time.Duration
@@ -83,17 +84,17 @@ func DoWithConfig(operation func() error, retryOnError func(err error) bool, con
}

if next = b.NextBackOff(); next == backoff.Stop {
log.Errorf("Will stop trying again. Operation failed with %#+v", err)
op.Errorf("Will stop trying again. Operation failed with %#+v", err)
return err
}

// check error
if !retryOnError(err) {
log.Errorf("Operation failed with %#+v", err)
op.Errorf("Operation failed with %#+v", err)
return err
}
// Expected error
log.Warnf("Will try again in %s. Operation failed with detected error", next)
op.Warnf("Will try again in %s. Operation failed with detected error", next)

// sleep and try again
time.Sleep(next)
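The doc comments on Do and DoWithConfig above describe the behavior: the operation is retried with exponential backoff until it succeeds, the backoff gives up, or retryOnError reports the error as non-retryable. A minimal sketch of a caller using the new signature follows; the transient error, decider, and helper function are hypothetical, for illustration only:

package example

import (
	"context"
	"errors"
	"time"

	"github.com/vmware/vic/pkg/retry"
)

// errTransient is a stand-in for an error the caller considers retryable.
var errTransient = errors.New("transient failure")

func doWithRetry(ctx context.Context) error {
	conf := retry.NewBackoffConfig()
	conf.InitialInterval = 500 * time.Millisecond
	conf.MaxElapsedTime = 20 * time.Second

	attempts := 0
	operation := func() error {
		attempts++
		if attempts < 3 {
			return errTransient // fail twice, then succeed
		}
		return nil
	}

	// Retry only while the decider reports the error as retryable.
	retryable := func(err error) bool { return err == errTransient }

	// The context (or a trace.Operation, which satisfies context.Context in this
	// codebase) is now the first argument, so the retry log lines carry its ID.
	return retry.DoWithConfig(ctx, operation, retryable, conf)
}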
3 changes: 2 additions & 1 deletion pkg/retry/retry_test.go
@@ -15,6 +15,7 @@
package retry

import (
"context"
"fmt"
"testing"
"time"
@@ -35,7 +36,7 @@ func TestRetry(t *testing.T) {
}
return false
}
err := Do(operation, retryOnError)
err := Do(context.Background(), operation, retryOnError)
assert.True(t, i == 4, "should retried 4 times")
assert.True(t, err != nil, fmt.Sprintf("retry do not depend on error status"))
}
8 changes: 4 additions & 4 deletions pkg/vsphere/vm/vm.go
@@ -388,7 +388,7 @@ func (vm *VirtualMachine) DeleteExceptDisks(ctx context.Context) (*types.TaskInf
}

firstTime := true
err = retry.Do(func() error {
err = retry.Do(op, func() error {
op.Debugf("Getting list of the devices for VM %q", vmName)
devices, err := vm.Device(op)
if err != nil {
@@ -429,7 +429,7 @@

// If destroy method is disabled on this VM, re-enable it and retry
op.Debugf("Destroy is disabled. Enabling destroy for VM %q", vmName)
err = retry.Do(func() error {
err = retry.Do(op, func() error {
return vm.EnableDestroy(op)
}, tasks.IsConcurrentAccessError)

@@ -907,7 +907,7 @@ func (vm *VirtualMachine) PowerOn(op trace.Operation) error {
conf.MaxInterval = 1 * time.Second
conf.MaxElapsedTime = 20 * time.Second

err = retry.DoWithConfig(f, retry.OnError, conf)
err = retry.DoWithConfig(op, f, retry.OnError, conf)
if err != nil {
return err
}
@@ -928,7 +928,7 @@ func (vm *VirtualMachine) PowerOn(op trace.Operation) error {
// if relocation or powerOn fails, something has changed and we need a new recommendation
subset = hosts[1:]

if err = retry.DoWithConfig(f, retry.OnError, conf); err != nil {
if err = retry.DoWithConfig(op, f, retry.OnError, conf); err != nil {
return err
}
