diff --git a/staging/src/k8s.io/apiserver/pkg/admission/audit.go b/staging/src/k8s.io/apiserver/pkg/admission/audit.go index d1e103cfc6228..02694b0a91401 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/audit.go @@ -19,6 +19,7 @@ package admission import ( "context" "fmt" + "sync" auditinternal "k8s.io/apiserver/pkg/apis/audit" "k8s.io/apiserver/pkg/audit" @@ -27,7 +28,10 @@ import ( // auditHandler logs annotations set by other admission handlers type auditHandler struct { Interface - ae *auditinternal.Event + // TODO: move the lock near the Annotations field of the audit event so it is always protected from concurrent access. + // to protect the 'Annotations' map of the audit event from concurrent writes + mutex sync.Mutex + ae *auditinternal.Event } var _ Interface = &auditHandler{} @@ -42,10 +46,10 @@ func WithAudit(i Interface, ae *auditinternal.Event) Interface { if i == nil { return i } - return &auditHandler{i, ae} + return &auditHandler{Interface: i, ae: ae} } -func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error { +func (handler *auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInterfaces) error { if !handler.Interface.Handles(a.GetOperation()) { return nil } @@ -60,7 +64,7 @@ func (handler auditHandler) Admit(ctx context.Context, a Attributes, o ObjectInt return err } -func (handler auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error { +func (handler *auditHandler) Validate(ctx context.Context, a Attributes, o ObjectInterfaces) error { if !handler.Interface.Handles(a.GetOperation()) { return nil } @@ -84,10 +88,13 @@ func ensureAnnotationGetter(a Attributes) error { return fmt.Errorf("attributes must be an instance of privateAnnotationsGetter or AnnotationsGetter") } -func (handler auditHandler) logAnnotations(a Attributes) { +func (handler *auditHandler) logAnnotations(a Attributes) { if handler.ae == nil { return } + handler.mutex.Lock() + defer handler.mutex.Unlock() + switch a := a.(type) { case privateAnnotationsGetter: for key, value := range a.getAnnotations(handler.ae.Level) { diff --git a/staging/src/k8s.io/apiserver/pkg/admission/audit_test.go b/staging/src/k8s.io/apiserver/pkg/admission/audit_test.go index 3e1d3d2012646..32b7497e94974 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/audit_test.go @@ -19,6 +19,7 @@ package admission import ( "context" "fmt" + "sync" "testing" "k8s.io/apimachinery/pkg/runtime/schema" @@ -172,3 +173,32 @@ func TestWithAudit(t *testing.T) { } } } + +func TestWithAuditConcurrency(t *testing.T) { + admitAnnotations := map[string]string{ + "plugin.example.com/foo": "foo", + "plugin.example.com/bar": "bar", + "plugin.example.com/baz": "baz", + "plugin.example.com/qux": "qux", + } + var handler Interface = fakeHandler{admitAnnotations: admitAnnotations, handles: true} + ae := &auditinternal.Event{Level: auditinternal.LevelMetadata} + auditHandler := WithAudit(handler, ae) + a := attributes() + + // Simulate the scenario store.DeleteCollection + workers := 2 + wg := &sync.WaitGroup{} + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + mutator, ok := handler.(MutationInterface) + require.True(t, ok) + auditMutator, ok := auditHandler.(MutationInterface) + require.True(t, ok) + assert.Equal(t, mutator.Admit(context.TODO(), a, nil), auditMutator.Admit(context.TODO(), a, nil), "WithAudit decorator should not effect the return value") + }() + } + wg.Wait() +}