Skip to content

Commit e4228f6

Browse files
committed
fixup! use singleflight.Group and fix data race
Signed-off-by: Bryce Palmer <bpalmer@redhat.com>
1 parent 6eaf02c commit e4228f6

File tree

2 files changed

+74
-92
lines changed

2 files changed

+74
-92
lines changed

openshift-kube-apiserver/admission/customresourcevalidation/authentication/validate_authentication.go

Lines changed: 50 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"time"
88

9+
"golang.org/x/sync/singleflight"
910
"k8s.io/apimachinery/pkg/api/validation"
1011
"k8s.io/apimachinery/pkg/runtime"
1112
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -33,7 +34,7 @@ func Register(plugins *admission.Plugins) {
3334
configv1.GroupVersion.WithKind("Authentication"): authenticationV1{
3435
cel: &celStore{
3536
compiledStore: lru.New(100),
36-
compilingStore: lru.New(100),
37+
compilingGroup: new(singleflight.Group),
3738
compiler: authenticationcel.NewDefaultCompiler(),
3839
},
3940
},
@@ -58,7 +59,7 @@ func toAuthenticationV1(uncastObj runtime.Object) (*configv1.Authentication, fie
5859
}
5960

6061
type celStore struct {
61-
compilingStore *lru.Cache
62+
compilingGroup *singleflight.Group
6263
compiledStore *lru.Cache
6364
compiler authenticationcel.Compiler
6465
}
@@ -266,69 +267,66 @@ func validateCELExpression(ctx context.Context, cel *celStore, accessor authenti
266267
return err, ""
267268
}
268269

269-
// expression is currently being compiled
270-
if val, ok := cel.compilingStore.Get(accessor.GetExpression()); ok {
271-
// we use a channel to determine if it has finished compiling
272-
select {
273-
case res := <-val.(chan compileResult):
274-
// done compiling
275-
return res.err, res.warn
276-
case <-ctx.Done(): // handle the case of context cancelling while waiting
277-
return ctx.Err(), ""
278-
}
279-
}
270+
result, err, _ := cel.compilingGroup.Do(accessor.GetExpression(), func() (interface{}, error) {
271+
// if the expression is not currently being compiled, it might have already been compiled
272+
if val, ok := cel.compiledStore.Get(accessor.GetExpression()); ok {
273+
if val != nil {
274+
res := val.(compileResult)
275+
return res, nil
276+
}
280277

281-
// if the expression is not currently being compiled, it might have already been compiled
282-
// either successfully or unsucessfully
283-
if val, ok := cel.compiledStore.Get(accessor.GetExpression()); ok {
284-
if val != nil {
285-
res := val.(compileResult)
286-
return res.err, res.warn
278+
return nil, nil
287279
}
288280

289-
return nil, ""
290-
}
291-
292-
// expression is not currently being compiled, and has not been compiled before (or has been long enough since it was last compiled that we dropped it).
293-
// Let's compile it.
294-
warningString := ""
295-
compiled := make(chan error)
296-
defer close(compiled)
281+
// expression is not currently being compiled, and has not been compiled before (or has been long enough since it was last compiled that we dropped it).
282+
// Let's compile it.
283+
warningString := ""
284+
compiled := make(chan error)
285+
defer close(compiled)
297286

298-
compiling := make(chan compileResult, 1)
299-
defer close(compiling)
287+
go func() {
288+
defer func() {
289+
if r := recover(); r != nil {
290+
// convert the panic into an error state for the expression
291+
compiled <- fmt.Errorf("recovered from a panic while compiling expression %q: %v", accessor.GetExpression(), r)
292+
}
293+
}()
300294

301-
cel.compilingStore.Add(accessor.GetExpression(), compiling)
302-
defer cel.compilingStore.Remove(accessor.GetExpression())
295+
_, err := cel.compiler.CompileClaimsExpression(accessor)
303296

304-
go func() {
305-
defer func() {
306-
if r := recover(); r != nil {
307-
// convert the panic into an error state for the expression
308-
compiled <- fmt.Errorf("recovered from a panic while compiling expression %q: %v", accessor.GetExpression(), r)
309-
}
297+
compiled <- err
310298
}()
311299

312-
_, err := cel.compiler.CompileClaimsExpression(accessor)
300+
warning := make(chan string, 1)
301+
timer := time.AfterFunc(time.Second, func() {
302+
defer close(warning)
303+
warning <- fmt.Sprintf("cel expression %q took more than 1 second to compile", accessor.GetExpression())
304+
})
313305

314-
compiled <- err
315-
}()
306+
err := <-compiled
316307

317-
timer := time.AfterFunc(time.Second, func() {
318-
warningString = fmt.Sprintf("cel expression %q took more than 1 second to compile", accessor.GetExpression())
319-
})
308+
timer.Stop()
320309

321-
err := <-compiled
322-
323-
timer.Stop()
310+
// check if we received a warning. If not, continue
311+
select {
312+
case warn := <-warning:
313+
warningString = warn
314+
default:
315+
break
316+
}
324317

325-
compilationResult := compileResult{
326-
err, warningString,
327-
}
318+
compilationResult := compileResult{
319+
err, warningString,
320+
}
328321

329-
cel.compiledStore.Add(accessor.GetExpression(), compilationResult)
322+
cel.compiledStore.Add(accessor.GetExpression(), compilationResult)
330323

331-
compiling <- compilationResult
324+
return compilationResult, nil
325+
})
326+
if err != nil {
327+
return fmt.Errorf("running compilation of expression %q: %v", accessor.GetExpression(), err), ""
328+
}
332329

333-
return compilationResult.err, compilationResult.warn
330+
compileRes := result.(compileResult)
331+
return compileRes.err, compileRes.warn
334332
}

openshift-kube-apiserver/admission/customresourcevalidation/authentication/validate_authentication_test.go

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
configv1 "github.com/openshift/api/config/v1"
12+
"golang.org/x/sync/singleflight"
1213
"k8s.io/apimachinery/pkg/util/rand"
1314
"k8s.io/apimachinery/pkg/util/validation/field"
1415
authenticationcel "k8s.io/apiserver/pkg/authentication/cel"
@@ -97,7 +98,7 @@ func TestFailValidateAuthenticationSpec(t *testing.T) {
9798
for tcName, tc := range errorCases {
9899
errs, _ := validateAuthenticationSpec(context.TODO(), tc.spec, &celStore{
99100
compiler: authenticationcel.NewDefaultCompiler(),
100-
compilingStore: lru.New(100),
101+
compilingGroup: new(singleflight.Group),
101102
compiledStore: lru.New(100),
102103
})
103104
if (len(errs) > 0) != (len(tc.errorType) != 0) {
@@ -188,7 +189,7 @@ func TestSucceedValidateAuthenticationSpec(t *testing.T) {
188189
for tcName, s := range successCases {
189190
errs, _ := validateAuthenticationSpec(context.TODO(), s, &celStore{
190191
compiler: authenticationcel.NewDefaultCompiler(),
191-
compilingStore: lru.New(100),
192+
compilingGroup: new(singleflight.Group),
192193
compiledStore: lru.New(100),
193194
})
194195
if len(errs) != 0 {
@@ -277,7 +278,7 @@ func TestValidateCELExpression(t *testing.T) {
277278
delay: 200 * time.Millisecond,
278279
err: nil,
279280
},
280-
compilingStore: lru.New(1),
281+
compilingGroup: new(singleflight.Group),
281282
compiledStore: lru.New(1),
282283
}
283284
},
@@ -291,7 +292,7 @@ func TestValidateCELExpression(t *testing.T) {
291292
delay: 1500 * time.Millisecond,
292293
err: nil,
293294
},
294-
compilingStore: lru.New(1),
295+
compilingGroup: new(singleflight.Group),
295296
compiledStore: lru.New(1),
296297
}
297298
},
@@ -306,7 +307,7 @@ func TestValidateCELExpression(t *testing.T) {
306307
delay: 1500 * time.Millisecond,
307308
err: errors.New("boom"),
308309
},
309-
compilingStore: lru.New(1),
310+
compilingGroup: new(singleflight.Group),
310311
compiledStore: lru.New(1),
311312
}
312313
},
@@ -322,7 +323,7 @@ func TestValidateCELExpression(t *testing.T) {
322323
delay: 1500 * time.Millisecond,
323324
err: nil,
324325
},
325-
compilingStore: lru.New(1),
326+
compilingGroup: new(singleflight.Group),
326327
compiledStore: lru.New(1),
327328
}
328329
},
@@ -336,45 +337,28 @@ func TestValidateCELExpression(t *testing.T) {
336337
{
337338
name: "waits for already compiling expression to finish compiling and returns its results",
338339
cel: func() *celStore {
339-
compilingLRU := lru.New(1)
340-
compiled := make(chan compileResult)
341-
compilingLRU.Add(expression.Expression, compiled)
342-
time.AfterFunc(time.Second, func() { compiled <- compileResult{err: errors.New("boom"), warn: "warning"} })
340+
compGroup := new(singleflight.Group)
341+
342+
_ = compGroup.DoChan(expression.Expression, func() (interface{}, error) {
343+
// Hog the group for a bit
344+
time.Sleep(time.Second)
345+
346+
return compileResult{
347+
err: errors.New("boom"),
348+
warn: "warning",
349+
}, nil
350+
})
343351

344352
return &celStore{
345353
compiler: nil, // should never end up calling this
346-
compilingStore: compilingLRU,
354+
compilingGroup: compGroup,
347355
compiledStore: lru.New(1),
348356
}
349357
},
350358
ctx: func() context.Context { return context.TODO() },
351359
shouldErr: true,
352360
shouldWarn: true,
353361
},
354-
{
355-
name: "stops waiting for already compiling expression if context is canceled",
356-
cel: func() *celStore {
357-
compilingLRU := lru.New(1)
358-
compiled := make(chan compileResult)
359-
compilingLRU.Add(expression.Expression, compiled)
360-
time.AfterFunc(time.Second, func() { close(compiled) })
361-
362-
return &celStore{
363-
compiler: nil, // should never end up calling this
364-
compilingStore: compilingLRU,
365-
compiledStore: lru.New(1),
366-
}
367-
},
368-
ctx: func() context.Context {
369-
ctx, cancel := context.WithCancel(context.TODO())
370-
time.AfterFunc(500*time.Millisecond, func() {
371-
cancel()
372-
})
373-
374-
return ctx
375-
},
376-
shouldErr: true,
377-
},
378362
{
379363
name: "returns already compiled expression results if the expression has been compiled before",
380364
cel: func() *celStore {
@@ -387,7 +371,7 @@ func TestValidateCELExpression(t *testing.T) {
387371

388372
return &celStore{
389373
compiler: nil, // should never end up calling this
390-
compilingStore: lru.New(1),
374+
compilingGroup: new(singleflight.Group),
391375
compiledStore: compiledLRU,
392376
}
393377
},
@@ -399,8 +383,8 @@ func TestValidateCELExpression(t *testing.T) {
399383
name: "handles panic in compilation goroutine",
400384
cel: func() *celStore {
401385
return &celStore{
402-
compiler: nil, // should never end up calling this
403-
compilingStore: lru.New(1),
386+
compiler: nil, // causes panic
387+
compilingGroup: new(singleflight.Group),
404388
compiledStore: lru.New(1),
405389
}
406390
},
@@ -459,7 +443,7 @@ func BenchmarkValidateClaimMappingsAllSameMaxed(b *testing.B) {
459443

460444
celStore := &celStore{
461445
compiler: authenticationcel.NewDefaultCompiler(),
462-
compilingStore: lru.New(100),
446+
compilingGroup: new(singleflight.Group),
463447
compiledStore: lru.New(100),
464448
}
465449
for b.Loop() {
@@ -486,7 +470,7 @@ func BenchmarkValidateClaimMappingsAllDifferentMaxed(b *testing.B) {
486470

487471
celStore := &celStore{
488472
compiler: authenticationcel.NewDefaultCompiler(),
489-
compilingStore: lru.New(100),
473+
compilingGroup: new(singleflight.Group),
490474
compiledStore: lru.New(100),
491475
}
492476
for b.Loop() {

0 commit comments

Comments
 (0)