Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audit use buffered backend #60237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster/gce/gci/configure-helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ function start-kube-apiserver {
params+=" --audit-webhook-batch-throttle-burst=${ADVANCED_AUDIT_WEBHOOK_THROTTLE_BURST}"
fi
if [[ -n "${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF:-}" ]]; then
params+=" --audit-webhook-batch-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
params+=" --audit-webhook-initial-backoff=${ADVANCED_AUDIT_WEBHOOK_INITIAL_BACKOFF}"
fi
create-master-audit-webhook-config "${audit_webhook_config_file}"
audit_webhook_config_mount="{\"name\": \"auditwebhookconfigmount\",\"mountPath\": \"${audit_webhook_config_file}\", \"readOnly\": true},"
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/options/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)
Expand Down
42 changes: 32 additions & 10 deletions cmd/kube-apiserver/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilflag "k8s.io/apiserver/pkg/util/flag"
auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapi "k8s.io/kubernetes/pkg/apis/core"
Expand All @@ -54,15 +54,23 @@ func TestAddFlags(t *testing.T) {
"--audit-log-maxbackup=12",
"--audit-log-maxsize=13",
"--audit-log-path=/var/log",
"--audit-log-mode=blocking",
"--audit-log-batch-buffer-size=46",
"--audit-log-batch-max-size=47",
"--audit-log-batch-max-wait=48s",
"--audit-log-batch-throttle-enable=true",
"--audit-log-batch-throttle-qps=49.5",
"--audit-log-batch-throttle-burst=50",
"--audit-policy-file=/policy",
"--audit-webhook-config-file=/webhook-config",
"--audit-webhook-mode=blocking",
"--audit-webhook-batch-buffer-size=42",
"--audit-webhook-batch-max-size=43",
"--audit-webhook-batch-max-wait=1s",
"--audit-webhook-batch-throttle-enable=false",
"--audit-webhook-batch-throttle-qps=43.5",
"--audit-webhook-batch-throttle-burst=44",
"--audit-webhook-batch-initial-backoff=2s",
"--audit-webhook-initial-backoff=2s",
"--authentication-token-webhook-cache-ttl=3m",
"--authentication-token-webhook-config-file=/token-webhook-config",
"--authorization-mode=AlwaysDeny",
Expand Down Expand Up @@ -180,18 +188,32 @@ func TestAddFlags(t *testing.T) {
MaxBackups: 12,
MaxSize: 13,
Format: "json",
BatchOptions: apiserveroptions.AuditBatchOptions{
Mode: "blocking",
BatchConfig: auditbuffered.BatchConfig{
BufferSize: 46,
MaxBatchSize: 47,
MaxBatchWait: 48 * time.Second,
ThrottleEnable: true,
ThrottleQPS: 49.5,
ThrottleBurst: 50,
},
},
},
WebhookOptions: apiserveroptions.AuditWebhookOptions{
Mode: "blocking",
ConfigFile: "/webhook-config",
BatchConfig: auditwebhook.BatchBackendConfig{
BufferSize: 42,
MaxBatchSize: 43,
MaxBatchWait: 1 * time.Second,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
InitialBackoff: 2 * time.Second,
BatchOptions: apiserveroptions.AuditBatchOptions{
Mode: "blocking",
BatchConfig: auditbuffered.BatchConfig{
BufferSize: 42,
MaxBatchSize: 43,
MaxBatchWait: 1 * time.Second,
ThrottleEnable: false,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
},
},
InitialBackoff: 2 * time.Second,
},
PolicyFile: "/policy",
},
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/server/options/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/log:go_default_library",
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
Expand Down
183 changes: 127 additions & 56 deletions staging/src/k8s.io/apiserver/pkg/server/options/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"os"
"strings"
"time"

"github.com/golang/glog"
"github.com/spf13/pflag"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
pluginlog "k8s.io/apiserver/plugin/pkg/audit/log"
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)
Expand All @@ -58,6 +60,33 @@ type AuditOptions struct {
WebhookOptions AuditWebhookOptions
}

const (
// ModeBatch indicates that the audit backend should buffer audit events
// internally, sending batch updates either once a certain number of
// events have been received or a certain amount of time has passed.
ModeBatch = "batch"
// ModeBlocking causes the audit backend to block on every attempt to process
// a set of events. This causes requests to the API server to wait for the
// flush before sending a response.
ModeBlocking = "blocking"
)

// AllowedModes is the modes known for audit backends.
var AllowedModes = []string{
ModeBatch,
ModeBlocking,
}

type AuditBatchOptions struct {
// Should the backend asynchronous batch events to the webhook backend or
// should the backend block responses?
//
// Defaults to asynchronous batch events.
Mode string
// Configuration for batching backend. Only used in batch mode.
BatchConfig pluginbuffered.BatchConfig
}

// AuditLogOptions determines the output of the structured audit log by default.
// If the AdvancedAuditing feature is set to false, AuditLogOptions holds the legacy
// audit log writer.
Expand All @@ -67,27 +96,37 @@ type AuditLogOptions struct {
MaxBackups int
MaxSize int
Format string

BatchOptions AuditBatchOptions
}

// AuditWebhookOptions control the webhook configuration for audit events.
type AuditWebhookOptions struct {
ConfigFile string
// Should the webhook asynchronous batch events to the webhook backend or
// should the webhook block responses?
//
// Defaults to asynchronous batch events.
Mode string
// Configuration for batching webhook. Only used in batch mode.
BatchConfig pluginwebhook.BatchBackendConfig
ConfigFile string
InitialBackoff time.Duration

BatchOptions AuditBatchOptions
}

func NewAuditOptions() *AuditOptions {
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
defaultLogBatchConfig.ThrottleEnable = false

return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
Mode: pluginwebhook.ModeBatch,
BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(),
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: defaultLogBatchConfig,
},
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
},
LogOptions: AuditLogOptions{
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes default behavior of log backend.

#60773

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a pull request: #60739

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, this is a conscious decision. I believe batching is always better by default than blocking

BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
},
},
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
}
}

Expand All @@ -107,30 +146,20 @@ func (o *AuditOptions) Validate() []error {
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
}
} else {
// Check webhook mode
validMode := false
for _, m := range pluginwebhook.AllowedModes {
if m == o.WebhookOptions.Mode {
validMode = true
break
}
// check webhook configuration
if err := validateBackendMode(pluginwebhook.PluginName, o.WebhookOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if !validMode {
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
if err := validateBackendBatchConfig(pluginwebhook.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}

// Check webhook batch configuration
if o.WebhookOptions.BatchConfig.BufferSize <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize))
}
if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize))
// check log configuration
if err := validateBackendMode(pluginlog.PluginName, o.LogOptions.BatchOptions.Mode); err != nil {
allErrors = append(allErrors, err)
}
if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS))
}
if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst))
if err := validateBackendBatchConfig(pluginlog.PluginName, o.LogOptions.BatchOptions.BatchConfig); err != nil {
allErrors = append(allErrors, err)
}

// Check log format
Expand Down Expand Up @@ -160,6 +189,31 @@ func (o *AuditOptions) Validate() []error {
return allErrors
}

func validateBackendMode(pluginName string, mode string) error {
for _, m := range AllowedModes {
if m == mode {
return nil
}
}
return fmt.Errorf("invalid audit %s mode %s, allowed modes are %q", pluginName, mode, strings.Join(AllowedModes, ","))
}

func validateBackendBatchConfig(pluginName string, config pluginbuffered.BatchConfig) error {
if config.BufferSize <= 0 {
return fmt.Errorf("invalid audit batch %s buffer size %v, must be a positive number", pluginName, config.BufferSize)
}
if config.MaxBatchSize <= 0 {
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
}
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
}
return nil
}

func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
Expand All @@ -170,7 +224,9 @@ func (o *AuditOptions) AddFlags(fs *pflag.FlagSet) {
" With AdvancedAuditing, a profile is required to enable auditing.")

o.LogOptions.AddFlags(fs)
o.LogOptions.BatchOptions.AddFlags(pluginlog.PluginName, fs)
o.WebhookOptions.AddFlags(fs)
o.WebhookOptions.BatchOptions.AddFlags(pluginwebhook.PluginName, fs)
}

func (o *AuditOptions) ApplyTo(c *server.Config) error {
Expand Down Expand Up @@ -216,6 +272,36 @@ func (o *AuditOptions) applyTo(c *server.Config) error {
return nil
}

func (o *AuditBatchOptions) AddFlags(pluginName string, fs *pflag.FlagSet) {
fs.StringVar(&o.Mode, fmt.Sprintf("audit-%s-mode", pluginName), o.Mode,
"Strategy for sending audit events. Blocking indicates sending events should block"+
" server responses. Batch causes the backend to buffer and write events"+
" asynchronously. Known modes are "+strings.Join(AllowedModes, ",")+".")
fs.IntVar(&o.BatchConfig.BufferSize, fmt.Sprintf("audit-%s-batch-buffer-size", pluginName),
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
"batching and writing. Only used in batch mode.")
fs.IntVar(&o.BatchConfig.MaxBatchSize, fmt.Sprintf("audit-%s-batch-max-size", pluginName),
o.BatchConfig.MaxBatchSize, "The maximum size of a batch. Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.MaxBatchWait, fmt.Sprintf("audit-%s-batch-max-wait", pluginName),
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force writing the "+
"batch that hadn't reached the max size. Only used in batch mode.")
fs.BoolVar(&o.BatchConfig.ThrottleEnable, fmt.Sprintf("audit-%s-batch-throttle-enable", pluginName),
o.BatchConfig.ThrottleEnable, "Whether batching throttling is enabled. Only used in batch mode.")
fs.Float32Var(&o.BatchConfig.ThrottleQPS, fmt.Sprintf("audit-%s-batch-throttle-qps", pluginName),
o.BatchConfig.ThrottleQPS, "Maximum average number of batches per second. "+
"Only used in batch mode.")
fs.IntVar(&o.BatchConfig.ThrottleBurst, fmt.Sprintf("audit-%s-batch-throttle-burst", pluginName),
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
}

func (o *AuditBatchOptions) wrapBackend(delegate audit.Backend) audit.Backend {
if o.Mode == ModeBlocking {
return delegate
}
return pluginbuffered.NewBackend(delegate, o.BatchConfig)
}

func (o *AuditLogOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Path, "audit-log-path", o.Path,
"If set, all requests coming to the apiserver will be logged to this file. '-' means standard out.")
Expand Down Expand Up @@ -250,7 +336,8 @@ func (o *AuditLogOptions) getWriter() io.Writer {

func (o *AuditLogOptions) advancedApplyTo(c *server.Config) error {
if w := o.getWriter(); w != nil {
c.AuditBackend = appendBackend(c.AuditBackend, pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion))
log := pluginlog.NewBackend(w, o.Format, auditv1beta1.SchemeGroupVersion)
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(log))
}
return nil
}
Expand All @@ -264,39 +351,23 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ConfigFile, "audit-webhook-config-file", o.ConfigFile,
"Path to a kubeconfig formatted file that defines the audit webhook configuration."+
" Requires the 'AdvancedAuditing' feature gate.")
fs.StringVar(&o.Mode, "audit-webhook-mode", o.Mode,
"Strategy for sending audit events. Blocking indicates sending events should block"+
" server responses. Batch causes the webhook to buffer and send events"+
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size",
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
"batching and sending to the webhook. Only used in batch mode.")
fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size",
o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+
"Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait",
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+
"batch that hadn't reached the max size. Only used in batch mode.")
fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps",
o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+
"Only used in batch mode.")
fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst",
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff",
o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+
"first failed requests. Only used in batch mode.")
fs.DurationVar(&o.InitialBackoff, "audit-webhook-initial-backoff",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately you can't just change flags (see deprecation policy, #5b). I suggest marking the old one as deprecated, and making it an alias for the new one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks, done

o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
fs.DurationVar(&o.InitialBackoff, "audit-webhook-batch-initial-backoff",
o.InitialBackoff, "The amount of time to wait before retrying the first failed request.")
fs.MarkDeprecated("audit-webhook-batch-initial-backoff",
"Deprecated, use --audit-webhook-initial-backoff instead.")
}

func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
if o.ConfigFile == "" {
return nil
}

webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig)
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, auditv1beta1.SchemeGroupVersion)
if err != nil {
return fmt.Errorf("initializing audit webhook: %v", err)
}
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
c.AuditBackend = appendBackend(c.AuditBackend, o.BatchOptions.wrapBackend(webhook))
return nil
}