Skip to content

Commit

Permalink
Merge pull request #106337 from chenlinx17/cherry-pick-106045-1.21
Browse files Browse the repository at this point in the history
Cherry pick #106045 to 1.21: Fix concurrent map writes error in kube-apiserver
  • Loading branch information
k8s-ci-robot committed Jan 15, 2022
2 parents 9d7f909 + 415cbd2 commit 298c6d0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
17 changes: 12 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/admission/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admission
import (
"context"
"fmt"
"sync"

auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
Expand All @@ -27,7 +28,10 @@ import (
// auditHandler logs annotations set by other admission handlers
type auditHandler struct {
Interface
ae *auditinternal.Event
// TODO: move the lock near the Annotations field of the audit event so it is always protected from concurrent access.
// to protect the 'Annotations' map of the audit event from concurrent writes
mutex sync.Mutex
ae *auditinternal.Event
}

var _ Interface = &auditHandler{}
Expand All @@ -42,10 +46,10 @@ func WithAudit(i Interface, ae *auditinternal.Event) Interface {
if i == nil {
return i
}
return &auditHandler{i, ae}
return &auditHandler{Interface: i, ae: ae}
}

func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
func (handler *auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error {
if !handler.Interface.Handles(a.GetOperation()) {
return nil
}
Expand All @@ -60,7 +64,7 @@ func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInt
return err
}

func (handler auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
func (handler *auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error {
if !handler.Interface.Handles(a.GetOperation()) {
return nil
}
Expand All @@ -84,10 +88,13 @@ func ensureAnnotationGetter(a Attributes) error {
return fmt.Errorf("attributes must be an instance of privateAnnotationsGetter or AnnotationsGetter")
}

func (handler auditHandler) logAnnotations(a Attributes) {
func (handler *auditHandler) logAnnotations(a Attributes) {
if handler.ae == nil {
return
}
handler.mutex.Lock()
defer handler.mutex.Unlock()

switch a := a.(type) {
case privateAnnotationsGetter:
for key, value := range a.getAnnotations(handler.ae.Level) {
Expand Down
30 changes: 30 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/admission/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admission
import (
"context"
"fmt"
"sync"
"testing"

"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -172,3 +173,32 @@ func TestWithAudit(t *testing.T) {
}
}
}

func TestWithAuditConcurrency(t *testing.T) {
admitAnnotations := map[string]string{
"plugin.example.com/foo": "foo",
"plugin.example.com/bar": "bar",
"plugin.example.com/baz": "baz",
"plugin.example.com/qux": "qux",
}
var handler Interface = fakeHandler{admitAnnotations: admitAnnotations, handles: true}
ae := &auditinternal.Event{Level: auditinternal.LevelMetadata}
auditHandler := WithAudit(handler, ae)
a := attributes()

// Simulate the scenario store.DeleteCollection
workers := 2
wg := &sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
mutator, ok := handler.(MutationInterface)
require.True(t, ok)
auditMutator, ok := auditHandler.(MutationInterface)
require.True(t, ok)
assert.Equal(t, mutator.Admit(context.TODO(), a, nil), auditMutator.Admit(context.TODO(), a, nil), "WithAudit decorator should not effect the return value")
}()
}
wg.Wait()
}

0 comments on commit 298c6d0

Please sign in to comment.