Skip to content

Commit

Permalink
Introduce circuit breaker into objectTracker
Browse files Browse the repository at this point in the history
Introduces a circuit breaker into objectTracker which is tripped once
expectations have been met. When tripped, internal state tracking memory
can be freed and subsequent operations will not consume additional
memory in the tracker.

Closes open-policy-agent#660

Signed-off-by: Oren Shomron <shomron@gmail.com>
  • Loading branch information
shomron committed Jun 15, 2020
1 parent 606d5ba commit 70f59cd
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 33 deletions.
96 changes: 64 additions & 32 deletions pkg/readiness/object_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ type Expectations interface {
// Expectations are satisfied by calls to Observe().
// Once all expectations are satisfied, Satisfied() will begin returning true.
type objectTracker struct {
mu sync.RWMutex
gvk schema.GroupVersionKind
cancelled objSet // expectations that have been cancelled
expect objSet // unresolved expectations
seen objSet // observations made before their expectations
satisfied objSet // tracked to avoid re-adding satisfied expectations and to support unsatisfied()
populated bool // all expectations have been provided
mu sync.RWMutex
gvk schema.GroupVersionKind
cancelled objSet // expectations that have been cancelled
expect objSet // unresolved expectations
seen objSet // observations made before their expectations
satisfied objSet // tracked to avoid re-adding satisfied expectations and to support unsatisfied()
populated bool // all expectations have been provided
allSatisfied bool // true once all expectations have been satified. Acts as a circuit-breaker.
}

func newObjTracker(gvk schema.GroupVersionKind) *objectTracker {
Expand All @@ -72,13 +73,14 @@ func (t *objectTracker) Expect(o runtime.Object) {
return
}

accessor, err := meta.Accessor(o)
if err != nil {
// Respect circuit-breaker.
if t.allSatisfied {
return
}

// Don't expect resources which are being terminated.
if !accessor.GetDeletionTimestamp().IsZero() {
accessor, err := meta.Accessor(o)
if err == nil && !accessor.GetDeletionTimestamp().IsZero() {
return
}

Expand Down Expand Up @@ -109,6 +111,12 @@ func (t *objectTracker) Expect(o runtime.Object) {
func (t *objectTracker) CancelExpect(o runtime.Object) {
t.mu.Lock()
defer t.mu.Unlock()

// Respect circuit-breaker.
if t.allSatisfied {
return
}

k, err := objKeyFromObject(o)
if err != nil {
log.Error(err, "skipping")
Expand Down Expand Up @@ -152,6 +160,11 @@ func (t *objectTracker) Observe(o runtime.Object) {
t.mu.Lock()
defer t.mu.Unlock()

// Respect circuit-breaker.
if t.allSatisfied {
return
}

k, err := objKeyFromObject(o)
if err != nil {
log.Error(err, "skipping")
Expand All @@ -163,6 +176,11 @@ func (t *objectTracker) Observe(o runtime.Object) {
return
}

// Ignore satisfied expectations
if _, ok := t.satisfied[k]; ok {
return
}

_, wasExpecting := t.expect[k]
switch {
case wasExpecting:
Expand Down Expand Up @@ -193,44 +211,58 @@ func (t *objectTracker) Populated() bool {
// Expectations must be populated before the tracker can be considered satisfied.
// Expectations are marked as populated by calling ExpectationsDone().
func (t *objectTracker) Satisfied() bool {
satisfied, seenKeys := func() (bool, []objKey) {
satisfied, needMutate := func() (bool, bool) {
t.mu.RLock()
defer t.mu.RUnlock()

if !t.populated {
return false, nil
}

if len(t.expect) == 0 {
return true, nil
}

// Resolve any expectations where the observation preceded the expect request.
var keys []objKey
for k := range t.seen {
if _, ok := t.expect[k]; !ok {
continue
}
keys = append(keys, k)
}
return false, keys
// We'll only need the write lock in certain conditions:
// 1. We haven't tripped the circuit breaker
// 2. Expectations have been previously populated
// 3. We have expectations and observations - some of these may match can be resolved.
needMutate :=
!t.allSatisfied &&
t.populated &&
((len(t.seen) > 0 && len(t.expect) > 0) || // ...are there expectations we can resolve?
(len(t.seen) == 0 && len(t.expect) == 0)) // ...is the circuit-breaker ready to flip?
return t.allSatisfied, needMutate
}()

if len(seenKeys) == 0 {
return satisfied
if satisfied {
return true
}

// Proceed only if we have state changes to make.
if !needMutate {
return false
}

// From here we need a write lock to mutate state.
t.mu.Lock()
defer t.mu.Unlock()

for _, k := range seenKeys {
// Resolve any expectations where the observation preceded the expect request.
for k := range t.seen {
if _, ok := t.expect[k]; !ok {
continue
}
delete(t.seen, k)
delete(t.expect, k)
t.satisfied[k] = struct{}{}
}

return len(t.expect) == 0
// All satisfied if:
// 1. Expectations have been previously populated
// 2. No expectations remain
if allSatisfied := t.populated && len(t.expect) == 0; allSatisfied {
t.allSatisfied = true

// Circuit-breaker tripped - free tracking memory
t.seen = nil
t.expect = nil
t.satisfied = nil
t.cancelled = nil
}
return t.allSatisfied
}

func (t *objectTracker) kinds() []schema.GroupVersionKind {
Expand Down
57 changes: 56 additions & 1 deletion pkg/readiness/object_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Test_ObjectTracker_Seen_Before_Expect(t *testing.T) {
}

// Verify that terminated resources are ignored when calling Expect.
func Test_ObjectTracker_Termintated_Expect(t *testing.T) {
func Test_ObjectTracker_Terminated_Expect(t *testing.T) {
g := gomega.NewWithT(t)
ot := newObjTracker(schema.GroupVersionKind{})
ct := makeCT("test-ct")
Expand Down Expand Up @@ -161,3 +161,58 @@ func Test_ObjectTracker_CancelBeforeExpect(t *testing.T) {

g.Expect(ot.Satisfied()).To(gomega.BeTrue())
}

// Verify that the allSatisfied circuit breaker keeps Satisfied==true and
// no other operations have any impact.
func Test_ObjectTracker_CircuitBreaker(t *testing.T) {
g := gomega.NewWithT(t)
ot := newObjTracker(schema.GroupVersionKind{})

const count = 10
ct := makeCTSlice("ct-", count)
for i := 0; i < len(ct); i++ {
ot.Expect(ct[i])
}

g.Expect(ot.Satisfied()).NotTo(gomega.BeTrue(), "should not be satisfied before ExpectationsDone")
ot.ExpectationsDone()

for i := 0; i < len(ct); i++ {
ot.Observe(ct[i])
}

g.Expect(ot.Satisfied()).To(gomega.BeTrue())

// The circuit-breaker should have been tripped. Let's try different operations
// and make sure they do not consume additional memory or affect the satisfied state.
for i := 0; i < len(ct); i++ {
ot.CancelExpect(ct[i])
ot.Observe(ct[i])
}

expectNoObserve := makeCTSlice("expectNoObserve-", count)
for i := 0; i < len(expectNoObserve); i++ {
// Expect resources we won't then observe
ot.Expect(expectNoObserve[i])
}
cancelNoObserve := makeCTSlice("cancelNoObserve-", count)
for i := 0; i < len(cancelNoObserve); i++ {
// Cancel resources we won't then observe
ot.CancelExpect(cancelNoObserve[i])
}
justObserve := makeCTSlice("justObserve-", count)
for i := 0; i < len(justObserve); i++ {
// Observe some unexpected resources
ot.Observe(justObserve[i])
}

g.Expect(ot.Satisfied()).To(gomega.BeTrue())

// Peek at internals - we should no be accruing memory from the post-circuit-breaker operations
ot.mu.RLock()
defer ot.mu.RUnlock()
g.Expect(ot.cancelled).To(gomega.BeEmpty())
g.Expect(ot.expect).To(gomega.BeEmpty())
g.Expect(ot.seen).To(gomega.BeEmpty())
g.Expect(ot.satisfied).To(gomega.BeEmpty())
}

0 comments on commit 70f59cd

Please sign in to comment.