Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: add limited resource for critical pod in all namespaces - part1 #82575

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/registry/scheduling/rest/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/apis/scheduling/v1alpha1:go_default_library",
"//pkg/apis/scheduling/v1beta1:go_default_library",
"//pkg/registry/scheduling/priorityclass/storage:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
Expand All @@ -24,6 +25,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/scheduling/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
Expand Down
132 changes: 101 additions & 31 deletions pkg/registry/scheduling/rest/storage_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/klog"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -30,6 +31,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
clientset "k8s.io/client-go/kubernetes"
schedulingclient "k8s.io/client-go/kubernetes/typed/scheduling/v1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/scheduling"
Expand All @@ -39,7 +41,13 @@ import (
priorityclassstore "k8s.io/kubernetes/pkg/registry/scheduling/priorityclass/storage"
)

const PostStartHookName = "scheduling/bootstrap-system-priority-classes"
const (
PostStartHookName = "scheduling/bootstrap-scheduler-defaults"

// DefaultSystemQuotaName is the name of default system quota which allows unlimited
// system critical pods to be created
DefaultSystemQuotaName = "default-system-quota"
)

type RESTStorageProvider struct{}

Expand Down Expand Up @@ -108,48 +116,110 @@ func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.API
}

func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return PostStartHookName, AddSystemPriorityClasses(), nil
return PostStartHookName, addSchedulingDefaults(), nil
}

func AddSystemPriorityClasses() genericapiserver.PostStartHookFunc {
func (p RESTStorageProvider) GroupName() string {
return scheduling.GroupName
}

// addSchedulingDefaults adds the default cluster critical priorityClasses and clusterResourceQuota which allows
// unlimited number of critical pods be created in `kube-system` namespace.
func addSchedulingDefaults() genericapiserver.PostStartHookFunc {
return func(hookContext genericapiserver.PostStartHookContext) error {
// Adding system priority classes is important. If they fail to add, many critical system
// components may fail and cluster may break.
err := wait.Poll(1*time.Second, 30*time.Second, func() (done bool, err error) {
schedClientSet, err := schedulingclient.NewForConfig(hookContext.LoopbackClientConfig)
if err := addSystemPriorityClasses(hookContext); err != nil {
return fmt.Errorf("unable to add default system priority classes: %v", err)
}
// Add default resource quota which allows unlimited number of critical pods to be created in kube-system
// namespace. This is to ensure backwards compatibility with existing system where we limit critical pods
// to be created in `kube-system`.
if err := addDefaultSystemQuota(hookContext); err != nil {
return fmt.Errorf("unable to add default system resource quota: %v", err)
}
return nil
}
}

func addSystemPriorityClasses(hookContext genericapiserver.PostStartHookContext) error {
return wait.Poll(1*time.Second, 30*time.Second, func() (done bool, err error) {
schedClientSet, err := schedulingclient.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to initialize client: %v", err))
return false, nil
}

for _, pc := range schedulingapiv1.SystemPriorityClasses() {
_, err := schedClientSet.PriorityClasses().Get(pc.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := schedClientSet.PriorityClasses().Create(pc)
if err != nil && !apierrors.IsAlreadyExists(err) {
return false, err
}

klog.Infof("created PriorityClass %s with value %v", pc.Name, pc.Value)
}

if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to initialize client: %v", err))
// Unable to get the priority class for reasons other than "not found".
klog.Warningf("unable to get PriorityClass %v: %v. Retrying...", pc.Name, err)
return false, nil
}
}
klog.Infof("all system priority classes are created successfully or already exist.")
return true, nil
})
}

for _, pc := range schedulingapiv1.SystemPriorityClasses() {
_, err := schedClientSet.PriorityClasses().Get(pc.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_, err := schedClientSet.PriorityClasses().Create(pc)
if err != nil && !apierrors.IsAlreadyExists(err) {
return false, err
} else {
klog.Infof("created PriorityClass %s with value %v", pc.Name, pc.Value)
}
} else {
// Unable to get the priority class for reasons other than "not found".
klog.Warningf("unable to get PriorityClass %v: %v. Retrying...", pc.Name, err)
return false, nil
}
}
// addDefaultSystemQuota adds a quota which allows unlimited critical pods to be created in `kube-system` namespace
// This quota can be deleted/updated by cluster-admin. If the cluster-admin wants critical pods to be created in
// namespace other than `kube-system`, he/she can configure quotas which allows these critical pods to be created
// in that namespace.
func addDefaultSystemQuota(hookContext genericapiserver.PostStartHookContext) error {
return wait.Poll(1*time.Second, 30*time.Second, func() (done bool, err error) {
client := clientset.NewForConfigOrDie(hookContext.LoopbackClientConfig)
_, err = client.CoreV1().ResourceQuotas(metav1.NamespaceSystem).Get(DefaultSystemQuotaName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := client.CoreV1().ResourceQuotas(metav1.NamespaceSystem).Create(systemDefaultQuota())
if err != nil && !apierrors.IsAlreadyExists(err) {
klog.V(5).Infof("Unable to create system default resourceQuota %v: %v. Retrying..", DefaultSystemQuotaName, err)
return false, nil
}
klog.Infof("all system priority classes are created successfully or already exist.")
return true, nil
})
// if we're never able to make it through initialization, kill the API server.

klog.V(5).Infof("Created defaultResourceQuota %v which allows unlimited critical pods to be created in kube-system namespace", DefaultSystemQuotaName)
}

if err != nil {
return fmt.Errorf("unable to add default system priority classes: %v", err)
// Unable to get the default resource quota for reasons other than "not found".
klog.Warningf("Unable to get system default resourceQuota %v: %v. Retrying..", DefaultSystemQuotaName, err)
return false, nil
}
return nil
}
klog.Info("Required default resource quota is created or already exist")
return true, nil
})
}

func (p RESTStorageProvider) GroupName() string {
return scheduling.GroupName
// systemDefaultQuota returns a default system quota. This default quota allows unlimited number of critical pods
// to be created in `kube-system` namespace. This is needed for backwards compatibility with current system
// as we allow unlimited critical pods to be created in `kube-system` namespace. The DefaultSystemQuotaName gets
// automatically created when we start kube-apiserver. The cluster-admin is allowed to create/delete or update
// this quota. In order to create critical pods in other namespaces, cluster-admin can create quotas
// in those namespaces.
func systemDefaultQuota() *v1.ResourceQuota {
defaultQuota := &v1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{Name: DefaultSystemQuotaName, Namespace: metav1.NamespaceSystem},
Spec: v1.ResourceQuotaSpec{
ScopeSelector: &v1.ScopeSelector{
MatchExpressions: []v1.ScopedResourceSelectorRequirement{
{
ScopeName: v1.ResourceQuotaScopePriorityClass,
Operator: v1.ScopeSelectorOpIn,
Values: []string{"system-cluster-critical", "system-node-critical"},
},
},
},
},
}
return defaultQuota
}
23 changes: 17 additions & 6 deletions plugin/pkg/admission/priority/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Plugin struct {
*admission.Handler
client kubernetes.Interface
lister schedulingv1listers.PriorityClassLister
// We are initializing them here because of performance impact of checking featuregates.
resourceQuotaFeatureGateEnabled bool
}

var _ admission.MutationInterface = &Plugin{}
Expand All @@ -66,7 +68,8 @@ var _ = genericadmissioninitializers.WantsExternalKubeClientSet(&Plugin{})
// NewPlugin creates a new priority admission plugin.
func NewPlugin() *Plugin {
return &Plugin{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
resourceQuotaFeatureGateEnabled: utilfeature.DefaultFeatureGate.Enabled(features.ResourceQuotaScopeSelectors),
}
}

Expand Down Expand Up @@ -177,19 +180,27 @@ func (p *Plugin) admitPod(a admission.Attributes) error {
}

if operation == admission.Create {
var priority int32
var preemptionPolicy *apiv1.PreemptionPolicy
var (
priority int32
preemptionPolicy *apiv1.PreemptionPolicy
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugly

if len(pod.Spec.PriorityClassName) == 0 {
var err error
var pcName string
var (
err error
pcName string
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugly


pcName, priority, preemptionPolicy, err = p.getDefaultPriority()
if err != nil {
return fmt.Errorf("failed to get default priority class: %v", err)
}

pod.Spec.PriorityClassName = pcName
} else {
pcName := pod.Spec.PriorityClassName
if !priorityClassPermittedInNamespace(pcName, a.GetNamespace()) {
// If ResourceQuotaScopeSelectors is enabled, we should let pods with critical priorityClass to be created
// any namespace where administrator wants it to be created.
if !p.resourceQuotaFeatureGateEnabled && !priorityClassPermittedInNamespace(pcName, a.GetNamespace()) {
return admission.NewForbidden(a, fmt.Errorf("pods with %v priorityClass is not permitted in %v namespace", pcName, a.GetNamespace()))
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/admission/priority/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func TestPodAdmission(t *testing.T) {
[]*scheduling.PriorityClass{systemClusterCritical},
*pods[7],
scheduling.SystemCriticalPriority,
true,
false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this?

nil,
},
{
Expand Down
1 change: 0 additions & 1 deletion plugin/pkg/admission/resourcequota/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ go_library(
"//pkg/quota/v1/generic:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/install:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/v1beta1:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/validation:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down
42 changes: 32 additions & 10 deletions plugin/pkg/admission/resourcequota/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"io"
"io/ioutil"

v1 "k8s.io/api/core/v1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corev1

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/install"
resourcequotav1beta1 "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/v1beta1"
)

var (
Expand All @@ -37,19 +37,38 @@ func init() {
install.Install(scheme)
}

// AddCriticalPodLimitedResources limits the number of critical pods that can be created. We're creating a default
// resource quota in `kube-system` namespace to allow unlimited number of critical pods to be created in that
// namespace. Making this function public for easy testing
func AddCriticalPodLimitedResources() []resourcequotaapi.LimitedResource {
return []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchScopes: []v1.ScopedResourceSelectorRequirement{
{
ScopeName: v1.ResourceQuotaScopePriorityClass,
Operator: v1.ScopeSelectorOpIn,
Values: []string{"system-cluster-critical", "system-node-critical"},
},
},
},
}
}

// LoadConfiguration loads the provided configuration.
func LoadConfiguration(config io.Reader) (*resourcequotaapi.Configuration, error) {
// if no config is provided, return a default configuration
// if no config is provided, return a default configuration with critical pods as limited resources
if config == nil {
externalConfig := &resourcequotav1beta1.Configuration{}
scheme.Default(externalConfig)
internalConfig := &resourcequotaapi.Configuration{}
if err := scheme.Convert(externalConfig, internalConfig, nil); err != nil {
return nil, err
}
return internalConfig, nil
config := &resourcequotaapi.Configuration{}
scheme.Default(config)
// append pods as limited resources so that we can limit the number of critical pods to be created. We have
// a matching default quota in `kube-system` namespace which allows unlimited pods to be created in that
// namespace
config.LimitedResources = append(config.LimitedResources, AddCriticalPodLimitedResources()...)
return config, nil
}
// we have a config so parse it.

// we have a config so parse it and limit the critical pods that can be created
data, err := ioutil.ReadAll(config)
if err != nil {
return nil, err
Expand All @@ -63,5 +82,8 @@ func LoadConfiguration(config io.Reader) (*resourcequotaapi.Configuration, error
if !ok {
return nil, fmt.Errorf("unexpected type: %T", decodedObj)
}
// append pods as limited resources so that we can limit the number of critical pods to be created. We have
// a matching default quota in `kube-system` namespace which allows unlimited pods to be created in that namespace
resourceQuotaConfiguration.LimitedResources = append(resourceQuotaConfiguration.LimitedResources, AddCriticalPodLimitedResources()...)
return resourceQuotaConfiguration, nil
}
5 changes: 3 additions & 2 deletions test/integration/master/graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func TestGracefulShutdown(t *testing.T) {
}
body.Close()
respErr := <-respErrCh
if err != nil {
t.Fatal(err)
t.Logf("respErr %v", respErr)
if respErr.err != nil {
t.Fatal(respErr.err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this into a separate PR? It was obviously wrong.

}
defer respErr.resp.Body.Close()
bs, err := ioutil.ReadAll(respErr.resp.Body)
Expand Down