From f2c5dc7160dbd9cc0960e1c0cbb66ea2556c11d7 Mon Sep 17 00:00:00 2001 From: Maciej Szulik Date: Mon, 24 May 2021 15:34:20 +0200 Subject: [PATCH] UPSTREAM: : Revert "UPSTREAM: 97323: fix the deadlock in priority and fairness config controller" This reverts commit 5f545b11026a71a4756c530b2a118dafe06bab44. --- .../apiserver/pkg/util/flowcontrol/BUILD | 1 - .../pkg/util/flowcontrol/apf_controller.go | 17 +-- .../pkg/util/flowcontrol/controller_test.go | 102 ------------------ 3 files changed, 1 insertion(+), 119 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD index 84af7fe0e2754..52ec0b6a57df4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/BUILD @@ -78,7 +78,6 @@ go_test( "//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index ef55aa2f07549..2f93df73d359a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/labels" apitypes "k8s.io/apimachinery/pkg/types" apierrors "k8s.io/apimachinery/pkg/util/errors" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/authentication/user" @@ -245,28 +244,14 @@ func (cfgCtlr *configController) updateObservations() { } } -// used from the unit tests only. -func (cfgCtlr *configController) getPriorityLevelState(plName string) *priorityLevelState { - cfgCtlr.lock.Lock() - defer cfgCtlr.lock.Unlock() - return cfgCtlr.priorityLevelStates[plName] -} - func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { - defer utilruntime.HandleCrash() - - // Let the config worker stop when we are done defer cfgCtlr.configQueue.ShutDown() - klog.Info("Starting API Priority and Fairness config controller") if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok { return fmt.Errorf("Never achieved initial sync") } - klog.Info("Running API Priority and Fairness config worker") - go wait.Until(cfgCtlr.runWorker, time.Second, stopCh) - - <-stopCh + wait.Until(cfgCtlr.runWorker, time.Second, stopCh) klog.Info("Shutting down API Priority and Fairness config worker") return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index b230bbd2e7197..701bfd92e9c99 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -21,15 +21,12 @@ import ( "fmt" "math/rand" "os" - "reflect" "sync" "testing" "time" flowcontrol "k8s.io/api/flowcontrol/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/util/flowcontrol/debug" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" @@ -310,105 +307,6 @@ func TestConfigConsumer(t *testing.T) { } } -func TestAPFControllerWithGracefulShutdown(t *testing.T) { - const plName = "test-ps" - fs := &flowcontrol.FlowSchema{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-fs", - }, - Spec: flowcontrol.FlowSchemaSpec{ - MatchingPrecedence: 100, - PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ - Name: plName, - }, - DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ - Type: flowcontrol.FlowDistinguisherMethodByUserType, - }, - }, - } - - pl := &flowcontrol.PriorityLevelConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: plName, - }, - Spec: flowcontrol.PriorityLevelConfigurationSpec{ - Type: flowcontrol.PriorityLevelEnablementLimited, - Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 10, - LimitResponse: flowcontrol.LimitResponse{ - Type: flowcontrol.LimitResponseTypeReject, - }, - }, - }, - } - - clientset := clientsetfake.NewSimpleClientset(fs, pl) - informerFactory := informers.NewSharedInformerFactory(clientset, time.Second) - flowcontrolClient := clientset.FlowcontrolV1beta1() - cts := &ctlrTestState{t: t, - fcIfc: flowcontrolClient, - existingFSs: map[string]*flowcontrol.FlowSchema{}, - existingPLs: map[string]*flowcontrol.PriorityLevelConfiguration{}, - heldRequestsMap: map[string][]heldRequest{}, - queues: map[string]*ctlrTestQueueSet{}, - } - controller := newTestableController( - informerFactory, - flowcontrolClient, - 100, - time.Minute, - metrics.PriorityLevelConcurrencyObserverPairGenerator, - cts) - - stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{}) - var controllerErr error - - informerFactory.Start(stopCh) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - status := informerFactory.WaitForCacheSync(ctx.Done()) - if names := unsynced(status); len(names) > 0 { - t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) - } - - go func() { - defer close(controllerCompletedCh) - controllerErr = controller.Run(stopCh) - }() - - // ensure that the controller has run its first loop. - err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { - if controller.getPriorityLevelState(plName) == nil { - return false, nil - } - return true, nil - }) - if err != nil { - t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err) - } - - close(stopCh) - t.Log("waiting for the controller Run function to shutdown gracefully") - <-controllerCompletedCh - - if controllerErr != nil { - t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr) - } -} - -func unsynced(status map[reflect.Type]bool) []string { - names := make([]string, 0) - - for objType, synced := range status { - if !synced { - names = append(names, objType.Name()) - } - } - - return names -} - func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) { t := cts.t ctlr := cts.cfgCtlr