Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Fix deadlock and cleaner cache APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Dec 22, 2017
1 parent 21ecb52 commit 1a58083
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 57 deletions.
5 changes: 4 additions & 1 deletion matcher/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ type Cache interface {
// between [fromNanos, toNanos).
ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult

// Register sets the result source for a given namespace.
// Register sets the source for a given namespace.
Register(namespace []byte, source Source)

// Update updates the source for a given namespace.
Update(namespace []byte, source Source)

// Unregister deletes the cached results for a given namespace.
Unregister(namespace []byte)

Expand Down
97 changes: 69 additions & 28 deletions matcher/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,36 @@ func newResults(source matcher.Source) results {
}

type cacheMetrics struct {
hits tally.Counter
misses tally.Counter
expires tally.Counter
registers tally.Counter
unregisters tally.Counter
promotions tally.Counter
evictions tally.Counter
deletions tally.Counter
hits tally.Counter
misses tally.Counter
expires tally.Counter
registers tally.Counter
registerExists tally.Counter
updates tally.Counter
updateNotExists tally.Counter
updateStaleSource tally.Counter
unregisters tally.Counter
unregisterNotExists tally.Counter
promotions tally.Counter
evictions tally.Counter
deletions tally.Counter
}

func newCacheMetrics(scope tally.Scope) cacheMetrics {
return cacheMetrics{
hits: scope.Counter("hits"),
misses: scope.Counter("misses"),
expires: scope.Counter("expires"),
registers: scope.Counter("registers"),
unregisters: scope.Counter("unregisters"),
promotions: scope.Counter("promotions"),
evictions: scope.Counter("evictions"),
deletions: scope.Counter("deletions"),
hits: scope.Counter("hits"),
misses: scope.Counter("misses"),
expires: scope.Counter("expires"),
registers: scope.Counter("registers"),
registerExists: scope.Counter("register-exists"),
updates: scope.Counter("updates"),
updateNotExists: scope.Counter("update-not-exists"),
updateStaleSource: scope.Counter("update-stale-source"),
unregisters: scope.Counter("unregisters"),
unregisterNotExists: scope.Counter("unregister-not-exists"),
promotions: scope.Counter("promotions"),
evictions: scope.Counter("evictions"),
deletions: scope.Counter("deletions"),
}
}

Expand Down Expand Up @@ -155,34 +165,65 @@ func (c *cache) ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rul

func (c *cache) Register(namespace []byte, source matcher.Source) {
c.Lock()
defer c.Unlock()

nsHash := xid.HashFn(namespace)
if results, exist := c.namespaces[nsHash]; !exist {
c.namespaces[nsHash] = newResults(source)
c.metrics.registers.Inc(1)
} else {
c.updateWithLock(nsHash, source, results)
c.metrics.registerExists.Inc(1)
}
}

func (c *cache) Update(namespace []byte, source matcher.Source) {
c.Lock()
defer c.Unlock()

nsHash := xid.HashFn(namespace)
results, exist := c.namespaces[nsHash]
// NB: The namespace does not exist yet. This could happen if the source update came
// before its namespace is registered. It is safe to ignore this premature update
// because the namespace will eventually register itself and refreshes the cache.
if !exist {
results = newResults(source)
} else {
// Invalidate existing cached results.
c.toDelete = append(c.toDelete, results.elems)
c.notifyDeletion()
results.source = source
results.elems = make(elemMap)
c.metrics.updateNotExists.Inc(1)
return
}
// NB: The source to update is different from what's stored in the cache. This could
// happen if the namespace is changed, removed, and then revived before the rule change
// could be processed. It is safe to ignore this stale update because the last rule
// change update will eventually be processed and the cache will be refreshed.
if results.source != source {
c.metrics.updateStaleSource.Inc(1)
return
}
c.updateWithLock(nsHash, source, results)
c.metrics.updates.Inc(1)
}

// updateWithLock clears the existing cached results for namespace nsHash
// and associates the namespace results with a new source.
func (c *cache) updateWithLock(nsHash xid.Hash, source matcher.Source, results results) {
c.toDelete = append(c.toDelete, results.elems)
c.notifyDeletion()
results.source = source
results.elems = make(elemMap)
c.namespaces[nsHash] = results
c.Unlock()
c.metrics.registers.Inc(1)
}

func (c *cache) Unregister(namespace []byte) {
c.Lock()
defer c.Unlock()

nsHash := xid.HashFn(namespace)
results, exists := c.namespaces[nsHash]
if !exists {
c.Unlock()
c.metrics.unregisterNotExists.Inc(1)
return
}
delete(c.namespaces, nsHash)
c.toDelete = append(c.toDelete, results.elems)
c.Unlock()

c.notifyDeletion()
c.metrics.unregisters.Inc(1)
}
Expand Down
54 changes: 54 additions & 0 deletions matcher/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,60 @@ func TestCacheRegisterNamespaceExists(t *testing.T) {
require.Equal(t, source, c.namespaces[nsHash].source)
}

func TestCacheUpdateNamespaceDoesNotExist(t *testing.T) {
opts := testCacheOptions()
c := NewCache(opts).(*cache)
require.Equal(t, 0, len(c.namespaces))

var (
ns = []byte("ns")
source = newMockSource()
)
c.Update(ns, source)
require.Equal(t, 0, len(c.namespaces))
}

func TestCacheUpdateStaleSource(t *testing.T) {
opts := testCacheOptions()
c := NewCache(opts).(*cache)
require.Equal(t, 0, len(c.namespaces))

var (
ns = []byte("ns")
nsHash = xid.HashFn(ns)
source1 = newMockSource()
source2 = newMockSource()
)
c.Register(ns, source1)
require.Equal(t, 1, len(c.namespaces))

c.Update(ns, source2)
require.Equal(t, 1, len(c.namespaces))
require.Equal(t, source1, c.namespaces[nsHash].source)
}

func TestCacheUpdateSuccess(t *testing.T) {
opts := testCacheOptions()
c := NewCache(opts).(*cache)
now := time.Now()
c.nowFn = func() time.Time { return now }

var (
ns = testValues[0].namespace
nsHash = xid.HashFn(ns)
src = newMockSource()
)
populateCache(c, []testValue{testValues[0]}, now, src, populateBoth)
require.Equal(t, 1, len(c.namespaces))
require.Equal(t, 1, len(c.namespaces[nsHash].elems))
require.Equal(t, src, c.namespaces[nsHash].source)

c.Update(ns, src)
require.Equal(t, 1, len(c.namespaces))
require.Equal(t, 0, len(c.namespaces[nsHash].elems))
require.Equal(t, src, c.namespaces[nsHash].source)
}

func TestCacheUnregisterNamespaceDoesNotExist(t *testing.T) {
opts := testCacheOptions()
c := NewCache(opts).(*cache)
Expand Down
2 changes: 1 addition & 1 deletion matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewMatcher(cache Cache, opts Options) (Matcher, error) {
cache.Unregister(namespace)
}).
SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
cache.Update(namespace, ruleSet)
})
key := opts.NamespacesKey()
namespaces := NewNamespaces(key, namespacesOpts)
Expand Down
21 changes: 18 additions & 3 deletions matcher/namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package matcher

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -268,10 +269,24 @@ func (c *memCache) Register(namespace []byte, source Source) {
results: make(map[string]rules.MatchResult),
source: source,
}
} else {
results.source = source
c.namespaces[string(namespace)] = results
return
}
panic(fmt.Errorf("re-registering existing namespace %s", namespace))
}

func (c *memCache) Update(namespace []byte, source Source) {
c.Lock()
defer c.Unlock()

results, exists := c.namespaces[string(namespace)]
if !exists || results.source != source {
return
}
c.namespaces[string(namespace)] = memResults{
results: make(map[string]rules.MatchResult),
source: source,
}
c.namespaces[string(namespace)] = results
}

func (c *memCache) Unregister(namespace []byte) {
Expand Down
6 changes: 4 additions & 2 deletions matcher/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,15 @@ func (r *ruleSet) toRuleSet(value kv.Value) (interface{}, error) {
// process processes an ruleset update.
func (r *ruleSet) process(value interface{}) error {
r.Lock()
defer r.Unlock()

ruleSet := value.(rules.RuleSet)
r.version = ruleSet.Version()
r.cutoverNanos = ruleSet.CutoverNanos()
r.tombstoned = ruleSet.Tombstoned()
r.matcher = ruleSet.ActiveSet(r.nowFn().Add(-r.matchRangePast).UnixNano())
r.Unlock()

// NB: calling the update callback outside the ruleset lock to avoid circular
// lock dependency causing a deadlock.
if r.onRuleSetUpdatedFn != nil {
r.onRuleSetUpdatedFn(r.namespace, r)
}
Expand Down
Loading

0 comments on commit 1a58083

Please sign in to comment.