Skip to content

Commit

Permalink
Merge pull request #88308 from aramase/automated-cherry-pick-of-#8809…
Browse files Browse the repository at this point in the history
…4-upstream-release-1.16

Automated cherry pick of #88094: add delays between goroutines for vm instance update
  • Loading branch information
k8s-ci-robot committed Feb 21, 2020
2 parents abdce0e + ef95101 commit f0201fa
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
21 changes: 21 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go
Expand Up @@ -20,6 +20,9 @@ package azure

import (
"sync"
"time"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// lockMap used to lock on entries
Expand Down Expand Up @@ -69,3 +72,21 @@ func (lm *lockMap) lockEntry(entry string) {
func (lm *lockMap) unlockEntry(entry string) {
lm.mutexMap[entry].Unlock()
}

// aggregateGoroutinesWithDelay aggregates goroutines and runs them
// in parallel with delay before starting each goroutine
func aggregateGoroutinesWithDelay(delay time.Duration, funcs ...func() error) utilerrors.Aggregate {
errChan := make(chan error, len(funcs))

for _, f := range funcs {
go func(f func() error) { errChan <- f() }(f)
time.Sleep(delay)
}
errs := make([]error, 0)
for i := 0; i < cap(errChan); i++ {
if err := <-errChan; err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package azure

import (
"fmt"
"testing"
"time"
)
Expand Down Expand Up @@ -83,3 +84,67 @@ func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
return true
}
}

// running same unit tests as https://github.com/kubernetes/apimachinery/blob/master/pkg/util/errors/errors_test.go#L371
func TestAggregateGoroutinesWithDelay(t *testing.T) {
testCases := []struct {
errs []error
expected map[string]bool
}{
{
[]error{},
nil,
},
{
[]error{nil},
nil,
},
{
[]error{nil, nil},
nil,
},
{
[]error{fmt.Errorf("1")},
map[string]bool{"1": true},
},
{
[]error{fmt.Errorf("1"), nil},
map[string]bool{"1": true},
},
{
[]error{fmt.Errorf("1"), fmt.Errorf("267")},
map[string]bool{"1": true, "267": true},
},
{
[]error{fmt.Errorf("1"), nil, fmt.Errorf("1234")},
map[string]bool{"1": true, "1234": true},
},
{
[]error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")},
map[string]bool{"1": true, "1234": true, "22": true},
},
}
for i, testCase := range testCases {
funcs := make([]func() error, len(testCase.errs))
for i := range testCase.errs {
err := testCase.errs[i]
funcs[i] = func() error { return err }
}
agg := aggregateGoroutinesWithDelay(100*time.Millisecond, funcs...)
if agg == nil {
if len(testCase.expected) > 0 {
t.Errorf("%d: expected %v, got nil", i, testCase.expected)
}
continue
}
if len(agg.Errors()) != len(testCase.expected) {
t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg)
continue
}
for _, err := range agg.Errors() {
if !testCase.expected[err.Error()] {
t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err)
}
}
}
}
12 changes: 10 additions & 2 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
Expand Down Expand Up @@ -53,6 +54,13 @@ var (
vmssVMProviderIDRE = regexp.MustCompile(`azure:///subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(?:\d+)`)
)

const (
// vmssVMInstanceUpdateDelay is used when updating multiple vm instances in parallel
// the optimum value is 3s to prevent any conflicts that result in concurrent vmss vm
// instances update
vmssVMInstanceUpdateDelay = 3 * time.Second
)

// scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct {
*Cloud
Expand Down Expand Up @@ -1097,7 +1105,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
hostUpdates = append(hostUpdates, f)
}

errs := utilerrors.AggregateGoroutines(hostUpdates...)
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
if errs != nil {
return utilerrors.Flatten(errs)
}
Expand Down Expand Up @@ -1389,7 +1397,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
hostUpdates = append(hostUpdates, f)
}

errs := utilerrors.AggregateGoroutines(hostUpdates...)
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
if errs != nil {
return utilerrors.Flatten(errs)
}
Expand Down

0 comments on commit f0201fa

Please sign in to comment.