Skip to content

Commit

Permalink
Merge pull request #67223 from tallclair/audit-log
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Synchronous & unbatched audit log writes

**What this PR does / why we need it**:
When enabling buffered audit log file writes to reduce latency under high load, we shouldn't be batching the writes, as the large data write can have an adverse (though unpredictable) impact. Additionally, batched audit log writes should not be done asynchronously, as this just creates lock contention on the log writer.

This is a cleaned-up version of #61217

**Which issue(s) this PR fixes**
Fixes #61932 

**Release note**:

```release-note
NONE
```

/sig auth
/priority important-soon
/kind bug
/milestone v1.12
  • Loading branch information
Kubernetes Submit Queue committed Aug 16, 2018
2 parents 4d40dd0 + c9670d0 commit 87e7b9f
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 96 deletions.
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/options/options_test.go
Expand Up @@ -230,6 +230,7 @@ func TestAddFlags(t *testing.T) {
ThrottleEnable: false,
ThrottleQPS: 43.5,
ThrottleBurst: 44,
AsyncDelegate: true,
},
},
TruncateOptions: apiserveroptions.AuditTruncateOptions{
Expand Down
57 changes: 47 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/server/options/audit.go
Expand Up @@ -42,6 +42,16 @@ import (
pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
)

// Default configuration values for ModeBatch.
const (
	// defaultBatchBufferSize is how many events may be buffered before
	// further events start being discarded.
	defaultBatchBufferSize = 10000

	// The batch parameters below are only used by the webhook backend.

	// defaultBatchMaxSize caps a single send at 400 events.
	defaultBatchMaxSize = 400
	// defaultBatchMaxWait makes sure events are sent at least twice a minute.
	defaultBatchMaxWait = 30 * time.Second
	// defaultBatchThrottleQPS limits the send rate to 10 QPS.
	defaultBatchThrottleQPS = 10
	// defaultBatchThrottleBurst allows bursts of up to 15.
	defaultBatchThrottleBurst = 15
)

func appendBackend(existing, newBackend audit.Backend) audit.Backend {
if existing == nil {
return newBackend
Expand Down Expand Up @@ -129,15 +139,12 @@ type AuditWebhookOptions struct {
}

func NewAuditOptions() *AuditOptions {
defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig()
defaultLogBatchConfig.ThrottleEnable = false

return &AuditOptions{
WebhookOptions: AuditWebhookOptions{
InitialBackoff: pluginwebhook.DefaultInitialBackoff,
BatchOptions: AuditBatchOptions{
Mode: ModeBatch,
BatchConfig: pluginbuffered.NewDefaultBatchConfig(),
BatchConfig: defaultWebhookBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
Expand All @@ -147,7 +154,7 @@ func NewAuditOptions() *AuditOptions {
Format: pluginlog.FormatJson,
BatchOptions: AuditBatchOptions{
Mode: ModeBlocking,
BatchConfig: defaultLogBatchConfig,
BatchConfig: defaultLogBatchConfig(),
},
TruncateOptions: NewAuditTruncateOptions(),
// TODO(audit): use v1 API in release 1.13
Expand Down Expand Up @@ -213,11 +220,13 @@ func validateBackendBatchOptions(pluginName string, options AuditBatchOptions) e
if config.MaxBatchSize <= 0 {
return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize)
}
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
if config.ThrottleEnable {
if config.ThrottleQPS <= 0 {
return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS)
}
if config.ThrottleBurst <= 0 {
return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst)
}
}
return nil
}
Expand Down Expand Up @@ -525,3 +534,31 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
c.AuditBackend = appendBackend(c.AuditBackend, webhook)
return nil
}

// defaultWebhookBatchConfig builds the BatchConfig that the Webhook backend
// uses by default: default buffer and batch sizing, throttling switched on,
// and the delegate backend invoked asynchronously.
func defaultWebhookBatchConfig() pluginbuffered.BatchConfig {
	var cfg pluginbuffered.BatchConfig

	// Queue and batch sizing.
	cfg.BufferSize = defaultBatchBufferSize
	cfg.MaxBatchSize = defaultBatchMaxSize
	cfg.MaxBatchWait = defaultBatchMaxWait

	// Rate limiting of sends to the delegate backend.
	cfg.ThrottleEnable = true
	cfg.ThrottleQPS = defaultBatchThrottleQPS
	cfg.ThrottleBurst = defaultBatchThrottleBurst

	cfg.AsyncDelegate = true

	return cfg
}

// defaultLogBatchConfig builds the BatchConfig that the Log backend uses by
// default: events are buffered but handed to the delegate one at a time,
// without throttling and without an asynchronous delegate call.
func defaultLogBatchConfig() pluginbuffered.BatchConfig {
	var cfg pluginbuffered.BatchConfig

	cfg.BufferSize = defaultBatchBufferSize

	// Batching is not useful for the log-file backend, so every batch holds
	// a single event. MaxBatchWait is ignored and left unset.
	cfg.MaxBatchSize = 1

	cfg.ThrottleEnable = false

	// Asynchronous log threads just create lock contention.
	cfg.AsyncDelegate = false

	return cfg
}
Expand Up @@ -26,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)
Expand Down
73 changes: 39 additions & 34 deletions staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go
Expand Up @@ -31,16 +31,6 @@ import (
// PluginName is the name reported in error metrics.
const PluginName = "buffered"

// Default configuration values for ModeBatch.
const (
	// Buffer up to 10000 events before starting discarding.
	defaultBatchBufferSize = 10000
	// Only send up to 400 events at a time.
	defaultBatchMaxSize = 400
	// Send events at least twice a minute.
	defaultBatchMaxWait = 30 * time.Second

	// Limit the send rate by 10 QPS.
	defaultBatchThrottleQPS = 10
	// Allow bursts of up to 15.
	defaultBatchThrottleBurst = 15
)

// BatchConfig represents batching delegate audit backend configuration.
type BatchConfig struct {
// BufferSize defines a size of the buffering queue.
Expand All @@ -57,19 +47,9 @@ type BatchConfig struct {
// ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case
// the capacity defined by ThrottleQPS was not utilized.
ThrottleBurst int
}

// NewDefaultBatchConfig returns new Config objects populated by default values.
func NewDefaultBatchConfig() BatchConfig {
return BatchConfig{
BufferSize: defaultBatchBufferSize,
MaxBatchSize: defaultBatchMaxSize,
MaxBatchWait: defaultBatchMaxWait,

ThrottleEnable: true,
ThrottleQPS: defaultBatchThrottleQPS,
ThrottleBurst: defaultBatchThrottleBurst,
}
// Whether the delegate backend should be called asynchronously.
AsyncDelegate bool
}

type bufferedBackend struct {
Expand All @@ -85,6 +65,9 @@ type bufferedBackend struct {
// Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed.
maxBatchWait time.Duration

// Whether the delegate backend should be called asynchronously.
asyncDelegate bool

// Channel to signal that the batching routine has processed all remaining events and exited.
// Once `shutdownCh` is closed no new events will be sent to the delegate backend.
shutdownCh chan struct{}
Expand Down Expand Up @@ -113,6 +96,7 @@ func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend {
buffer: make(chan *auditinternal.Event, config.BufferSize),
maxBatchSize: config.MaxBatchSize,
maxBatchWait: config.MaxBatchWait,
asyncDelegate: config.AsyncDelegate,
shutdownCh: make(chan struct{}),
wg: sync.WaitGroup{},
throttle: throttle,
Expand Down Expand Up @@ -169,17 +153,28 @@ func (b *bufferedBackend) Shutdown() {
// b.stopCh is closed, processIncomingEvents stops and closes the buffer.
func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) {
defer close(b.buffer)
t := time.NewTimer(b.maxBatchWait)
defer t.Stop()

var (
maxWaitChan <-chan time.Time
maxWaitTimer *time.Timer
)
// Only use max wait batching if batching is enabled.
if b.maxBatchSize > 1 {
maxWaitTimer = time.NewTimer(b.maxBatchWait)
maxWaitChan = maxWaitTimer.C
defer maxWaitTimer.Stop()
}

for {
func() {
// Recover from any panics caused by this function so a panic in the
// goroutine can't bring down the main routine.
defer runtime.HandleCrash()

t.Reset(b.maxBatchWait)
b.processEvents(b.collectEvents(t.C, stopCh))
if b.maxBatchSize > 1 {
maxWaitTimer.Reset(b.maxBatchWait)
}
b.processEvents(b.collectEvents(maxWaitChan, stopCh))
}()

select {
Expand Down Expand Up @@ -235,15 +230,25 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) {
b.throttle.Accept()
}

b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()
if b.asyncDelegate {
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer runtime.HandleCrash()

// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
// Execute the real processing in a goroutine to keep it from blocking.
// This lets the batching routine continue draining the queue immediately.
b.delegateBackend.ProcessEvents(events...)
}()
} else {
func() {
defer runtime.HandleCrash()

// Execute the real processing synchronously on the calling routine:
// no extra goroutine is spawned, so this blocks until the delegate returns.
b.delegateBackend.ProcessEvents(events...)
}()
}
}

func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) {
Expand Down

0 comments on commit 87e7b9f

Please sign in to comment.