Skip to content

Commit

Permalink
Merge pull request kubernetes#110125 from wojtek-t/fix_resource_quota…
Browse files Browse the repository at this point in the history
…_shutdown

Fix resource quota shutdown
  • Loading branch information
k8s-ci-robot committed May 23, 2022
2 parents cfd6946 + f8211d7 commit 1131fb9
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 41 deletions.
2 changes: 1 addition & 1 deletion plugin/pkg/admission/gc/gc_admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func newGCPermissionsEnforcement() (*gcPermissionsEnforcement, error) {
whiteList: whiteList,
}

genericPluginInitializer := initializer.New(nil, nil, fakeAuthorizer{}, nil)
genericPluginInitializer := initializer.New(nil, nil, fakeAuthorizer{}, nil, nil)
fakeDiscoveryClient := &fakediscovery.FakeDiscovery{Fake: &coretesting.Fake{}}
fakeDiscoveryClient.Resources = []*metav1.APIResourceList{
{
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/admission/limitranger/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func newHandlerForTest(c clientset.Interface) (*LimitRanger, informers.SharedInf
if err != nil {
return nil, f, err
}
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.MutationInterface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewProvision()
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/admission/namespace/exists/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func newHandlerForTest(c kubernetes.Interface) (admission.ValidationInterface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewExists()
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/admission/podnodeselector/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestHandles(t *testing.T) {
func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodNodeSelector(nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInforme
return nil, nil, err
}
handler := NewPodTolerationsPlugin(pluginConfig)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
15 changes: 10 additions & 5 deletions plugin/pkg/admission/resourcequota/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota"
"k8s.io/client-go/informers"
Expand All @@ -36,6 +37,7 @@ import (
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota/v1/install"
)

Expand Down Expand Up @@ -99,15 +101,18 @@ func createHandlerWithConfig(kubeClient kubernetes.Interface, informerFactory in
}
quotaConfiguration := install.NewQuotaConfigurationForAdmission()

handler, err := resourcequota.NewResourceQuota(config, 5, stopCh)
handler, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
return nil, err
}
handler.SetExternalKubeClientSet(kubeClient)
handler.SetExternalKubeInformerFactory(informerFactory)
handler.SetQuotaConfiguration(quotaConfiguration)

return handler, nil
initializers := admission.PluginInitializers{
genericadmissioninitializer.New(kubeClient, informerFactory, nil, nil, stopCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, quotaConfiguration),
}
initializers.Initialize(handler)

return handler, admission.ValidateInitialization(handler)
}

// TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type pluginInitializer struct {
externalInformers informers.SharedInformerFactory
authorizer authorizer.Authorizer
featureGates featuregate.FeatureGate
stopCh <-chan struct{}
}

// New creates an instance of admission plugins initializer.
Expand All @@ -39,19 +40,26 @@ func New(
extInformers informers.SharedInformerFactory,
authz authorizer.Authorizer,
featureGates featuregate.FeatureGate,
stopCh <-chan struct{},
) pluginInitializer {
return pluginInitializer{
externalClient: extClientset,
externalInformers: extInformers,
authorizer: authz,
featureGates: featureGates,
stopCh: stopCh,
}
}

// Initialize checks the initialization interfaces implemented by a plugin
// and provide the appropriate initialization data
func (i pluginInitializer) Initialize(plugin admission.Interface) {
// First tell the plugin about enabled features, so it can decide whether to start informers or not
// First tell the plugin about drained notification, so it can pass it to further initializations.
if wants, ok := plugin.(WantsDrainedNotification); ok {
wants.SetDrainedNotification(i.stopCh)
}

// Second tell the plugin about enabled features, so it can decide whether to start informers or not
if wants, ok := plugin.(WantsFeatures); ok {
wants.InspectFeatureGates(i.featureGates)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// TestWantsAuthorizer ensures that the authorizer is injected
// when the WantsAuthorizer interface is implemented by a plugin.
func TestWantsAuthorizer(t *testing.T) {
target := initializer.New(nil, nil, &TestAuthorizer{}, nil)
target := initializer.New(nil, nil, &TestAuthorizer{}, nil, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{}
target.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil {
Expand All @@ -44,7 +44,7 @@ func TestWantsAuthorizer(t *testing.T) {
// when the WantsExternalKubeClientSet interface is implemented by a plugin.
func TestWantsExternalKubeClientSet(t *testing.T) {
cs := &fake.Clientset{}
target := initializer.New(cs, nil, &TestAuthorizer{}, nil)
target := initializer.New(cs, nil, &TestAuthorizer{}, nil, nil)
wantExternalKubeClientSet := &WantExternalKubeClientSet{}
target.Initialize(wantExternalKubeClientSet)
if wantExternalKubeClientSet.cs != cs {
Expand All @@ -57,14 +57,26 @@ func TestWantsExternalKubeClientSet(t *testing.T) {
func TestWantsExternalKubeInformerFactory(t *testing.T) {
cs := &fake.Clientset{}
sf := informers.NewSharedInformerFactory(cs, time.Duration(1)*time.Second)
target := initializer.New(cs, sf, &TestAuthorizer{}, nil)
target := initializer.New(cs, sf, &TestAuthorizer{}, nil, nil)
wantExternalKubeInformerFactory := &WantExternalKubeInformerFactory{}
target.Initialize(wantExternalKubeInformerFactory)
if wantExternalKubeInformerFactory.sf != sf {
t.Errorf("expected informer factory to be initialized")
}
}

// TestWantsShutdownSignal ensures that the shutdown signal is injected
// when the WantsShutdownSignal interface is implemented by a plugin.
func TestWantsShutdownNotification(t *testing.T) {
stopCh := make(chan struct{})
target := initializer.New(nil, nil, &TestAuthorizer{}, nil, stopCh)
wantDrainedNotification := &WantDrainedNotification{}
target.Initialize(wantDrainedNotification)
if wantDrainedNotification.stopCh == nil {
t.Errorf("expected stopCh to be initialized but found nil")
}
}

// WantExternalKubeInformerFactory is a test stub that fulfills the WantsExternalKubeInformerFactory interface
type WantExternalKubeInformerFactory struct {
sf informers.SharedInformerFactory
Expand Down Expand Up @@ -114,6 +126,23 @@ func (self *WantAuthorizerAdmission) ValidateInitialization() error { retur
var _ admission.Interface = &WantAuthorizerAdmission{}
var _ initializer.WantsAuthorizer = &WantAuthorizerAdmission{}

// WantDrainedNotification is a test stub that filfills the WantsDrainedNotification interface.
type WantDrainedNotification struct {
stopCh <-chan struct{}
}

func (self *WantDrainedNotification) SetDrainedNotification(stopCh <-chan struct{}) {
self.stopCh = stopCh
}
func (self *WantDrainedNotification) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
return nil
}
func (self *WantDrainedNotification) Handles(o admission.Operation) bool { return false }
func (self *WantDrainedNotification) ValidateInitialization() error { return nil }

var _ admission.Interface = &WantDrainedNotification{}
var _ initializer.WantsDrainedNotification = &WantDrainedNotification{}

// TestAuthorizer is a test stub that fulfills the WantsAuthorizer interface.
type TestAuthorizer struct{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ type WantsQuotaConfiguration interface {
admission.InitializationValidator
}

// WantsDrainedNotification defines a function which sets the notification of where the apiserver
// has already been drained for admission plugins that need it.
// After receiving that notification, Admit/Validate calls won't be called anymore.
type WantsDrainedNotification interface {
SetDrainedNotification(<-chan struct{})
admission.InitializationValidator
}

// WantsFeatureGate defines a function which passes the featureGates for inspection by an admission plugin.
// Admission plugins should not hold a reference to the featureGates. Instead, they should query a particular one
// and assign it to a simple bool in the admission plugin struct.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (
if err != nil {
return nil, f, err
}
pluginInitializer := kubeadmission.New(c, f, nil, nil)
pluginInitializer := kubeadmission.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import (
// PluginName is a string with the name of the plugin
const PluginName = "ResourceQuota"

var namespaceGVK = v1.SchemeGroupVersion.WithKind("Namespace").GroupKind()
var (
namespaceGVK = v1.SchemeGroupVersion.WithKind("Namespace").GroupKind()
stopChUnconfiguredErr = fmt.Errorf("quota configuration configured between stop channel")
)

// Register registers a plugin
func Register(plugins *admission.Plugins) {
Expand All @@ -54,7 +57,7 @@ func Register(plugins *admission.Plugins) {
return nil, errs.ToAggregate()
}
}
return NewResourceQuota(configuration, 5, make(chan struct{}))
return NewResourceQuota(configuration, 5)
})
}

Expand All @@ -67,12 +70,14 @@ type QuotaAdmission struct {
numEvaluators int
quotaAccessor *quotaAccessor
evaluator Evaluator
initializationErr error
}

var _ admission.ValidationInterface = &QuotaAdmission{}
var _ = genericadmissioninitializer.WantsExternalKubeInformerFactory(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsExternalKubeClientSet(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsQuotaConfiguration(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsDrainedNotification(&QuotaAdmission{})

type liveLookupEntry struct {
expiry time.Time
Expand All @@ -82,21 +87,26 @@ type liveLookupEntry struct {
// NewResourceQuota configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int, stopCh <-chan struct{}) (*QuotaAdmission, error) {
func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int) (*QuotaAdmission, error) {
quotaAccessor, err := newQuotaAccessor()
if err != nil {
return nil, err
}

return &QuotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
stopCh: stopCh,
stopCh: nil,
numEvaluators: numEvaluators,
config: config,
quotaAccessor: quotaAccessor,
}, nil
}

// SetDrainedNotification sets the stop channel into QuotaAdmission.
func (a *QuotaAdmission) SetDrainedNotification(stopCh <-chan struct{}) {
a.stopCh = stopCh
}

// SetExternalKubeClientSet registers the client into QuotaAdmission
func (a *QuotaAdmission) SetExternalKubeClientSet(client kubernetes.Interface) {
a.quotaAccessor.client = client
Expand All @@ -110,11 +120,21 @@ func (a *QuotaAdmission) SetExternalKubeInformerFactory(f informers.SharedInform
// SetQuotaConfiguration assigns and initializes configuration and evaluator for QuotaAdmission
func (a *QuotaAdmission) SetQuotaConfiguration(c quota.Configuration) {
a.quotaConfiguration = c
if a.stopCh == nil {
a.initializationErr = stopChUnconfiguredErr
return
}
a.evaluator = NewQuotaEvaluator(a.quotaAccessor, a.quotaConfiguration.IgnoredResources(), generic.NewRegistry(a.quotaConfiguration.Evaluators()), nil, a.config, a.numEvaluators, a.stopCh)
}

// ValidateInitialization ensures an authorizer is set.
func (a *QuotaAdmission) ValidateInitialization() error {
if a.initializationErr != nil {
return a.initializationErr
}
if a.stopCh == nil {
return fmt.Errorf("missing stopCh")
}
if a.quotaAccessor == nil {
return fmt.Errorf("missing quotaAccessor")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
v1 "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/v1"
"k8s.io/apiserver/pkg/quota/v1/generic"
)

func TestPrettyPrint(t *testing.T) {
Expand Down Expand Up @@ -161,3 +162,14 @@ func TestExcludedOperations(t *testing.T) {
}
}
}

func TestInitializationOrder(t *testing.T) {
a := &QuotaAdmission{}

qca := generic.NewConfiguration(nil, nil)
a.SetQuotaConfiguration(qca)

if err := a.ValidateInitialization(); err != stopChUnconfiguredErr {
t.Errorf("unexpected error: %v", err)
}
}
5 changes: 2 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/server/options/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ func (a *AdmissionOptions) ApplyTo(
if err != nil {
return err
}
genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
initializersChain := admission.PluginInitializers{}
pluginInitializers = append(pluginInitializers, genericInitializer)
genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features, c.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, pluginInitializers...)

admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
Expand Down

0 comments on commit 1131fb9

Please sign in to comment.