Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

federation: Add admission controller for policy-based placement #44786

Merged
merged 1 commit into from Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions federation/BUILD
Expand Up @@ -30,6 +30,7 @@ filegroup(
"//federation/pkg/federatedtypes:all-srcs",
"//federation/pkg/federation-controller:all-srcs",
"//federation/pkg/kubefed:all-srcs",
"//federation/plugin/pkg/admission/schedulingpolicy:all-srcs",
"//federation/registry/cluster:all-srcs",
],
tags = ["automanaged"],
Expand Down
1 change: 1 addition & 0 deletions federation/cmd/federation-apiserver/app/BUILD
Expand Up @@ -28,6 +28,7 @@ go_library(
"//federation/apis/federation/install:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/cmd/federation-apiserver/app/options:go_default_library",
"//federation/plugin/pkg/admission/schedulingpolicy:go_default_library",
"//federation/registry/cluster/etcd:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions federation/cmd/federation-apiserver/app/plugins.go
Expand Up @@ -25,6 +25,7 @@ import (

// Admission policies
"k8s.io/apiserver/pkg/admission"
"k8s.io/kubernetes/federation/plugin/pkg/admission/schedulingpolicy"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"k8s.io/kubernetes/plugin/pkg/admission/deny"
"k8s.io/kubernetes/plugin/pkg/admission/gc"
Expand All @@ -37,4 +38,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) {
deny.Register(plugins)
gc.Register(plugins)
initialization.Register(plugins)
schedulingpolicy.Register(plugins)
}
70 changes: 70 additions & 0 deletions federation/plugin/pkg/admission/schedulingpolicy/BUILD
@@ -0,0 +1,70 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_test(
name = "go_default_test",
srcs = [
"admission_test.go",
"merge_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)

go_library(
name = "go_default_library",
srcs = [
"admission.go",
"merge.go",
"query.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/ref:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
213 changes: 213 additions & 0 deletions federation/plugin/pkg/admission/schedulingpolicy/admission.go
@@ -0,0 +1,213 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package schedulingpolicy implements a webhook that queries an external API
// to obtain scheduling decisions for Federated sources.
package schedulingpolicy

import (
"fmt"
"io"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/ref"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

const (
pluginName = "SchedulingPolicy"
configKey = "schedulingPolicy"
policyConfigMapNamespace = "kube-federation-scheduling-policy"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didnt we decide to keep system in the name?
"federation-system-scheduling-policy" maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what you want to do regarding the name. In kubernetes/community#292 this one was suggested and specified in the proposal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, reread the discussion there, thanks for the link.
lg.


// Default backoff delay for policy engine query retries. The actual
// backoff implementation is handled by k8s.io/apiserver/pkg/util/webhook.
// If the admission controller config file does not specify a backoff, this
// one is used.
defaultRetryBackoff = time.Millisecond * 100
)

type admissionConfig struct {
Kubeconfig string `json:"kubeconfig"`
RetryBackoff time.Duration `json:"retryBackoff"`
}

type admissionController struct {
*admission.Handler
policyEngineClient *rest.RESTClient // client to communicate with policy engine
policyEngineRetryBackoff time.Duration // backoff for policy engine queries
client internalclientset.Interface // client to communicate with federation-apiserver
}

// Register registers the plugin.
func Register(plugins *admission.Plugins) {
plugins.Register(pluginName, func(file io.Reader) (admission.Interface, error) {
return newAdmissionController(file)
})
}

func newAdmissionController(file io.Reader) (*admissionController, error) {
config, err := loadConfig(file)
if err != nil {
return nil, err
}

policyEngineClient, err := loadRestClient(config.Kubeconfig)
if err != nil {
return nil, err
}

c := &admissionController{
Handler: admission.NewHandler(admission.Create, admission.Update),
policyEngineClient: policyEngineClient,
policyEngineRetryBackoff: config.RetryBackoff,
}

return c, nil
}

func (c *admissionController) Validate() error {
if c.client == nil {
return fmt.Errorf("%s requires a client", pluginName)
}
return nil
}

func (c *admissionController) SetInternalKubeClientSet(client internalclientset.Interface) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why internal clientset and not the versioned clientset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had updated all our controllers to use versioned clientset instead of internal ones.
Looks like we still need to do that work for our admission controllers.
cc @caesarxuchao to confirm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nikhiljindal. It's ok for admission controllers to use the internal clientset. The admission controller runs in the same process as the apiserver, and iirc objects are supplied by the apiserver to the admission controller, and the objects are already converted to the internal version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline with @caesarxuchao, admission controllers (like this one) also talk to apiserver directly to fetch other resources (like configmaps and events in this case).
Admission controllers should be using versioned clientsets for that.

This PR is fine as is since we do not have a way to provide versioned clients today, but adding support for it should not be too difficult. We can add another method SetVersionedKubeClientSet similar to SetInternalKubeClientSet.
This will become critical as we move admission controllers out of repo without moving the internal API structs.

c.client = client
}

func (c *admissionController) Admit(a admission.Attributes) (err error) {
exists, err := c.policyExists()
if err != nil {
return c.handleError(a, err)
}

if !exists {
return nil
}

obj := a.GetObject()
decision, err := newPolicyEngineQuery(c.policyEngineClient, c.policyEngineRetryBackoff, obj, a.GetKind()).Do()

if err != nil {
return c.handleError(a, err)
}

if err := decision.Error(); err != nil {
return c.handleError(a, err)
}

mergeAnnotations(obj, decision.Annotations)

return nil
}

func (c *admissionController) handleError(a admission.Attributes, err error) error {

c.publishEvent(a, err.Error())

return admission.NewForbidden(a, err)
}

func (c *admissionController) publishEvent(a admission.Attributes, msg string) {

obj := a.GetObject()

ref, err := ref.GetReference(api.Scheme, obj)
if err != nil {
runtime.HandleError(err)
return
}

event := &api.Event{
InvolvedObject: *ref,
Message: msg,
Source: api.EventSource{
Component: fmt.Sprintf("schedulingpolicy"),
},
Type: "Warning",
}

if _, err := c.client.Core().Events(a.GetNamespace()).Create(event); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our controllers we use EventRecorder. We can use the same here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, I will look into this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would make the admission controller depend on the controller manager code. Is this desirable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the EventRecorder utility is not compatible with the internalclientset. Let me know how we should proceed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh, ok.

runtime.HandleError(err)
return
}
}

func (c *admissionController) policyExists() (bool, error) {
lst, err := c.client.Core().ConfigMaps(policyConfigMapNamespace).List(metav1.ListOptions{})
if err != nil {
return true, err
}
return len(lst.Items) > 0, nil
}

func loadConfig(file io.Reader) (*admissionConfig, error) {
var cfg admissionConfig
if file == nil {
return nil, fmt.Errorf("--admission-control-config-file not specified or invalid")
}

if err := yaml.NewYAMLOrJSONDecoder(file, 4096).Decode(&cfg); err != nil {
return nil, err
}

if len(cfg.Kubeconfig) == 0 {
return nil, fmt.Errorf("kubeconfig path must not be empty")
}

if cfg.RetryBackoff == 0 {
cfg.RetryBackoff = defaultRetryBackoff
} else {
// Scale up value from config (which is unmarshalled as ns).
cfg.RetryBackoff *= time.Millisecond
}

if cfg.RetryBackoff.Nanoseconds() < 0 {
return nil, fmt.Errorf("retryBackoff must not be negative")
}

return &cfg, nil
}

func loadRestClient(kubeConfigFile string) (*rest.RESTClient, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about being generic shareable code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above about admission controllers handling their own config.


loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = kubeConfigFile
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})

clientConfig, err := loader.ClientConfig()
if err != nil {
return nil, err
}

clientConfig.ContentConfig.NegotiatedSerializer = dynamic.ContentConfig().NegotiatedSerializer

restClient, err := rest.UnversionedRESTClientFor(clientConfig)
if err != nil {
return nil, err
}

return restClient, nil
}