Skip to content

Commit

Permalink
Remove deleted namespace from registry (#2567)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 4, 2022
1 parent 40e9fe0 commit 1a89465
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 18 deletions.
38 changes: 20 additions & 18 deletions common/namespace/registry.go
Expand Up @@ -385,42 +385,44 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {
}
namespaceNotificationVersion := metadata.NotificationVersion

var token []byte
request := &persistence.ListNamespacesRequest{PageSize: CacheRefreshPageSize}
var namespaces Namespaces
continuePage := true
var namespacesDb Namespaces
namespaceIDsDb := make(map[ID]struct{})

for continuePage {
request.NextPageToken = token
for {
response, err := r.persistence.ListNamespaces(request)
if err != nil {
return err
}
token = response.NextPageToken
for _, namespace := range response.Namespaces {
namespaces = append(namespaces, FromPersistentState(namespace))
for _, namespaceDb := range response.Namespaces {
namespacesDb = append(namespacesDb, FromPersistentState(namespaceDb))
namespaceIDsDb[ID(namespaceDb.Namespace.Info.Id)] = struct{}{}
}
if len(response.NextPageToken) == 0 {
break
}
continuePage = len(token) != 0
request.NextPageToken = response.NextPageToken
}

// we mush apply the namespace change by order
// since history shard have to update the shard info
// with namespace change version.
sort.Sort(namespaces)

var oldEntries []*Namespace
var newEntries []*Namespace
// Sort namespaces by notification version because changes must be applied in this order
// because history shard has to update the shard info with namespace change version.
sort.Sort(namespacesDb)

// make a copy of the existing namespace cache, so we can calculate diff and do compare and swap
// Make a copy of the existing namespace cache (excluding deleted), so we can calculate diff and do "compare and swap".
newCacheNameToID := cache.New(cacheMaxSize, &cacheOpts)
newCacheByID := cache.New(cacheMaxSize, &cacheOpts)
for _, namespace := range r.getAllNamespace() {
if _, namespaceExistsDb := namespaceIDsDb[namespace.ID()]; !namespaceExistsDb {
continue
}
newCacheNameToID.Put(Name(namespace.info.Name), ID(namespace.info.Id))
newCacheByID.Put(ID(namespace.info.Id), namespace)
}

var oldEntries []*Namespace
var newEntries []*Namespace
UpdateLoop:
for _, namespace := range namespaces {
for _, namespace := range namespacesDb {
if namespace.notificationVersion >= namespaceNotificationVersion {
// this guarantee that namespace change events before the
// namespaceNotificationVersion is loaded into the cache.
Expand Down
99 changes: 99 additions & 0 deletions common/namespace/registry_test.go
Expand Up @@ -529,6 +529,105 @@ func (s *registrySuite) TestGetTriggerListAndUpdateCache_ConcurrentAccess() {
waitGroup.Wait()
}

func (s *registrySuite) TestRemoveDeletedNamespace() {
namespaceNotificationVersion := int64(0)
namespaceRecord1 := &persistence.GetNamespaceResponse{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: namespace.NewID().String(),
Name: "some random namespace name",
Data: make(map[string]string)},
Config: &persistencespb.NamespaceConfig{
Retention: timestamp.DurationFromDays(1),
BadBinaries: &namespacepb.BadBinaries{
Binaries: map[string]*namespacepb.BadBinaryInfo{},
}},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []string{
cluster.TestCurrentClusterName,
cluster.TestAlternativeClusterName,
},
},
ConfigVersion: 10,
FailoverVersion: 11,
FailoverNotificationVersion: 0,
},
NotificationVersion: namespaceNotificationVersion,
}
namespaceNotificationVersion++

namespaceRecord2 := &persistence.GetNamespaceResponse{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: namespace.NewID().String(),
Name: "another random namespace name",
Data: make(map[string]string)},
Config: &persistencespb.NamespaceConfig{
Retention: timestamp.DurationFromDays(2),
BadBinaries: &namespacepb.BadBinaries{
Binaries: map[string]*namespacepb.BadBinaryInfo{},
}},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []string{
cluster.TestCurrentClusterName,
cluster.TestAlternativeClusterName,
},
},
ConfigVersion: 20,
FailoverVersion: 21,
FailoverNotificationVersion: 0,
},
NotificationVersion: namespaceNotificationVersion,
}
namespaceNotificationVersion++

s.regPersistence.EXPECT().GetMetadata().Return(
&persistence.GetMetadataResponse{
NotificationVersion: namespaceNotificationVersion,
}, nil)
s.regPersistence.EXPECT().ListNamespaces(&persistence.ListNamespacesRequest{
PageSize: namespace.CacheRefreshPageSize,
NextPageToken: nil,
}).Return(&persistence.ListNamespacesResponse{
Namespaces: []*persistence.GetNamespaceResponse{
namespaceRecord1,
namespaceRecord2},
NextPageToken: nil,
}, nil)

// load namespaces
s.registry.Start()
defer s.registry.Stop()

s.regPersistence.EXPECT().GetMetadata().Return(
&persistence.GetMetadataResponse{
NotificationVersion: namespaceNotificationVersion,
}, nil)
s.regPersistence.EXPECT().ListNamespaces(&persistence.ListNamespacesRequest{
PageSize: namespace.CacheRefreshPageSize,
NextPageToken: nil,
}).Return(&persistence.ListNamespacesResponse{
Namespaces: []*persistence.GetNamespaceResponse{
// namespaceRecord1 is removed
namespaceRecord2},
NextPageToken: nil,
}, nil)

s.registry.Refresh()

ns2FromRegistry, err := s.registry.GetNamespace(namespace.Name(namespaceRecord2.Namespace.Info.Name))
s.NotNil(ns2FromRegistry)
s.NoError(err)

ns1FromRegistry, err := s.registry.GetNamespace(namespace.Name(namespaceRecord1.Namespace.Info.Name))
s.Nil(ns1FromRegistry)
s.Error(err)
var notFound *serviceerror.NotFound
s.ErrorAs(err, &notFound)
}

func TestCacheByName(t *testing.T) {
nsrec := persistence.GetNamespaceResponse{
Namespace: &persistencespb.NamespaceDetail{
Expand Down

0 comments on commit 1a89465

Please sign in to comment.