diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index fe23f62f82ae..234961d81e0a 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -230,6 +230,7 @@ func TestAddFlags(t *testing.T) { ThrottleEnable: false, ThrottleQPS: 43.5, ThrottleBurst: 44, + AsyncDelegate: true, }, }, TruncateOptions: apiserveroptions.AuditTruncateOptions{ diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go index d06fffe406a6..29269a2288dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/audit.go @@ -42,6 +42,16 @@ import ( pluginwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook" ) +const ( + // Default configuration values for ModeBatch. + defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. + // These batch parameters are only used by the webhook backend. + defaultBatchMaxSize = 400 // Only send up to 400 events at a time. + defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. + defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. + defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. +) + func appendBackend(existing, newBackend audit.Backend) audit.Backend { if existing == nil { return newBackend @@ -129,15 +139,12 @@ type AuditWebhookOptions struct { } func NewAuditOptions() *AuditOptions { - defaultLogBatchConfig := pluginbuffered.NewDefaultBatchConfig() - defaultLogBatchConfig.ThrottleEnable = false - return &AuditOptions{ WebhookOptions: AuditWebhookOptions{ InitialBackoff: pluginwebhook.DefaultInitialBackoff, BatchOptions: AuditBatchOptions{ Mode: ModeBatch, - BatchConfig: pluginbuffered.NewDefaultBatchConfig(), + BatchConfig: defaultWebhookBatchConfig(), }, TruncateOptions: NewAuditTruncateOptions(), // TODO(audit): use v1 API in release 1.13 @@ -147,7 +154,7 @@ func NewAuditOptions() *AuditOptions { Format: pluginlog.FormatJson, BatchOptions: AuditBatchOptions{ Mode: ModeBlocking, - BatchConfig: defaultLogBatchConfig, + BatchConfig: defaultLogBatchConfig(), }, TruncateOptions: NewAuditTruncateOptions(), // TODO(audit): use v1 API in release 1.13 @@ -213,11 +220,13 @@ func validateBackendBatchOptions(pluginName string, options AuditBatchOptions) e if config.MaxBatchSize <= 0 { return fmt.Errorf("invalid audit batch %s max batch size %v, must be a positive number", pluginName, config.MaxBatchSize) } - if config.ThrottleQPS <= 0 { - return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS) - } - if config.ThrottleBurst <= 0 { - return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst) + if config.ThrottleEnable { + if config.ThrottleQPS <= 0 { + return fmt.Errorf("invalid audit batch %s throttle QPS %v, must be a positive number", pluginName, config.ThrottleQPS) + } + if config.ThrottleBurst <= 0 { + return fmt.Errorf("invalid audit batch %s throttle burst %v, must be a positive number", pluginName, config.ThrottleBurst) + } } return nil } @@ -525,3 +534,31 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error { c.AuditBackend = appendBackend(c.AuditBackend, webhook) return nil } + +// defaultWebhookBatchConfig returns the default BatchConfig used by the Webhook backend. +func defaultWebhookBatchConfig() pluginbuffered.BatchConfig { + return pluginbuffered.BatchConfig{ + BufferSize: defaultBatchBufferSize, + MaxBatchSize: defaultBatchMaxSize, + MaxBatchWait: defaultBatchMaxWait, + + ThrottleEnable: true, + ThrottleQPS: defaultBatchThrottleQPS, + ThrottleBurst: defaultBatchThrottleBurst, + + AsyncDelegate: true, + } +} + +// defaultLogBatchConfig returns the default BatchConfig used by the Log backend. +func defaultLogBatchConfig() pluginbuffered.BatchConfig { + return pluginbuffered.BatchConfig{ + BufferSize: defaultBatchBufferSize, + // Batching is not useful for the log-file backend. + // MaxBatchWait ignored. + MaxBatchSize: 1, + ThrottleEnable: false, + // Asynchronous log threads just create lock contention. + AsyncDelegate: false, + } +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD index 52694bba5081..2ed9b4f6199c 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/BUILD @@ -26,6 +26,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", "//staging/src/k8s.io/apiserver/plugin/pkg/audit/fake:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go index f50e5ad6d62f..66165915fcca 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered.go @@ -31,16 +31,6 @@ import ( // PluginName is the name reported in error metrics. const PluginName = "buffered" -const ( - // Default configuration values for ModeBatch. - defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding. - defaultBatchMaxSize = 400 // Only send up to 400 events at a time. - defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute. - - defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS. - defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst. -) - // BatchConfig represents batching delegate audit backend configuration. type BatchConfig struct { // BufferSize defines a size of the buffering queue. @@ -57,19 +47,9 @@ type BatchConfig struct { // ThrottleBurst defines the maximum number of requests sent to the delegate backend at the same moment in case // the capacity defined by ThrottleQPS was not utilized. ThrottleBurst int -} -// NewDefaultBatchConfig returns new Config objects populated by default values. -func NewDefaultBatchConfig() BatchConfig { - return BatchConfig{ - BufferSize: defaultBatchBufferSize, - MaxBatchSize: defaultBatchMaxSize, - MaxBatchWait: defaultBatchMaxWait, - - ThrottleEnable: true, - ThrottleQPS: defaultBatchThrottleQPS, - ThrottleBurst: defaultBatchThrottleBurst, - } + // Whether the delegate backend should be called asynchronously. + AsyncDelegate bool } type bufferedBackend struct { @@ -85,6 +65,9 @@ type bufferedBackend struct { // Receiving maxBatchSize events will always trigger sending a batch, regardless of the amount of time passed. maxBatchWait time.Duration + // Whether the delegate backend should be called asynchronously. + asyncDelegate bool + // Channel to signal that the batching routine has processed all remaining events and exited. // Once `shutdownCh` is closed no new events will be sent to the delegate backend. shutdownCh chan struct{} @@ -113,6 +96,7 @@ func NewBackend(delegate audit.Backend, config BatchConfig) audit.Backend { buffer: make(chan *auditinternal.Event, config.BufferSize), maxBatchSize: config.MaxBatchSize, maxBatchWait: config.MaxBatchWait, + asyncDelegate: config.AsyncDelegate, shutdownCh: make(chan struct{}), wg: sync.WaitGroup{}, throttle: throttle, @@ -169,8 +153,17 @@ func (b *bufferedBackend) Shutdown() { // b.stopCh is closed, processIncomingEvents stops and closes the buffer. func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) { defer close(b.buffer) - t := time.NewTimer(b.maxBatchWait) - defer t.Stop() + + var ( + maxWaitChan <-chan time.Time + maxWaitTimer *time.Timer + ) + // Only use max wait batching if batching is enabled. + if b.maxBatchSize > 1 { + maxWaitTimer = time.NewTimer(b.maxBatchWait) + maxWaitChan = maxWaitTimer.C + defer maxWaitTimer.Stop() + } for { func() { @@ -178,8 +171,10 @@ func (b *bufferedBackend) processIncomingEvents(stopCh <-chan struct{}) { // goroutine can't bring down the main routine. defer runtime.HandleCrash() - t.Reset(b.maxBatchWait) - b.processEvents(b.collectEvents(t.C, stopCh)) + if b.maxBatchSize > 1 { + maxWaitTimer.Reset(b.maxBatchWait) + } + b.processEvents(b.collectEvents(maxWaitChan, stopCh)) }() select { @@ -235,15 +230,25 @@ func (b *bufferedBackend) processEvents(events []*auditinternal.Event) { b.throttle.Accept() } - b.wg.Add(1) - go func() { - defer b.wg.Done() - defer runtime.HandleCrash() + if b.asyncDelegate { + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer runtime.HandleCrash() - // Execute the real processing in a goroutine to keep it from blocking. - // This lets the batching routine continue draining the queue immediately. - b.delegateBackend.ProcessEvents(events...) - }() + // Execute the real processing in a goroutine to keep it from blocking. + // This lets the batching routine continue draining the queue immediately. + b.delegateBackend.ProcessEvents(events...) + }() + } else { + func() { + defer runtime.HandleCrash() + + // Execute the real processing in a goroutine to keep it from blocking. + // This lets the batching routine continue draining the queue immediately. + b.delegateBackend.ProcessEvents(events...) + }() + } } func (b *bufferedBackend) ProcessEvents(ev ...*auditinternal.Event) { diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go index c01258e04ed2..c34b5fbd9f9b 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered/buffered_test.go @@ -17,9 +17,12 @@ limitations under the License. package buffered import ( + "fmt" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" @@ -28,17 +31,7 @@ import ( ) var ( - closedStopCh = func() <-chan struct{} { - ch := make(chan struct{}) - close(ch) - return ch - }() - infiniteTimeCh <-chan time.Time = make(chan time.Time) - closedTimeCh = func() <-chan time.Time { - ch := make(chan time.Time) - close(ch) - return ch - }() + infiniteTimeCh <-chan time.Time ) func newEvents(number int) []*auditinternal.Event { @@ -50,72 +43,118 @@ func newEvents(number int) []*auditinternal.Event { return events } -func TestBufferedBackendCollectEvents(t *testing.T) { - config := NewDefaultBatchConfig() - - testCases := []struct { - desc string - timer <-chan time.Time - stopCh <-chan struct{} - numEvents int - wantBatchSize int - }{ - { - desc: "max batch size encountered", - timer: infiniteTimeCh, - stopCh: wait.NeverStop, - numEvents: config.MaxBatchSize + 1, - wantBatchSize: config.MaxBatchSize, - }, - { - desc: "timer expired", - timer: closedTimeCh, - stopCh: wait.NeverStop, - }, - { - desc: "chanel closed", - timer: infiniteTimeCh, - stopCh: closedStopCh, - }, +func testBatchConfig() BatchConfig { + return BatchConfig{ + BufferSize: 100, + MaxBatchSize: 10, + MaxBatchWait: wait.ForeverTestTimeout, + ThrottleEnable: false, + AsyncDelegate: true, } - for _, tc := range testCases { - tc := tc - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() +} - backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) +func TestBatchedBackendCollectEvents(t *testing.T) { + config := testBatchConfig() + batchSize := config.MaxBatchSize + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) - backend.ProcessEvents(newEvents(tc.numEvents)...) - batch := backend.collectEvents(tc.timer, tc.stopCh) + t.Log("Max batch size encountered.") + backend.ProcessEvents(newEvents(batchSize + 1)...) + batch := backend.collectEvents(nil, nil) + assert.Len(t, batch, batchSize, "Expected full batch") - require.Equal(t, tc.wantBatchSize, len(batch), "unexpected batch size") - }) + t.Log("Partial batch should hang until timer expires.") + backend.ProcessEvents(newEvents(1)...) + tc := make(chan time.Time) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + batch = backend.collectEvents(tc, nil) + }() + // Wait for the queued events to be collected. + err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + return len(backend.buffer) == 0, nil + }) + require.NoError(t, err) + + tc <- time.Now() // Trigger "timeout" + wg.Wait() + assert.Len(t, batch, 2, "Expected partial batch") + + t.Log("Collected events should be delivered when stop channel is closed.") + backend.ProcessEvents(newEvents(3)...) + stopCh := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + batch = backend.collectEvents(nil, stopCh) + }() + // Wait for the queued events to be collected. + err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) { + return len(backend.buffer) == 0, nil + }) + require.NoError(t, err) + + close(stopCh) + wg.Wait() + assert.Len(t, batch, 3, "Expected partial batch") +} + +func TestUnbatchedBackendCollectEvents(t *testing.T) { + config := testBatchConfig() + config.MaxBatchSize = 1 // No batching. + backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) + + t.Log("Max batch size encountered.") + backend.ProcessEvents(newEvents(3)...) + batch := backend.collectEvents(nil, nil) + assert.Len(t, batch, 1, "Expected single event") + + t.Log("Queue should always be drained.") + for len(backend.buffer) > 0 { + batch = backend.collectEvents(nil, nil) + assert.Len(t, batch, 1, "Expected single event") } + + t.Log("Collection should hault when stop channel is closed.") + stopCh := make(chan struct{}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + batch = backend.collectEvents(nil, stopCh) + }() + close(stopCh) + wg.Wait() + assert.Empty(t, batch, "Empty final batch") } func TestBufferedBackendProcessEventsAfterStop(t *testing.T) { t.Parallel() - backend := NewBackend(&fake.Backend{}, NewDefaultBatchConfig()).(*bufferedBackend) + backend := NewBackend(&fake.Backend{}, testBatchConfig()).(*bufferedBackend) + closedStopCh := make(chan struct{}) + close(closedStopCh) backend.Run(closedStopCh) backend.Shutdown() backend.ProcessEvents(newEvents(1)...) batch := backend.collectEvents(infiniteTimeCh, wait.NeverStop) - require.Equal(t, 0, len(batch), "processed events after the backed has been stopped") + require.Empty(t, batch, "processed events after the backed has been stopped") } func TestBufferedBackendProcessEventsBufferFull(t *testing.T) { t.Parallel() - config := NewDefaultBatchConfig() + config := testBatchConfig() config.BufferSize = 1 backend := NewBackend(&fake.Backend{}, config).(*bufferedBackend) backend.ProcessEvents(newEvents(2)...) - require.Equal(t, 1, len(backend.buffer), "buffed contains more elements than it should") + require.Len(t, backend.buffer, 1, "buffed contains more elements than it should") } func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { @@ -129,7 +168,7 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { <-delegatedCallEndCh }, } - config := NewDefaultBatchConfig() + config := testBatchConfig() backend := NewBackend(delegateBackend, config) // Run backend, process events, wait for them to be batched and for delegated call to start. @@ -159,3 +198,25 @@ func TestBufferedBackendShutdownWaitsForDelegatedCalls(t *testing.T) { close(delegatedCallEndCh) <-shutdownEndCh } + +func TestDelegateProcessEvents(t *testing.T) { + for _, async := range []bool{true, false} { + t.Run(fmt.Sprintf("async:%t", async), func(t *testing.T) { + config := testBatchConfig() + config.AsyncDelegate = async + wg := sync.WaitGroup{} + delegate := &fake.Backend{ + OnRequest: func(events []*auditinternal.Event) { + assert.Len(t, events, config.MaxBatchSize, "Unexpected batch") + wg.Done() + }, + } + b := NewBackend(delegate, config).(*bufferedBackend) + wg.Add(5) + for i := 0; i < 5; i++ { + b.processEvents(newEvents(config.MaxBatchSize)) + } + wg.Wait() + }) + } +}