diff --git a/federation/BUILD b/federation/BUILD index b529dba87735..f7b6f5d169c0 100644 --- a/federation/BUILD +++ b/federation/BUILD @@ -30,6 +30,7 @@ filegroup( "//federation/pkg/federatedtypes:all-srcs", "//federation/pkg/federation-controller:all-srcs", "//federation/pkg/kubefed:all-srcs", + "//federation/plugin/pkg/admission/schedulingpolicy:all-srcs", "//federation/registry/cluster:all-srcs", ], tags = ["automanaged"], diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 830442e65d1e..4da80885a90b 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -28,6 +28,7 @@ go_library( "//federation/apis/federation/install:go_default_library", "//federation/apis/federation/v1beta1:go_default_library", "//federation/cmd/federation-apiserver/app/options:go_default_library", + "//federation/plugin/pkg/admission/schedulingpolicy:go_default_library", "//federation/registry/cluster/etcd:go_default_library", "//pkg/api:go_default_library", "//pkg/api/install:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/plugins.go b/federation/cmd/federation-apiserver/app/plugins.go index 168a0ef6e647..31dbf31ccbf8 100644 --- a/federation/cmd/federation-apiserver/app/plugins.go +++ b/federation/cmd/federation-apiserver/app/plugins.go @@ -25,6 +25,7 @@ import ( // Admission policies "k8s.io/apiserver/pkg/admission" + "k8s.io/kubernetes/federation/plugin/pkg/admission/schedulingpolicy" "k8s.io/kubernetes/plugin/pkg/admission/admit" "k8s.io/kubernetes/plugin/pkg/admission/deny" "k8s.io/kubernetes/plugin/pkg/admission/gc" @@ -37,4 +38,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) { deny.Register(plugins) gc.Register(plugins) initialization.Register(plugins) + schedulingpolicy.Register(plugins) } diff --git a/federation/plugin/pkg/admission/schedulingpolicy/BUILD b/federation/plugin/pkg/admission/schedulingpolicy/BUILD new file mode 100644 index 000000000000..a1e7224f6d74 --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/BUILD @@ -0,0 +1,70 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "admission_test.go", + "merge_test.go", + ], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/apis/extensions/v1beta1:go_default_library", + "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "admission.go", + "merge.go", + "query.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/ref:go_default_library", + "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/yaml:go_default_library", + "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/federation/plugin/pkg/admission/schedulingpolicy/admission.go b/federation/plugin/pkg/admission/schedulingpolicy/admission.go new file mode 100644 index 000000000000..0397554bc768 --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/admission.go @@ -0,0 +1,213 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package schedulingpolicy implements a webhook that queries an external API +// to obtain scheduling decisions for Federated sources. +package schedulingpolicy + +import ( + "fmt" + "io" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/apiserver/pkg/admission" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/ref" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" +) + +const ( + pluginName = "SchedulingPolicy" + configKey = "schedulingPolicy" + policyConfigMapNamespace = "kube-federation-scheduling-policy" + + // Default backoff delay for policy engine query retries. The actual + // backoff implementation is handled by k8s.io/apiserver/pkg/util/webhook. + // If the admission controller config file does not specify a backoff, this + // one is used. + defaultRetryBackoff = time.Millisecond * 100 +) + +type admissionConfig struct { + Kubeconfig string `json:"kubeconfig"` + RetryBackoff time.Duration `json:"retryBackoff"` +} + +type admissionController struct { + *admission.Handler + policyEngineClient *rest.RESTClient // client to communicate with policy engine + policyEngineRetryBackoff time.Duration // backoff for policy engine queries + client internalclientset.Interface // client to communicate with federation-apiserver +} + +// Register registers the plugin. +func Register(plugins *admission.Plugins) { + plugins.Register(pluginName, func(file io.Reader) (admission.Interface, error) { + return newAdmissionController(file) + }) +} + +func newAdmissionController(file io.Reader) (*admissionController, error) { + config, err := loadConfig(file) + if err != nil { + return nil, err + } + + policyEngineClient, err := loadRestClient(config.Kubeconfig) + if err != nil { + return nil, err + } + + c := &admissionController{ + Handler: admission.NewHandler(admission.Create, admission.Update), + policyEngineClient: policyEngineClient, + policyEngineRetryBackoff: config.RetryBackoff, + } + + return c, nil +} + +func (c *admissionController) Validate() error { + if c.client == nil { + return fmt.Errorf("%s requires a client", pluginName) + } + return nil +} + +func (c *admissionController) SetInternalKubeClientSet(client internalclientset.Interface) { + c.client = client +} + +func (c *admissionController) Admit(a admission.Attributes) (err error) { + exists, err := c.policyExists() + if err != nil { + return c.handleError(a, err) + } + + if !exists { + return nil + } + + obj := a.GetObject() + decision, err := newPolicyEngineQuery(c.policyEngineClient, c.policyEngineRetryBackoff, obj, a.GetKind()).Do() + + if err != nil { + return c.handleError(a, err) + } + + if err := decision.Error(); err != nil { + return c.handleError(a, err) + } + + mergeAnnotations(obj, decision.Annotations) + + return nil +} + +func (c *admissionController) handleError(a admission.Attributes, err error) error { + + c.publishEvent(a, err.Error()) + + return admission.NewForbidden(a, err) +} + +func (c *admissionController) publishEvent(a admission.Attributes, msg string) { + + obj := a.GetObject() + + ref, err := ref.GetReference(api.Scheme, obj) + if err != nil { + runtime.HandleError(err) + return + } + + event := &api.Event{ + InvolvedObject: *ref, + Message: msg, + Source: api.EventSource{ + Component: fmt.Sprintf("schedulingpolicy"), + }, + Type: "Warning", + } + + if _, err := c.client.Core().Events(a.GetNamespace()).Create(event); err != nil { + runtime.HandleError(err) + return + } +} + +func (c *admissionController) policyExists() (bool, error) { + lst, err := c.client.Core().ConfigMaps(policyConfigMapNamespace).List(metav1.ListOptions{}) + if err != nil { + return true, err + } + return len(lst.Items) > 0, nil +} + +func loadConfig(file io.Reader) (*admissionConfig, error) { + var cfg admissionConfig + if file == nil { + return nil, fmt.Errorf("--admission-control-config-file not specified or invalid") + } + + if err := yaml.NewYAMLOrJSONDecoder(file, 4096).Decode(&cfg); err != nil { + return nil, err + } + + if len(cfg.Kubeconfig) == 0 { + return nil, fmt.Errorf("kubeconfig path must not be empty") + } + + if cfg.RetryBackoff == 0 { + cfg.RetryBackoff = defaultRetryBackoff + } else { + // Scale up value from config (which is unmarshalled as ns). + cfg.RetryBackoff *= time.Millisecond + } + + if cfg.RetryBackoff.Nanoseconds() < 0 { + return nil, fmt.Errorf("retryBackoff must not be negative") + } + + return &cfg, nil +} + +func loadRestClient(kubeConfigFile string) (*rest.RESTClient, error) { + + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.ExplicitPath = kubeConfigFile + loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{}) + + clientConfig, err := loader.ClientConfig() + if err != nil { + return nil, err + } + + clientConfig.ContentConfig.NegotiatedSerializer = dynamic.ContentConfig().NegotiatedSerializer + + restClient, err := rest.UnversionedRESTClientFor(clientConfig) + if err != nil { + return nil, err + } + + return restClient, nil +} diff --git a/federation/plugin/pkg/admission/schedulingpolicy/admission_test.go b/federation/plugin/pkg/admission/schedulingpolicy/admission_test.go new file mode 100644 index 000000000000..1f6eb66b8a4f --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/admission_test.go @@ -0,0 +1,473 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulingpolicy + +import ( + "bytes" + "encoding/json" + "fmt" + "html/template" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/authentication/user" + core "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/api" + extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" +) + +func TestNewAdmissionController(t *testing.T) { + tempfile, err := ioutil.TempFile("", "") + if err != nil { + t.Fatalf("Unexpected error while creating temporary file: %v", err) + } + p := tempfile.Name() + defer os.Remove(p) + + kubeconfig := ` +clusters: + - name: foo + cluster: + server: https://example.com +users: + - name: alice + user: + token: deadbeef +contexts: + - name: default + context: + cluster: foo + user: alice +current-context: default +` + + if _, err := tempfile.WriteString(kubeconfig); err != nil { + t.Fatalf("Unexpected error while writing test kubeconfig file: %v", err) + } + + tests := []struct { + note string + input string + wantErr bool + }{ + {"no config", "", true}, + {"bad json", `{"foo": `, true}, + {"bad yaml", `{foo" `, true}, + { + "missing kubeconfig", + `{"foo": {}}`, + true, + }, + { + "kubeconfig not found", + `{ + "kubeconfig": "/kube-federation-scheduling-policy-file-not-found-test" + }`, + true, + }, + { + "bad retry backoff", + fmt.Sprintf(` + { + "kubeconfig": %q, + "retryBackoff": -1 + } + `, p), + true, + }, + { + "a valid config", + fmt.Sprintf(` + { + "kubeconfig": %q + } + `, p), + false, + }, + { + "a valid config with retry backoff", + fmt.Sprintf(` + { + "kubeconfig": %q, + "retryBackoff": 200 + } + `, p), + false, + }, + } + + for _, tc := range tests { + var file io.Reader + if tc.input == "" { + file = nil + } else { + file = bytes.NewBufferString(tc.input) + } + + _, err := newAdmissionController(file) + + if tc.wantErr && err == nil { + t.Errorf("%v: Expected error", tc.note) + } else if !tc.wantErr && err != nil { + t.Errorf("%v: Unexpected error: %v", tc.note, err) + } + } +} + +func TestAdmitQueryPayload(t *testing.T) { + var body interface{} + + serve := func(w http.ResponseWriter, r *http.Request) { + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("Unexpected error reading admission payload: %v", err) + } + + // No errors or annotations. + w.Write([]byte(`{}`)) + } + + controller, err := newControllerWithTestServer(serve, true) + if err != nil { + t.Fatalf("Unexpected error while creating test admission controller/server: %v", err) + } + + rs := makeReplicaSet() + rs.Spec.MinReadySeconds = 100 + attrs := makeAdmissionRecord(rs) + err = controller.Admit(attrs) + + if err != nil { + t.Fatalf("Unexpected error from admission controller: %v", err) + } + + obj := body.(map[string]interface{}) + metadata := obj["metadata"].(map[string]interface{}) + spec := obj["spec"].(map[string]interface{}) + name := metadata["name"].(string) + minReadySeconds := spec["minReadySeconds"].(float64) + + expectedName := "myapp" + if name != expectedName { + t.Fatalf("Expected replicaset.metadata.name to be %v but got: %v", expectedName, name) + } + + expectedMinReadySeconds := float64(100) + if minReadySeconds != expectedMinReadySeconds { + t.Fatalf("Expected replicaset.spec.minReadySeconds to be %v but got: %v", expectedMinReadySeconds, minReadySeconds) + } +} + +func TestAdmitFailInternal(t *testing.T) { + serve := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + } + + controller, err := newControllerWithTestServer(serve, false) + if err != nil { + t.Fatalf("Unexpected error while creating test admission controller/server: %v", err) + } + + mockClient := &fake.Clientset{} + mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("unknown error") + }) + + controller.SetInternalKubeClientSet(mockClient) + + attrs := makeAdmissionRecord(makeReplicaSet()) + err = controller.Admit(attrs) + + if err == nil { + t.Fatalf("Expected admission controller to fail closed") + } +} + +func TestAdmitPolicyDoesNotExist(t *testing.T) { + + controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + }, false) + if err != nil { + t.Fatalf("Unexpected error while creating test admission controller/server: %v", err) + } + + attrs := makeAdmissionRecord(makeReplicaSet()) + err = controller.Admit(attrs) + + if err != nil { + t.Fatalf("Expected admission controller to fail open but got error: %v", err) + } +} + +func TestAdmitFailClosed(t *testing.T) { + tests := []struct { + note string + statusCode int + body string + }{ + {"server error", 500, ""}, + {"unmarshal error", 200, "{"}, + {"undefined result", 404, ``}, + {"policy errors", 200, `{"errors": ["conflicting replica-set-preferences"]}`}, + } + + for _, tc := range tests { + + serve := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.statusCode) + if len(tc.body) > 0 { + w.Write([]byte(tc.body)) + } + } + + controller, err := newControllerWithTestServer(serve, true) + + if err != nil { + t.Errorf("%v: Unexpected error while creating test admission controller/server: %v", tc.note, err) + continue + } + + obj := makeReplicaSet() + attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{}) + err = controller.Admit(attrs) + if err == nil { + t.Errorf("%v: Expected admission controller to fail closed", tc.note) + } + + } +} + +func TestAdmitRetries(t *testing.T) { + var numQueries int + + controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + numQueries++ + }, true) + + if err != nil { + t.Fatalf("Unexpected error while creating test admission controller/server: %v", err) + } + + err = controller.Admit(makeAdmissionRecord(makeReplicaSet())) + + if err == nil { + t.Fatalf("Expected admission controller to fail closed") + } + + if numQueries <= 1 { + t.Fatalf("Expected multiple queries/retries but got (numQueries): %v", numQueries) + } +} + +func TestAdmitSuccessWithAnnotationMerge(t *testing.T) { + controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + { + "annotations": { + "foo": "bar-2" + } + } + `)) + }, true) + if err != nil { + t.Fatalf("Unexpected error while creating test admission controller/server: %v", err) + } + + obj := makeReplicaSet() + obj.Annotations = map[string]string{} + obj.Annotations["foo"] = "bar" + obj.Annotations["bar"] = "baz" + + attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{}) + err = controller.Admit(attrs) + if err != nil { + t.Fatalf("Unexpected error from admission controller: %v", err) + } + + annotations := attrs.GetObject().(*extensionsv1.ReplicaSet).Annotations + expected := map[string]string{ + "foo": "bar-2", + "bar": "baz", + } + + if !reflect.DeepEqual(annotations, expected) { + t.Fatalf("Expected annotations to be %v but got: %v", expected, annotations) + } +} + +func newControllerWithTestServer(f func(w http.ResponseWriter, r *http.Request), policiesExist bool) (*admissionController, error) { + server, err := newTestServer(f) + if err != nil { + return nil, err + } + + kubeConfigFile, err := makeKubeConfigFile(server.URL, "/some/path/to/decision") + if err != nil { + return nil, err + } + + defer os.Remove(kubeConfigFile) + + configFile, err := makeAdmissionControlConfigFile(kubeConfigFile) + if err != nil { + return nil, err + } + + defer os.Remove(configFile) + + file, err := os.Open(configFile) + if err != nil { + return nil, err + } + + controller, err := newAdmissionController(file) + if err != nil { + return nil, err + } + + mockClient := &fake.Clientset{} + + var items []api.ConfigMap + + if policiesExist { + items = append(items, api.ConfigMap{}) + } + + mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + if action.GetNamespace() == policyConfigMapNamespace { + return true, &api.ConfigMapList{Items: items}, nil + } + return true, nil, nil + }) + + controller.SetInternalKubeClientSet(mockClient) + + return controller, nil +} + +func newTestServer(f func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, error) { + server := httptest.NewUnstartedServer(http.HandlerFunc(f)) + server.Start() + return server, nil +} + +func makeAdmissionControlConfigFile(kubeConfigFile string) (string, error) { + tempfile, err := ioutil.TempFile("", "") + if err != nil { + return "", err + } + + p := tempfile.Name() + + configFileTmpl := ` +kubeconfig: {{ .KubeConfigFile }} +retryBackoff: {{ .RetryBackoff }} +` + type configFileTemplateInput struct { + KubeConfigFile string + RetryBackoff int + } + + input := configFileTemplateInput{ + KubeConfigFile: kubeConfigFile, + RetryBackoff: 1, + } + + tmpl, err := template.New("scheduling-policy-config").Parse(configFileTmpl) + if err != nil { + return "", err + } + + if err := tmpl.Execute(tempfile, input); err != nil { + return "", err + } + + return p, nil +} + +func makeKubeConfigFile(baseURL, path string) (string, error) { + tempfile, err := ioutil.TempFile("", "") + if err != nil { + return "", err + } + + p := tempfile.Name() + + kubeConfigTmpl := ` +clusters: + - name: test + cluster: + server: {{ .BaseURL }}{{ .Path }} +users: + - name: alice + user: + token: deadbeef +contexts: + - name: default + context: + cluster: test + user: alice +current-context: default` + + type kubeConfigTemplateInput struct { + BaseURL string + Path string + } + + input := kubeConfigTemplateInput{ + BaseURL: baseURL, + Path: path, + } + + tmpl, err := template.New("kubeconfig").Parse(kubeConfigTmpl) + if err != nil { + return "", err + } + + if err := tmpl.Execute(tempfile, input); err != nil { + return "", err + } + + return p, nil +} + +func makeAdmissionRecord(obj *extensionsv1.ReplicaSet) admission.Attributes { + return admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{}) +} + +func makeReplicaSet() *extensionsv1.ReplicaSet { + return &extensionsv1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "ReplicaSet", + APIVersion: "extensions/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "myapp", + }, + Spec: extensionsv1.ReplicaSetSpec{}, + } +} diff --git a/federation/plugin/pkg/admission/schedulingpolicy/merge.go b/federation/plugin/pkg/admission/schedulingpolicy/merge.go new file mode 100644 index 000000000000..69c06e80b651 --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/merge.go @@ -0,0 +1,45 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulingpolicy + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +// mergeAnnotations updates obj so that the provided annotations supersede the +// existing annotations. +func mergeAnnotations(obj runtime.Object, annotations map[string]string) error { + if len(annotations) == 0 { + return nil + } + + accessor, err := meta.Accessor(obj) + if err != nil { + return err + } + + orig := accessor.GetAnnotations() + for k := range orig { + if _, ok := annotations[k]; !ok { + annotations[k] = orig[k] + } + } + + accessor.SetAnnotations(annotations) + return nil +} diff --git a/federation/plugin/pkg/admission/schedulingpolicy/merge_test.go b/federation/plugin/pkg/admission/schedulingpolicy/merge_test.go new file mode 100644 index 000000000000..6218f2e93043 --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/merge_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulingpolicy + +import ( + "encoding/json" + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestMergeAnnotations(t *testing.T) { + tests := []struct { + note string + input *v1.Pod + annotations string + expected string + }{ + {"nil annotations", &v1.Pod{}, `{"foo": "bar"}`, `{"foo": "bar"}`}, + {"empty annotations", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}, `{"foo": "bar"}`, `{"foo": "bar"}`}, + {"existing annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "baz"}}}, `{"foo": "bar"}`, `{"foo": "bar"}`}, + {"different annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"baz": "qux"}}}, `{"foo": "bar"}`, `{"baz": "qux", "foo": "bar"}`}, + } + + for _, tc := range tests { + + annotations := map[string]string{} + + if err := json.Unmarshal([]byte(tc.annotations), &annotations); err != nil { + panic(err) + } + + expected := map[string]string{} + + if err := json.Unmarshal([]byte(tc.expected), &expected); err != nil { + panic(err) + } + + err := mergeAnnotations(tc.input, annotations) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if !reflect.DeepEqual(tc.input.ObjectMeta.Annotations, expected) { + t.Errorf("%v: Expected annotations to equal %v but got: %v", tc.note, expected, tc.input.ObjectMeta.Annotations) + } + } + +} diff --git a/federation/plugin/pkg/admission/schedulingpolicy/query.go b/federation/plugin/pkg/admission/schedulingpolicy/query.go new file mode 100644 index 000000000000..206f2c404a95 --- /dev/null +++ b/federation/plugin/pkg/admission/schedulingpolicy/query.go @@ -0,0 +1,145 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulingpolicy + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/client-go/rest" + "k8s.io/kubernetes/pkg/api" +) + +// policyUndefinedError represents an undefined response from the policy +// engine. This typically means the relevant policy has not been loaded into +// the engine. +type policyUndefinedError struct{} + +func (policyUndefinedError) Error() string { + return "policy decision is undefined" +} + +// policyEngineQuery represents a single query against the policy engine. +type policyEngineQuery struct { + client *rest.RESTClient + retryBackoff time.Duration + obj runtime.Object + gvk schema.GroupVersionKind +} + +// newPolicyEngineQuery returns a policyEngineQuery that can be executed. +func newPolicyEngineQuery(client *rest.RESTClient, retryBackoff time.Duration, obj runtime.Object, gvk schema.GroupVersionKind) *policyEngineQuery { + return &policyEngineQuery{ + client: client, + retryBackoff: retryBackoff, + obj: obj, + gvk: gvk, + } +} + +// Do returns the result of the policy engine query. If the policy decision is +// undefined or an unknown error occurs, err is non-nil. Otherwise, result is +// non-nil and contains the result of policy evaluation. +func (query *policyEngineQuery) Do() (decision *policyDecision, err error) { + + bs, err := query.encode() + if err != nil { + return nil, err + } + + var result rest.Result + + err = webhook.WithExponentialBackoff(query.retryBackoff, func() error { + result = query.client.Post(). + Body(bs). + Do() + return result.Error() + }) + + if err != nil { + if errors.IsNotFound(err) { + return nil, policyUndefinedError{} + } + return nil, err + } + + return decodeResult(result) +} + +// encode returns the encoded version of the query's runtime.Object. +func (query *policyEngineQuery) encode() ([]byte, error) { + + var info runtime.SerializerInfo + infos := api.Codecs.SupportedMediaTypes() + + for i := range infos { + if infos[i].MediaType == "application/json" { + info = infos[i] + } + } + + if info.Serializer == nil { + return nil, fmt.Errorf("serialization not supported") + } + + codec := api.Codecs.EncoderForVersion(info.Serializer, query.gvk.GroupVersion()) + + var buf bytes.Buffer + if err := codec.Encode(query.obj, &buf); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// policyDecision represents a response from the policy engine. +type policyDecision struct { + Errors []string `json:"errors,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` +} + +// Error returns an error if the policy raised an error. +func (d *policyDecision) Error() error { + if len(d.Errors) == 0 { + return nil + } + return fmt.Errorf("reason(s): %v", strings.Join(d.Errors, "; ")) +} + +func decodeResult(result rest.Result) (*policyDecision, error) { + + bs, err := result.Raw() + if err != nil { + return nil, err + } + + buf := bytes.NewBuffer(bs) + var decision policyDecision + + if err := json.NewDecoder(buf).Decode(&decision); err != nil { + return nil, err + } + + return &decision, nil +} diff --git a/hack/.linted_packages b/hack/.linted_packages index 59299ce5536b..a12daf35edc3 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -51,6 +51,7 @@ federation/cmd/federation-controller-manager federation/cmd/genfeddocs federation/cmd/kubefed federation/pkg/federation-controller/util/replicapreferences +federation/plugin/pkg/admission/schedulingpolicy hack hack/boilerplate/test hack/cmd/teststale