Skip to content

Commit

Permalink
Use uuid as namespace notification ID (#2733)
Browse files Browse the repository at this point in the history
* Use uuid as namespace change listener ID, instead of shard ID
  • Loading branch information
wxing1292 committed Apr 19, 2022
1 parent 67e34a3 commit 78defe1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
24 changes: 12 additions & 12 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ type (
// Registry provides access to Namespace objects by name or by ID.
Registry interface {
common.Daemon
RegisterNamespaceChangeCallback(shard int32, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn)
UnregisterNamespaceChangeCallback(shard int32)
RegisterNamespaceChangeCallback(listenerID string, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn)
UnregisterNamespaceChangeCallback(listenerID string)
GetNamespace(name Name) (*Namespace, error)
GetNamespaceByID(id ID) (*Namespace, error)
GetNamespaceID(name Name) (ID, error)
Expand Down Expand Up @@ -162,8 +162,8 @@ type (
// cacheLock.Lock() (the other lock in this struct, above) while holding
// this lock or you risk a deadlock.
callbackLock sync.Mutex
prepareCallbacks map[int32]PrepareCallbackFn
callbacks map[int32]CallbackFn
prepareCallbacks map[string]PrepareCallbackFn
callbacks map[string]CallbackFn
}
)

Expand All @@ -184,8 +184,8 @@ func NewRegistry(
logger: logger,
cacheNameToID: cache.New(cacheMaxSize, &cacheOpts),
cacheByID: cache.New(cacheMaxSize, &cacheOpts),
prepareCallbacks: make(map[int32]PrepareCallbackFn),
callbacks: make(map[int32]CallbackFn),
prepareCallbacks: make(map[string]PrepareCallbackFn),
callbacks: make(map[string]CallbackFn),
}
reg.lastRefreshTime.Store(time.Time{})
return reg
Expand Down Expand Up @@ -244,15 +244,15 @@ func (r *registry) getAllNamespace() map[ID]*Namespace {
// callback functions MUST NOT call back into this registry instance, either to
// unregister themselves or to look up Namespaces.
func (r *registry) RegisterNamespaceChangeCallback(
shard int32,
listenerID string,
initialNotificationVersion int64,
prepareCallback PrepareCallbackFn,
callback CallbackFn,
) {

r.callbackLock.Lock()
r.prepareCallbacks[shard] = prepareCallback
r.callbacks[shard] = callback
r.prepareCallbacks[listenerID] = prepareCallback
r.callbacks[listenerID] = callback
r.callbackLock.Unlock()

// this section is trying to make the shard catch up with namespace changes
Expand Down Expand Up @@ -281,13 +281,13 @@ func (r *registry) RegisterNamespaceChangeCallback(

// UnregisterNamespaceChangeCallback delete a namespace failover callback
func (r *registry) UnregisterNamespaceChangeCallback(
shard int32,
listenerID string,
) {
r.callbackLock.Lock()
defer r.callbackLock.Unlock()

delete(r.prepareCallbacks, shard)
delete(r.callbacks, shard)
delete(r.prepareCallbacks, listenerID)
delete(r.callbacks, listenerID)
}

// GetNamespace retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
Expand Down
16 changes: 8 additions & 8 deletions common/namespace/registry_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions common/namespace/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *registrySuite) TestRegisterCallback_CatchUp() {
// we are not testing catching up, so make this really large
currentNamespaceNotificationVersion := int64(0)
s.registry.RegisterNamespaceChangeCallback(
0,
"0",
currentNamespaceNotificationVersion,
func() {
prepareCallbackInvoked = true
Expand Down Expand Up @@ -428,7 +428,7 @@ func (s *registrySuite) TestUpdateCache_TriggerCallBack() {
// we are not testing catching up, so make this really large
currentNamespaceNotificationVersion := int64(9999999)
s.registry.RegisterNamespaceChangeCallback(
0,
"0",
currentNamespaceNotificationVersion,
func() {
prepareCallbackInvoked = true
Expand Down
6 changes: 4 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type (
historyEngineImpl struct {
status int32
currentClusterName string
namespaceChangeListenerID string
shard shard.Context
timeSource clock.TimeSource
workflowTaskHandler workflowTaskHandlerCallbacks
Expand Down Expand Up @@ -157,6 +158,7 @@ func NewEngineWithShardContext(
historyEngImpl := &historyEngineImpl{
status: common.DaemonStatusInitialized,
currentClusterName: currentClusterName,
namespaceChangeListenerID: uuid.New(),
shard: shard,
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
Expand Down Expand Up @@ -286,7 +288,7 @@ func (e *historyEngineImpl) Stop() {
e.replicationTaskProcessorsLock.Unlock()

// unset the failover callback
e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e.shard.GetShardID())
e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e.namespaceChangeListenerID)
}

func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
Expand Down Expand Up @@ -323,7 +325,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {

// first set the failover callback
e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback(
e.shard.GetShardID(),
e.namespaceChangeListenerID,
0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */
func() {
for _, queueProcessor := range e.queueProcessors {
Expand Down

0 comments on commit 78defe1

Please sign in to comment.