Skip to content

Commit

Permalink
Add protection on namespace cache in case namespace is not created (#824
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wxing1292 committed Oct 15, 2020
1 parent 9d5c94e commit b25bd3d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 22 deletions.
61 changes: 48 additions & 13 deletions common/cache/namespaceCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package cache

import (
"fmt"
"hash/fnv"
"sort"
"strconv"
Expand Down Expand Up @@ -65,6 +66,8 @@ const (
namespaceCacheInitialSize = 10 * 1024
namespaceCacheMaxSize = 64 * 1024
namespaceCacheTTL = 0 // 0 means infinity
// NamespaceCacheMinRefreshInterval is a minimun namespace cache refresh interval.
NamespaceCacheMinRefreshInterval = 2 * time.Second
// NamespaceCacheRefreshInterval namespace cache refresh interval
NamespaceCacheRefreshInterval = 10 * time.Second
// NamespaceCacheRefreshFailureRetryInterval is the wait time
Expand Down Expand Up @@ -116,7 +119,10 @@ type (

// refresh lock is used to guarantee at most one
// coroutine is doing namespace refreshment
refreshLock sync.Mutex
refreshLock sync.Mutex
lastRefreshTime atomic.Value
checkLock sync.Mutex
lastCheckTime time.Time

callbackLock sync.Mutex
prepareCallbacks map[int32]PrepareCallbackFn
Expand Down Expand Up @@ -165,6 +171,7 @@ func NewNamespaceCache(
}
cache.cacheNameToID.Store(newNamespaceCache())
cache.cacheByID.Store(newNamespaceCache())
cache.lastRefreshTime.Store(time.Time{})

return cache
}
Expand Down Expand Up @@ -316,8 +323,8 @@ func (c *namespaceCache) RegisterNamespaceChangeCallback(
// with namespace change version.
sort.Sort(namespaces)

prevEntries := []*NamespaceCacheEntry{}
nextEntries := []*NamespaceCacheEntry{}
var prevEntries []*NamespaceCacheEntry
var nextEntries []*NamespaceCacheEntry
for _, namespace := range namespaces {
if namespace.notificationVersion >= initialNotificationVersion {
prevEntries = append(prevEntries, nil)
Expand Down Expand Up @@ -420,6 +427,8 @@ func (c *namespaceCache) refreshNamespaces() error {
// this function only refresh the namespaces in the v2 table
// the namespaces in the v1 table will be refreshed if cache is stale
func (c *namespaceCache) refreshNamespacesLocked() error {
now := c.timeSource.Now()

// first load the metadata record, then load namespaces
// this can guarantee that namespaces in the cache are not updated more than metadata record
metadata, err := c.metadataMgr.GetMetadata()
Expand Down Expand Up @@ -451,8 +460,8 @@ func (c *namespaceCache) refreshNamespacesLocked() error {
// with namespace change version.
sort.Sort(namespaces)

prevEntries := []*NamespaceCacheEntry{}
nextEntries := []*NamespaceCacheEntry{}
var prevEntries []*NamespaceCacheEntry
var nextEntries []*NamespaceCacheEntry

// make a copy of the existing namespace cache, so we can calculate diff and do compare and swap
newCacheNameToID := newNamespaceCache()
Expand Down Expand Up @@ -492,16 +501,35 @@ UpdateLoop:
c.cacheByID.Store(newCacheByID)
c.cacheNameToID.Store(newCacheNameToID)
c.triggerNamespaceChangeCallbackLocked(prevEntries, nextEntries)

// only update last refresh time when refresh succeeded
c.lastRefreshTime.Store(now)
return nil
}

func (c *namespaceCache) checkNamespaceExists(
func (c *namespaceCache) checkAndContinue(
name string,
id string,
) error {
) (bool, error) {
now := c.timeSource.Now()
if now.Sub(c.lastRefreshTime.Load().(time.Time)) < NamespaceCacheMinRefreshInterval {
return false, nil
}

c.checkLock.Lock()
defer c.checkLock.Unlock()

now = c.timeSource.Now()
if now.Sub(c.lastCheckTime) < NamespaceCacheMinRefreshInterval {
return true, nil
}

c.lastCheckTime = now
_, err := c.metadataMgr.GetNamespace(&persistence.GetNamespaceRequest{Name: name, ID: id})
return err
if err != nil {
return false, err
}
return true, nil
}

func (c *namespaceCache) updateNameToIDCache(
Expand Down Expand Up @@ -563,9 +591,13 @@ func (c *namespaceCache) getNamespace(
return c.getNamespaceByID(id, true)
}

if err := c.checkNamespaceExists(name, ""); err != nil {
doContinue, err := c.checkAndContinue(name, "")
if err != nil {
return nil, err
}
if !doContinue {
return nil, serviceerror.NewNotFound(fmt.Sprintf("namespace: %v not found", name))
}

c.refreshLock.Lock()
defer c.refreshLock.Unlock()
Expand All @@ -581,7 +613,7 @@ func (c *namespaceCache) getNamespace(
return c.getNamespaceByID(id, true)
}
// impossible case
return nil, serviceerror.NewInternal("namespaceCache encounter case where namespace exists but cannot be loaded")
return nil, serviceerror.NewNotFound(fmt.Sprintf("namespace: %v not found", name))
}

// getNamespaceByID retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
Expand All @@ -603,9 +635,13 @@ func (c *namespaceCache) getNamespaceByID(
return result, nil
}

if err := c.checkNamespaceExists("", id); err != nil {
doContinue, err := c.checkAndContinue("", id)
if err != nil {
return nil, err
}
if !doContinue {
return nil, serviceerror.NewNotFound(fmt.Sprintf("namespace ID: %v not found", id))
}

c.refreshLock.Lock()
defer c.refreshLock.Unlock()
Expand All @@ -632,8 +668,7 @@ func (c *namespaceCache) getNamespaceByID(
entry.RUnlock()
return result, nil
}
// impossible case
return nil, serviceerror.NewInternal("namespaceCache encounter case where namespace exists but cannot be loaded")
return nil, serviceerror.NewNotFound(fmt.Sprintf("namespace ID: %v not found", id))
}

func (c *namespaceCache) triggerNamespaceChangePrepareCallbackLocked() {
Expand Down
32 changes: 23 additions & 9 deletions common/cache/namespaceCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (s *namespaceCacheSuite) TestRegisterCallback_CatchUp() {
s.Nil(s.namespaceCache.refreshNamespaces())

prepareCallbacckInvoked := false
entriesNotification := []*NamespaceCacheEntry{}
var entriesNotification []*NamespaceCacheEntry
// we are not testing catching up, so make this really large
currentNamespaceNotificationVersion := int64(0)
s.namespaceCache.RegisterNamespaceChangeCallback(
Expand Down Expand Up @@ -480,8 +480,8 @@ func (s *namespaceCacheSuite) TestUpdateCache_TriggerCallBack() {
namespaceNotificationVersion++

prepareCallbacckInvoked := false
entriesOld := []*NamespaceCacheEntry{}
entriesNew := []*NamespaceCacheEntry{}
var entriesOld []*NamespaceCacheEntry
var entriesNew []*NamespaceCacheEntry
// we are not testing catching up, so make this really large
currentNamespaceNotificationVersion := int64(9999999)
s.namespaceCache.RegisterNamespaceChangeCallback(
Expand Down Expand Up @@ -559,12 +559,26 @@ func (s *namespaceCacheSuite) TestGetTriggerListAndUpdateCache_ConcurrentAccess(
testGetFn := func() {
<-startChan
entryNew, err := s.namespaceCache.GetNamespaceByID(id)
s.Nil(err)
// make the config version the same so we can easily compare those
entryNew.configVersion = 0
entryNew.failoverVersion = 0
s.Equal(entryOld, entryNew)
waitGroup.Done()
switch err.(type) {
case nil:
// make the config version the same so we can easily compare those
entryNew.configVersion = 0
entryNew.failoverVersion = 0
s.Equal(entryOld, entryNew)
waitGroup.Done()
case *serviceerror.NotFound:
time.Sleep(2 * NamespaceCacheMinRefreshInterval)
entryNew, err := s.namespaceCache.GetNamespaceByID(id)
s.NoError(err)
// make the config version the same so we can easily compare those
entryNew.configVersion = 0
entryNew.failoverVersion = 0
s.Equal(entryOld, entryNew)
waitGroup.Done()
default:
s.NoError(err)
waitGroup.Done()
}
}

for i := 0; i < coroutineCountGet; i++ {
Expand Down

0 comments on commit b25bd3d

Please sign in to comment.