From a88e0187f9f6083ed68d18e939a776c44c728e4b Mon Sep 17 00:00:00 2001 From: Eric Chiang Date: Thu, 25 May 2017 11:16:56 -0700 Subject: [PATCH] apiserver: add a webhook implementation of the audit backend --- cmd/kube-apiserver/app/options/options.go | 4 +- .../app/options/options.go | 4 +- hack/.linted_packages | 1 + hack/verify-flags/known-flags.txt | 2 + staging/src/k8s.io/apiserver/pkg/audit/BUILD | 14 + .../src/k8s.io/apiserver/pkg/audit/union.go | 51 +++ .../k8s.io/apiserver/pkg/audit/union_test.go | 73 ++++ .../k8s.io/apiserver/pkg/server/options/BUILD | 2 + .../apiserver/pkg/server/options/audit.go | 138 ++++++-- .../pkg/server/options/recommended.go | 4 +- .../apiserver/plugin/pkg/audit/webhook/BUILD | 46 +++ .../plugin/pkg/audit/webhook/webhook.go | 285 +++++++++++++++ .../plugin/pkg/audit/webhook/webhook_test.go | 332 ++++++++++++++++++ .../test/integration/testserver/start.go | 2 +- 14 files changed, 926 insertions(+), 32 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/audit/union.go create mode 100644 staging/src/k8s.io/apiserver/pkg/audit/union_test.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go create mode 100644 staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index ee69ed645b03..99ca76f92e87 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -45,7 +45,7 @@ type ServerRunOptions struct { Etcd *genericoptions.EtcdOptions SecureServing *genericoptions.SecureServingOptions InsecureServing *kubeoptions.InsecureServingOptions - Audit *genericoptions.AuditLogOptions + Audit *genericoptions.AuditOptions Features *genericoptions.FeatureOptions Admission *genericoptions.AdmissionOptions Authentication *kubeoptions.BuiltInAuthenticationOptions @@ -79,7 +79,7 @@ func NewServerRunOptions() *ServerRunOptions { Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)), SecureServing: kubeoptions.NewSecureServingOptions(), InsecureServing: kubeoptions.NewInsecureServingOptions(), - Audit: genericoptions.NewAuditLogOptions(), + Audit: genericoptions.NewAuditOptions(), Features: genericoptions.NewFeatureOptions(), Admission: genericoptions.NewAdmissionOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), diff --git a/federation/cmd/federation-apiserver/app/options/options.go b/federation/cmd/federation-apiserver/app/options/options.go index 36fabdebe4ed..bda3c6c9707d 100644 --- a/federation/cmd/federation-apiserver/app/options/options.go +++ b/federation/cmd/federation-apiserver/app/options/options.go @@ -37,7 +37,7 @@ type ServerRunOptions struct { Etcd *genericoptions.EtcdOptions SecureServing *genericoptions.SecureServingOptions InsecureServing *kubeoptions.InsecureServingOptions - Audit *genericoptions.AuditLogOptions + Audit *genericoptions.AuditOptions Features *genericoptions.FeatureOptions Admission *genericoptions.AdmissionOptions Authentication *kubeoptions.BuiltInAuthenticationOptions @@ -56,7 +56,7 @@ func NewServerRunOptions() *ServerRunOptions { Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, api.Scheme, nil)), SecureServing: kubeoptions.NewSecureServingOptions(), InsecureServing: kubeoptions.NewInsecureServingOptions(), - Audit: genericoptions.NewAuditLogOptions(), + Audit: genericoptions.NewAuditOptions(), Features: genericoptions.NewFeatureOptions(), Admission: genericoptions.NewAdmissionOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), diff --git a/hack/.linted_packages b/hack/.linted_packages index f0f818422d98..02184fdd6003 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -358,6 +358,7 @@ staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/aes staging/src/k8s.io/apiserver/pkg/util/flushwriter staging/src/k8s.io/apiserver/pkg/util/logs +staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook staging/src/k8s.io/apiserver/plugin/pkg/authenticator staging/src/k8s.io/apiserver/plugin/pkg/authenticator/password staging/src/k8s.io/apiserver/plugin/pkg/authenticator/password/allow diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 6b1ac5740818..babeb6b2fbc9 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -46,6 +46,8 @@ audit-log-maxage audit-log-maxbackup audit-log-maxsize audit-log-path +audit-webhook-config-file +audit-webhook-mode authentication-kubeconfig authentication-token-webhook authentication-token-webhook-cache-ttl diff --git a/staging/src/k8s.io/apiserver/pkg/audit/BUILD b/staging/src/k8s.io/apiserver/pkg/audit/BUILD index 9e67642f37af..4e67f4299351 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/audit/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -13,6 +14,7 @@ go_library( "request.go", "scheme.go", "types.go", + "union.go", ], tags = ["automanaged"], deps = [ @@ -23,6 +25,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", @@ -30,3 +33,14 @@ go_library( "//vendor/k8s.io/client-go/pkg/apis/authentication/v1:go_default_library", ], ) + +go_test( + name = "go_default_test", + srcs = ["union_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + ], +) diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union.go b/staging/src/k8s.io/apiserver/pkg/audit/union.go new file mode 100644 index 000000000000..ba969cec98b6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/audit/union.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + "k8s.io/apimachinery/pkg/util/errors" + auditinternal "k8s.io/apiserver/pkg/apis/audit" +) + +// Union returns an audit Backend which logs events to a set of backends. The returned +// Sink implementation blocks in turn for each call to ProcessEvents. +func Union(backends ...Backend) Backend { + if len(backends) == 1 { + return backends[0] + } + return union{backends} +} + +type union struct { + backends []Backend +} + +func (u union) ProcessEvents(events ...*auditinternal.Event) { + for _, backend := range u.backends { + backend.ProcessEvents(events...) + } +} + +func (u union) Run(stopCh <-chan struct{}) error { + var funcs []func() error + for _, backend := range u.backends { + funcs = append(funcs, func() error { + return backend.Run(stopCh) + }) + } + return errors.AggregateGoroutines(funcs...) +} diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go new file mode 100644 index 000000000000..c016f3d07c71 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package audit + +import ( + "strconv" + "testing" + + "k8s.io/apimachinery/pkg/types" + auditinternal "k8s.io/apiserver/pkg/apis/audit" +) + +type fakeBackend struct { + events []*auditinternal.Event +} + +func (f *fakeBackend) ProcessEvents(events ...*auditinternal.Event) { + f.events = append(f.events, events...) +} + +func (f *fakeBackend) Run(stopCh <-chan struct{}) error { + return nil +} + +func TestUnion(t *testing.T) { + backends := []Backend{ + new(fakeBackend), + new(fakeBackend), + new(fakeBackend), + } + + b := Union(backends...) + + n := 5 + + var events []*auditinternal.Event + for i := 0; i < n; i++ { + events = append(events, &auditinternal.Event{ + AuditID: types.UID(strconv.Itoa(i)), + }) + } + b.ProcessEvents(events...) + + for i, b := range backends { + // so we can inspect the underlying events. + backend := b.(*fakeBackend) + + if got := len(backend.events); got != n { + t.Errorf("backend %d wanted %d events, got %d", i, n, got) + continue + } + for j, event := range backend.events { + wantID := types.UID(strconv.Itoa(j)) + if event.AuditID != wantID { + t.Errorf("backend %d event %d wanted id %s, got %s", i, j, wantID, event.AuditID) + } + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD index 4e5c44a12de7..8e616203fb68 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/options/BUILD @@ -54,6 +54,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission/initializer:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", "//vendor/k8s.io/apiserver/pkg/audit/policy:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/authenticatorfactory:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", @@ -66,6 +67,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library", + "//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index e4ab7da80a68..b5f310af0411 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -20,28 +20,106 @@ import ( "fmt" "io" "os" + "strings" "github.com/spf13/pflag" "gopkg.in/natefinch/lumberjack.v2" + "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/audit/policy" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" pluginlog "k8s.io/apiserver/plugin/pkg/audit/log" + pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" ) +func appendBackend(existing, newBackend audit.Backend) audit.Backend { + if existing == nil { + return newBackend + } + return audit.Union(existing, newBackend) +} + +func advancedAuditingEnabled() bool { + return utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) +} + +type AuditOptions struct { + // Policy configuration file for filtering audit events that are captured. + // If unspecified, a default is provided. + PolicyFile string + + // Plugin options + + LogOptions AuditLogOptions + WebhookOptions AuditWebhookOptions +} + +// AuditLogOptions holds the legacy audit log writer. If the AdvancedAuditing feature +// is enabled, these options determine the output of the structured audit log. type AuditLogOptions struct { Path string MaxAge int MaxBackups int MaxSize int +} - PolicyFile string +// AuditWebhookOptions control the webhook configuration for audit events. +type AuditWebhookOptions struct { + ConfigFile string + // Should the webhook asynchronous batch events to the webhook backend or + // should the webhook block responses? + // + // Defaults to asynchronous batch events. + Mode string +} + +func NewAuditOptions() *AuditOptions { + return &AuditOptions{ + WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch}, + } +} + +func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&o.PolicyFile, "audit-policy-file", o.PolicyFile, + "Path to the file that defines the audit policy configuration. Requires the 'AdvancedAuditing' feature gate."+ + " With AdvancedAuditing, a profile is required to enable auditing.") + + o.LogOptions.AddFlags(fs) + o.WebhookOptions.AddFlags(fs) +} + +func (o *AuditOptions) ApplyTo(c *server.Config) error { + // Apply generic options. + if err := o.applyTo(c); err != nil { + return err + } + + // Apply plugin options. + if err := o.LogOptions.applyTo(c); err != nil { + return err + } + if err := o.WebhookOptions.applyTo(c); err != nil { + return err + } + return nil } -func NewAuditLogOptions() *AuditLogOptions { - return &AuditLogOptions{} +func (o *AuditOptions) applyTo(c *server.Config) error { + if o.PolicyFile == "" { + return nil + } + + if !advancedAuditingEnabled() { + return fmt.Errorf("feature '%s' must be enabled to set an audit policy", features.AdvancedAuditing) + } + p, err := policy.LoadPolicyFromFile(o.PolicyFile) + if err != nil { + return fmt.Errorf("loading audit policy file: %v", err) + } + c.AuditPolicyChecker = policy.NewChecker(p) + return nil } func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) { @@ -53,29 +131,10 @@ func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) { "The maximum number of old audit log files to retain.") fs.IntVar(&o.MaxSize, "audit-log-maxsize", o.MaxSize, "The maximum size in megabytes of the audit log file before it gets rotated.") - - fs.StringVar(&o.PolicyFile, "audit-policy-file", o.PolicyFile, - "Path to the file that defines the audit policy configuration. Requires the 'AdvancedAuditing' feature gate."+ - " With AdvancedAuditing, a profile is required to enable auditing.") } -func (o *AuditLogOptions) ApplyTo(c *server.Config) error { - if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) { - if o.PolicyFile != "" { - p, err := policy.LoadPolicyFromFile(o.PolicyFile) - if err != nil { - return err - } - c.AuditPolicyChecker = policy.NewChecker(p) - } - } else { - if o.PolicyFile != "" { - return fmt.Errorf("feature '%s' must be enabled to set an audit policy", features.AdvancedAuditing) - } - } - - // TODO: Generalize for alternative audit backends. - if len(o.Path) == 0 { +func (o *AuditLogOptions) applyTo(c *server.Config) error { + if o.Path == "" { return nil } @@ -89,6 +148,35 @@ func (o *AuditLogOptions) ApplyTo(c *server.Config) error { } } c.LegacyAuditWriter = w - c.AuditBackend = pluginlog.NewBackend(w) + + if advancedAuditingEnabled() { + c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w)) + } + return nil +} + +func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile, + "Path to a kubeconfig formatted file that defines the audit webhook configuration."+ + " Requires the 'AdvancedAuditing' feature gate.") + fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode, + "Strategy for sending audit events. Blocking indicates sending events should block"+ + " server responses. Batch causes the webhook to buffer and send events"+ + " asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".") +} + +func (o *AuditWebhookOptions) applyTo(c *server.Config) error { + if o.ConfigFile == "" { + return nil + } + + if !advancedAuditingEnabled() { + return fmt.Errorf("feature '%s' must be enabled to set an audit webhook", features.AdvancedAuditing) + } + webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode) + if err != nil { + return fmt.Errorf("initializing audit webhook: %v", err) + } + c.AuditBackend = appendBackend(c.AuditBackend, webhook) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index cf0ba4016e6e..92ec3e8e22f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -31,7 +31,7 @@ type RecommendedOptions struct { SecureServing *SecureServingOptions Authentication *DelegatingAuthenticationOptions Authorization *DelegatingAuthorizationOptions - Audit *AuditLogOptions + Audit *AuditOptions Features *FeatureOptions } @@ -41,7 +41,7 @@ func NewRecommendedOptions(prefix string, copier runtime.ObjectCopier, codec run SecureServing: NewSecureServingOptions(), Authentication: NewDelegatingAuthenticationOptions(), Authorization: NewDelegatingAuthorizationOptions(), - Audit: NewAuditLogOptions(), + Audit: NewAuditOptions(), Features: NewFeatureOptions(), } } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD new file mode 100644 index 000000000000..d11c970f1834 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/BUILD @@ -0,0 +1,46 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["webhook_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd/api/v1:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = ["webhook.go"], + tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/announced:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apimachinery/registered:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/install:go_default_library", + "//vendor/k8s.io/apiserver/pkg/apis/audit/v1alpha1:go_default_library", + "//vendor/k8s.io/apiserver/pkg/audit:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library", + ], +) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go new file mode 100644 index 000000000000..7dfc1b7158a4 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -0,0 +1,285 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package webhook implements the audit.Backend interface using HTTP webhooks. +package webhook + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/apimachinery/announced" + "k8s.io/apimachinery/pkg/apimachinery/registered" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/runtime" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + "k8s.io/apiserver/pkg/apis/audit/install" + auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" + "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/util/webhook" +) + +const ( + // ModeBatch indicates that the webhook should buffer audit events + // internally, sending batch updates either once a certain number of + // events have been received or a certain amount of time has passed. + ModeBatch = "batch" + // ModeBlocking causes the webhook to block on every attempt to process + // a set of events. This causes requests to the API server to wait for a + // round trip to the external audit service before sending a response. + ModeBlocking = "blocking" +) + +// AllowedModes is the modes known by this webhook. +var AllowedModes = []string{ + ModeBatch, + ModeBlocking, +} + +const ( + // Default configuration values for ModeBatch. + // + // TODO(ericchiang): Make these value configurable. Maybe through a + // kubeconfig extension? + defaultBatchBufferSize = 1000 // Buffer up to 1000 events before blocking. + defaultBatchMaxSize = 100 // Only send 100 events at a time. + defaultBatchMaxWait = time.Minute // Send events at least once a minute. +) + +// NewBackend returns an audit backend that sends events over HTTP to an external service. +// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking) +// or buffered with batch POSTs (ModeBatch). +func NewBackend(kubeConfigFile string, mode string) (audit.Backend, error) { + switch mode { + case ModeBatch: + return newBatchWebhook(kubeConfigFile) + case ModeBlocking: + return newBlockingWebhook(kubeConfigFile) + default: + return nil, fmt.Errorf("webhook mode %q is not in list of known modes (%s)", + mode, strings.Join(AllowedModes, ",")) + } +} + +var ( + // NOTE: Copied from other webhook implementations + // + // Can we make these passable to NewGenericWebhook? + groupFactoryRegistry = make(announced.APIGroupFactoryRegistry) + groupVersions = []schema.GroupVersion{auditv1alpha1.SchemeGroupVersion} + registry = registered.NewOrDie("") +) + +func init() { + registry.RegisterVersions(groupVersions) + if err := registry.EnableVersions(groupVersions...); err != nil { + panic(fmt.Sprintf("failed to enable version %v", groupVersions)) + } + install.Install(groupFactoryRegistry, registry, audit.Scheme) +} + +func loadWebhook(configFile string) (*webhook.GenericWebhook, error) { + return webhook.NewGenericWebhook(registry, audit.Codecs, configFile, groupVersions, 0) +} + +func newBlockingWebhook(configFile string) (*blockingBackend, error) { + w, err := loadWebhook(configFile) + if err != nil { + return nil, err + } + return &blockingBackend{w}, nil +} + +type blockingBackend struct { + w *webhook.GenericWebhook +} + +func (b *blockingBackend) Run(stopCh <-chan struct{}) error { + return nil +} + +func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) { + if err := b.processEvents(ev...); err != nil { + glog.Errorf("failed to POST webhook events: %v", err) + } +} + +func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error { + var list auditinternal.EventList + for _, e := range ev { + list.Items = append(list.Items, *e) + } + // NOTE: No exponential backoff because this is the blocking webhook + // mode. Any attempts to retry will block API server requests. + return b.w.RestClient.Post().Body(&list).Do().Error() +} + +// Copied from generated code in k8s.io/apiserver/pkg/apis/audit. +// +// TODO(ericchiang): Have the generated code expose these methods like metav1.GetGeneratedDeepCopyFuncs(). +var auditDeepCopyFuncs = []conversion.GeneratedDeepCopyFunc{ + {Fn: auditinternal.DeepCopy_audit_Event, InType: reflect.TypeOf(&auditinternal.Event{})}, + {Fn: auditinternal.DeepCopy_audit_EventList, InType: reflect.TypeOf(&auditinternal.EventList{})}, + {Fn: auditinternal.DeepCopy_audit_GroupResources, InType: reflect.TypeOf(&auditinternal.GroupResources{})}, + {Fn: auditinternal.DeepCopy_audit_ObjectReference, InType: reflect.TypeOf(&auditinternal.ObjectReference{})}, + {Fn: auditinternal.DeepCopy_audit_Policy, InType: reflect.TypeOf(&auditinternal.Policy{})}, + {Fn: auditinternal.DeepCopy_audit_PolicyList, InType: reflect.TypeOf(&auditinternal.PolicyList{})}, + {Fn: auditinternal.DeepCopy_audit_PolicyRule, InType: reflect.TypeOf(&auditinternal.PolicyRule{})}, + {Fn: auditinternal.DeepCopy_audit_UserInfo, InType: reflect.TypeOf(&auditinternal.UserInfo{})}, +} + +func newBatchWebhook(configFile string) (*batchBackend, error) { + w, err := loadWebhook(configFile) + if err != nil { + return nil, err + } + + c := conversion.NewCloner() + for _, f := range metav1.GetGeneratedDeepCopyFuncs() { + if err := c.RegisterGeneratedDeepCopyFunc(f); err != nil { + return nil, fmt.Errorf("registering meta deep copy method: %v", err) + } + } + + for _, f := range auditDeepCopyFuncs { + if err := c.RegisterGeneratedDeepCopyFunc(f); err != nil { + return nil, fmt.Errorf("registering audit deep copy method: %v", err) + } + } + + return &batchBackend{ + w: w, + buffer: make(chan *auditinternal.Event, defaultBatchBufferSize), + maxBatchSize: defaultBatchMaxSize, + maxBatchWait: defaultBatchMaxWait, + cloner: c, + }, nil +} + +type batchBackend struct { + w *webhook.GenericWebhook + + // Cloner is used to deep copy events as they are buffered. + cloner *conversion.Cloner + + // Channel to buffer events in memory before sending them on the webhook. + buffer chan *auditinternal.Event + // Maximum number of events that can be sent at once. + maxBatchSize int + // Amount of time to wait after sending events before force sending another set. + // + // Receiving maxBatchSize events will always trigger a send, regardless of + // if this amount of time has been reached. + maxBatchWait time.Duration +} + +func (b *batchBackend) Run(stopCh <-chan struct{}) error { + f := func() { + // Recover from any panics caused by this method so a panic in the + // goroutine can't bring down the main routine. + defer runtime.HandleCrash() + + t := time.NewTimer(b.maxBatchWait) + defer t.Stop() // Release ticker resources + + b.sendBatchEvents(stopCh, t.C) + } + + go func() { + for { + f() + + select { + case <-stopCh: + return + default: + } + } + }() + return nil +} + +// sendBatchEvents attempts to batch some number of events to the backend. It POSTs events +// in a goroutine and logging any error encountered during the POST. +// +// The following things can cause sendBatchEvents to exit: +// +// * Some maximum number of events are received. +// * Timer has passed, all queued events are sent. +// * StopCh is closed, all queued events are sent. +// +func (b *batchBackend) sendBatchEvents(stopCh <-chan struct{}, timer <-chan time.Time) { + var events []auditinternal.Event + +L: + for i := 0; i < b.maxBatchSize; i++ { + select { + case ev := <-b.buffer: + events = append(events, *ev) + case <-timer: + // Timer has expired. Send whatever events are in the queue. + break L + case <-stopCh: + // Webhook has shut down. Send the last events. + break L + } + } + + if len(events) == 0 { + return + } + + list := auditinternal.EventList{Items: events} + go func() { + // Execute the webhook POST in a goroutine to keep it from blocking. + // This lets the webhook continue to drain the queue immediatly. + + defer runtime.HandleCrash() + + err := webhook.WithExponentialBackoff(0, func() error { + return b.w.RestClient.Post().Body(&list).Do().Error() + }) + if err != nil { + glog.Errorf("failed to POST webhook events: %v", err) + } + }() + return +} + +func (b *batchBackend) ProcessEvents(ev ...*auditinternal.Event) { + for i, e := range ev { + // Per the audit.Backend interface these events are reused after being + // sent to the Sink. Deep copy and send the copy to the queue. + event := new(auditinternal.Event) + if err := auditinternal.DeepCopy_audit_Event(e, event, b.cloner); err != nil { + glog.Errorf("failed to clone audit event: %v: %#v", err, e) + return + } + + select { + case b.buffer <- event: + default: + glog.Errorf("audit webhook queue blocked, failed to send %d event(s)", len(ev)-i) + return + } + } +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go new file mode 100644 index 000000000000..728e209c4a47 --- /dev/null +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook_test.go @@ -0,0 +1,332 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + stdjson "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/json" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1alpha1 "k8s.io/apiserver/pkg/apis/audit/v1alpha1" + "k8s.io/apiserver/pkg/audit" + "k8s.io/client-go/tools/clientcmd/api/v1" +) + +// newWebhookHandler returns a handler which recieves webhook events and decodes the +// request body. The caller passes a callback which is called on each webhook POST. +func newWebhookHandler(t *testing.T, cb func(events *auditv1alpha1.EventList)) http.Handler { + s := json.NewSerializer(json.DefaultMetaFactory, audit.Scheme, audit.Scheme, false) + return &testWebhookHandler{ + t: t, + onEvents: cb, + serializer: s, + } +} + +type testWebhookHandler struct { + t *testing.T + + onEvents func(events *auditv1alpha1.EventList) + + serializer runtime.Serializer +} + +func (t *testWebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + err := func() error { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return fmt.Errorf("read webhook request body: %v", err) + } + + obj, _, err := t.serializer.Decode(body, nil, &auditv1alpha1.EventList{}) + if err != nil { + return fmt.Errorf("decode request body: %v", err) + } + list, ok := obj.(*auditv1alpha1.EventList) + if !ok { + return fmt.Errorf("expected *v1alpha1.EventList got %T", obj) + } + t.onEvents(list) + return nil + }() + + if err == nil { + io.WriteString(w, "{}") + return + } + // In a goroutine, can't call Fatal. + assert.NoError(t.t, err, "failed to read request body") + http.Error(w, err.Error(), http.StatusInternalServerError) +} + +func newTestBlockingWebhook(t *testing.T, endpoint string) *blockingBackend { + return newWebhook(t, endpoint, ModeBlocking).(*blockingBackend) +} + +func newTestBatchWebhook(t *testing.T, endpoint string) *batchBackend { + return newWebhook(t, endpoint, ModeBatch).(*batchBackend) +} + +func newWebhook(t *testing.T, endpoint string, mode string) audit.Backend { + config := v1.Config{ + Clusters: []v1.NamedCluster{ + {Cluster: v1.Cluster{Server: endpoint, InsecureSkipTLSVerify: true}}, + }, + } + f, err := ioutil.TempFile("", "k8s_audit_webhook_test_") + require.NoError(t, err, "creating temp file") + + defer func() { + f.Close() + os.Remove(f.Name()) + }() + + // NOTE(ericchiang): Do we need to use a proper serializer? + require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig") + + backend, err := NewBackend(f.Name(), mode) + require.NoError(t, err, "initializing backend") + + return backend +} + +func TestWebhook(t *testing.T) { + gotEvents := false + defer func() { require.True(t, gotEvents, "no events received") }() + + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + gotEvents = true + })) + defer s.Close() + + backend := newTestBlockingWebhook(t, s.URL) + + // Ensure this doesn't return a serialization error. + event := &auditinternal.Event{} + require.NoError(t, backend.processEvents(event), "failed to send events") +} + +// waitForEmptyBuffer indicates when the sendBatchEvents method has read from the +// existing buffer. This lets test coordinate closing a timer and stop channel +// until the for loop has read from the buffer. +func waitForEmptyBuffer(b *batchBackend) { + for len(b.buffer) != 0 { + time.Sleep(time.Millisecond) + } +} + +func TestBatchWebhookMaxEvents(t *testing.T) { + nRest := 10 + events := make([]*auditinternal.Event, defaultBatchMaxSize+nRest) // greater than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + got <- len(events.Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL) + + backend.ProcessEvents(events...) + + stopCh := make(chan struct{}) + timer := make(chan time.Time, 1) + + backend.sendBatchEvents(stopCh, timer) + require.Equal(t, defaultBatchMaxSize, <-got, "did not get batch max size") + + go func() { + waitForEmptyBuffer(backend) // wait for the buffer to empty + timer <- time.Now() // Trigger the wait timeout + }() + + backend.sendBatchEvents(stopCh, timer) + require.Equal(t, nRest, <-got, "failed to get the rest of the events") +} + +func TestBatchWebhookStopCh(t *testing.T) { + events := make([]*auditinternal.Event, 1) // less than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + expected := len(events) + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + got <- len(events.Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL) + backend.ProcessEvents(events...) + + stopCh := make(chan struct{}) + timer := make(chan time.Time) + + go func() { + waitForEmptyBuffer(backend) + close(stopCh) // stop channel has stopped + }() + backend.sendBatchEvents(stopCh, timer) + require.Equal(t, expected, <-got, "get queued events after timer expires") +} + +func TestBatchWebhookEmptyBuffer(t *testing.T) { + events := make([]*auditinternal.Event, 1) // less than max size. + for i := range events { + events[i] = &auditinternal.Event{} + } + + expected := len(events) + got := make(chan int, 2) + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + got <- len(events.Items) + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL) + + stopCh := make(chan struct{}) + timer := make(chan time.Time, 1) + + timer <- time.Now() // Timer is done. + + // Buffer is empty, no events have been queued. This should exit but send no events. + backend.sendBatchEvents(stopCh, timer) + + // Send additional events after the sendBatchEvents has been called. + backend.ProcessEvents(events...) + go func() { + waitForEmptyBuffer(backend) + timer <- time.Now() + }() + + backend.sendBatchEvents(stopCh, timer) + + // Make sure we didn't get a POST with zero events. + require.Equal(t, expected, <-got, "expected one event") +} + +func TestBatchBufferFull(t *testing.T) { + events := make([]*auditinternal.Event, defaultBatchBufferSize+1) // More than buffered size + for i := range events { + events[i] = &auditinternal.Event{} + } + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + // Do nothing. + })) + defer s.Close() + + backend := newTestBatchWebhook(t, s.URL) + + // Make sure this doesn't block. + backend.ProcessEvents(events...) +} + +func TestBatchRun(t *testing.T) { + + // Divisable by max batch size so we don't have to wait for a minute for + // the test to finish. + events := make([]*auditinternal.Event, defaultBatchMaxSize*3) + for i := range events { + events[i] = &auditinternal.Event{} + } + + got := new(int64) + want := len(events) + + wg := new(sync.WaitGroup) + wg.Add(want) + done := make(chan struct{}) + + go func() { + wg.Wait() + // When the expected number of events have been received, close the channel. + close(done) + }() + + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + atomic.AddInt64(got, int64(len(events.Items))) + wg.Add(-len(events.Items)) + })) + defer s.Close() + + stopCh := make(chan struct{}) + defer close(stopCh) + + backend := newTestBatchWebhook(t, s.URL) + + // Test the Run codepath. E.g. that the spawned goroutines behave correctly. + backend.Run(stopCh) + + backend.ProcessEvents(events...) + + select { + case <-done: + // Received all the events. + case <-time.After(2 * time.Minute): + t.Errorf("expected %d events got %d", want, atomic.LoadInt64(got)) + } +} + +func TestBatchConcurrentRequests(t *testing.T) { + events := make([]*auditinternal.Event, defaultBatchBufferSize) // Don't drop events + for i := range events { + events[i] = &auditinternal.Event{} + } + + wg := new(sync.WaitGroup) + wg.Add(len(events)) + + s := httptest.NewServer(newWebhookHandler(t, func(events *auditv1alpha1.EventList) { + wg.Add(-len(events.Items)) + + // Since the webhook makes concurrent requests, blocking on the webhook response + // shouldn't block the webhook from sending more events. + // + // Wait for all responses to be received before sending the response. + wg.Wait() + })) + defer s.Close() + + stopCh := make(chan struct{}) + defer close(stopCh) + + backend := newTestBatchWebhook(t, s.URL) + backend.Run(stopCh) + + backend.ProcessEvents(events...) + // Wait for the webhook to receive all events. + wg.Wait() +} diff --git a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/start.go b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/start.go index 88eca08502ef..3e51475b7add 100644 --- a/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/start.go +++ b/staging/src/k8s.io/kube-apiextensions-server/test/integration/testserver/start.go @@ -42,7 +42,7 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) { } options := server.NewCustomResourceDefinitionsServerOptions(os.Stdout, os.Stderr) - options.RecommendedOptions.Audit.Path = "-" + options.RecommendedOptions.Audit.LogOptions.Path = "-" options.RecommendedOptions.SecureServing.BindPort = port options.RecommendedOptions.Authentication.SkipInClusterLookup = true options.RecommendedOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")