attempt to fix timeout flakes
brianpursley committed May 28, 2020
1 parent 3a95b11 commit a8e0268
Showing 5 changed files with 64 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/options/options_test.go
@@ -163,7 +163,7 @@ func TestAddFlags(t *testing.T) {
 				DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
 			},
 			DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
-			DeleteCollectionWorkers: 1,
+			DeleteCollectionWorkers: 5,
 			EnableGarbageCollection: true,
 			EnableWatchCache:        true,
 			DefaultWatchCacheSize:   100,
54 changes: 37 additions & 17 deletions pkg/controller/namespace/deletion/namespaced_resources_deleter.go
@@ -306,6 +306,11 @@ func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace)
 // it returns true if the operation was supported on the server.
 // it returns an error if the operation was supported on the server but was unable to complete.
 func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionResource, namespace string) (bool, error) {
+
+	// TODO: Remove temporary logging for troubleshooting timeout flakes
+	startTime := time.Now()
+	defer func() { klog.V(1).Infof("***BP*** namespacedResourcesDeleter.deleteCollection for %s, gvr: %v finished with duration = %v", namespace, gvr, time.Since(startTime)) }()
+
 	klog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
 
 	key := operationKey{operation: operationDeleteCollection, gvr: gvr}
@@ -412,6 +417,11 @@ type gvrDeletionMetadata struct {
 func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
 	gvr schema.GroupVersionResource, namespace string,
 	namespaceDeletedAt metav1.Time) (gvrDeletionMetadata, error) {
+
+	// TODO: Remove temporary logging for troubleshooting timeout flakes
+	startTime := time.Now()
+	defer func() { klog.V(1).Infof("***BP*** namespacedResourcesDeleter.deleteAllContentForGroupVersionResource for %s, gvr: %v finished with duration = %v", namespace, gvr, time.Since(startTime)) }()
+
 	klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
 
 	// estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
@@ -524,27 +534,37 @@ func (d *namespacedResourcesDeleter) deleteAllContent(ns *v1.Namespace) (int64, error) {
 		gvrToNumRemaining:        map[schema.GroupVersionResource]int{},
 		finalizersToNumRemaining: map[string]int{},
 	}
+	var wg sync.WaitGroup
+	var mutex sync.Mutex
 	for gvr := range groupVersionResources {
-		gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(gvr, namespace, namespaceDeletedAt)
-		if err != nil {
-			// If there is an error, hold on to it but proceed with all the remaining
-			// groupVersionResources.
-			errs = append(errs, err)
-			conditionUpdater.ProcessDeleteContentErr(err)
-		}
-		if gvrDeletionMetadata.finalizerEstimateSeconds > estimate {
-			estimate = gvrDeletionMetadata.finalizerEstimateSeconds
-		}
-		if gvrDeletionMetadata.numRemaining > 0 {
-			numRemainingTotals.gvrToNumRemaining[gvr] = gvrDeletionMetadata.numRemaining
-			for finalizer, numRemaining := range gvrDeletionMetadata.finalizersToNumRemaining {
-				if numRemaining == 0 {
-					continue
+		gvr := gvr
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(gvr, namespace, namespaceDeletedAt)
+			mutex.Lock()
+			defer mutex.Unlock()
+			if err != nil {
+				// If there is an error, hold on to it but proceed with all the remaining
+				// groupVersionResources.
+				errs = append(errs, err)
+				conditionUpdater.ProcessDeleteContentErr(err)
+			}
+			if gvrDeletionMetadata.finalizerEstimateSeconds > estimate {
+				estimate = gvrDeletionMetadata.finalizerEstimateSeconds
+			}
+			if gvrDeletionMetadata.numRemaining > 0 {
+				numRemainingTotals.gvrToNumRemaining[gvr] = gvrDeletionMetadata.numRemaining
+				for finalizer, numRemaining := range gvrDeletionMetadata.finalizersToNumRemaining {
+					if numRemaining == 0 {
+						continue
+					}
+					numRemainingTotals.finalizersToNumRemaining[finalizer] = numRemainingTotals.finalizersToNumRemaining[finalizer] + numRemaining
 				}
-				numRemainingTotals.finalizersToNumRemaining[finalizer] = numRemainingTotals.finalizersToNumRemaining[finalizer] + numRemaining
 			}
-		}
+		}()
 	}
+	wg.Wait()
 	conditionUpdater.ProcessContentTotals(numRemainingTotals)
 
 	// we always want to update the conditions because if we have set a condition to "it worked" after it was previously, "it didn't work",
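
The core of the change above turns a sequential per-GVR loop into a fan-out/fan-in: each deleteAllContentForGroupVersionResource call runs in its own goroutine, and the shared accumulators (errs, estimate, numRemainingTotals) are only mutated while holding the mutex, so the slow API calls overlap but the bookkeeping stays race-free. A minimal standalone sketch of the same shape follows; deleteOne and the string GVRs are stand-ins for the controller's real types, not code from this commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// deleteOne stands in for deleteAllContentForGroupVersionResource:
// slow, independent work that can fail and report leftover items.
func deleteOne(gvr string) (remaining int, err error) {
	time.Sleep(10 * time.Millisecond) // simulated API latency
	if gvr == "broken" {
		return 1, fmt.Errorf("delete failed for %s", gvr)
	}
	return 0, nil
}

func main() {
	gvrs := []string{"pods", "secrets", "configmaps", "broken"}

	var (
		wg        sync.WaitGroup
		mutex     sync.Mutex // guards errs and remaining below
		errs      []error
		remaining = map[string]int{}
	)
	for _, gvr := range gvrs {
		gvr := gvr // capture the loop variable (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			n, err := deleteOne(gvr) // the slow call runs outside the lock
			mutex.Lock()
			defer mutex.Unlock()
			if err != nil {
				// Keep the error but let the other deletions proceed,
				// mirroring the commit's error handling.
				errs = append(errs, err)
			}
			if n > 0 {
				remaining[gvr] = n
			}
		}()
	}
	wg.Wait()
	fmt.Printf("remaining: %v, errors: %v\n", remaining, errs)
}

The gvr := gvr line in the commit matters for the same reason it does in the sketch: before Go 1.22, a range loop reuses one variable across iterations, so each goroutine must capture its own copy.
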
15 changes: 12 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
@@ -992,6 +992,12 @@ func (e *Store) DeleteReturnsDeletedObject() bool {
 // possibly with storage API, but watch is not delivered correctly then).
 // It will be possible to fix it with v3 etcd API.
 func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
+
+	// TODO: Remove temporary logging for troubleshooting timeout flakes
+	startTime := time.Now()
+	itemCount := 0
+	defer func() { klog.V(1).Infof("***BP*** store.DeleteCollection for %d items finished with duration = %v", itemCount, time.Since(startTime)) }()
+
 	if listOptions == nil {
 		listOptions = &metainternalversion.ListOptions{}
 	} else {
@@ -1006,15 +1012,18 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	itemCount = len(items)
+
 	// Spawn a number of goroutines, so that we can issue requests to storage
 	// in parallel to speed up deletion.
-	// TODO: Make this proportional to the number of items to delete, up to
-	// DeleteCollectionWorkers (it doesn't make much sense to spawn 16
-	// workers to delete 10 items).
 	workersNumber := e.DeleteCollectionWorkers
 	if workersNumber < 1 {
 		workersNumber = 1
+	} else if workersNumber > len(items) {
+		workersNumber = len(items)
 	}
+
 	wg := sync.WaitGroup{}
 	toProcess := make(chan int, 2*workersNumber)
 	errs := make(chan error, workersNumber+1)
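
This hunk also deletes the old TODO by implementing it: the worker count is now clamped to the number of items, so a collection of 3 items no longer spawns 5 idle goroutines. A simplified sketch of the surrounding worker-pool shape, under the assumption that each worker bails out after its first error (deleteIndex is a placeholder, not the Store's real per-item delete):

package main

import (
	"fmt"
	"sync"
)

// deleteIndex is a placeholder for the per-item storage delete.
func deleteIndex(i int) error {
	fmt.Printf("deleting item %d\n", i)
	return nil
}

func main() {
	items := make([]int, 3)      // only 3 items to delete
	deleteCollectionWorkers := 5 // the new default

	// Clamp as the commit does: at least 1 worker, at most len(items).
	workersNumber := deleteCollectionWorkers
	if workersNumber < 1 {
		workersNumber = 1
	} else if workersNumber > len(items) {
		workersNumber = len(items)
	}

	var wg sync.WaitGroup
	toProcess := make(chan int, 2*workersNumber)
	errs := make(chan error, workersNumber+1)

	// Feed indices to the pool and close the channel when done.
	go func() {
		defer close(toProcess)
		for i := range items {
			toProcess <- i
		}
	}()

	wg.Add(workersNumber)
	for i := 0; i < workersNumber; i++ {
		go func() {
			defer wg.Done()
			for index := range toProcess {
				if err := deleteIndex(index); err != nil {
					errs <- err // buffered; each worker sends at most one error
					return
				}
			}
		}()
	}
	wg.Wait()

	select {
	case err := <-errs:
		fmt.Println("first error:", err)
	default:
		fmt.Println("deleted all items with", workersNumber, "workers")
	}
}
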
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/server/options/etcd.go
@@ -67,7 +67,7 @@ func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
 	options := &EtcdOptions{
 		StorageConfig:           *backendConfig,
 		DefaultStorageMediaType: "application/json",
-		DeleteCollectionWorkers: 1,
+		DeleteCollectionWorkers: 5,
 		EnableGarbageCollection: true,
 		EnableWatchCache:        true,
 		DefaultWatchCacheSize:   100,
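
To place this default: DeleteCollectionWorkers is the value behind the kube-apiserver --delete-collection-workers flag, which is registered on these same EtcdOptions, so servers that never set the flag move from 1 DeleteCollection worker to 5.
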
13 changes: 13 additions & 0 deletions test/e2e/storage/persistent_volumes-local.go
@@ -646,6 +646,7 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 				err error
 			)
 			pvc = e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, DirectoryLocalVolumeType), config.ns)
+			defer func() { framework.ExpectNoError(e2epv.DeletePersistentVolumeClaim(config.client, pvc.Name, config.ns)) }()
 			ginkgo.By(fmt.Sprintf("Create a PVC %s", pvc.Name))
 			pvc, err = e2epv.CreatePVC(config.client, config.ns, pvc)
 			framework.ExpectNoError(err)
@@ -663,6 +664,18 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 				framework.ExpectNoError(err)
 				pods[pod.Name] = pod
 			}
+			defer func() {
+				var wg sync.WaitGroup
+				for _, pod := range pods {
+					pod := pod
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						framework.ExpectNoError(e2epod.DeletePodWithWait(config.client, pod))
+					}()
+				}
+				wg.Wait()
+			}()
 			ginkgo.By("Wait for all pods are running")
 			const runningTimeout = 5 * time.Minute
 			waitErr := wait.PollImmediate(time.Second, runningTimeout, func() (done bool, err error) {
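
One detail worth calling out across these diffs: arguments to a plain deferred call are evaluated when the defer statement executes, not when the surrounding function returns. That is why the timing logs and cleanup calls above take the defer func() { ... }() form; a bare defer klog.V(1).Infof(..., time.Since(startTime)) would compute the duration immediately and always log roughly zero, and a bare defer on DeletePersistentVolumeClaim would delete the claim before the test had created it. A two-line illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()

	// Evaluated now: time.Since(start) is ~0 when this defer is recorded.
	defer fmt.Printf("eager arg: %v\n", time.Since(start))

	// Evaluated at return: the closure reads start when main exits (~50ms).
	defer func() { fmt.Printf("closure: %v\n", time.Since(start)) }()

	time.Sleep(50 * time.Millisecond)
}
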
