Skip to content

Commit

Permalink
Exclude deleted namespaces from ListNamespace response (#2646)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 26, 2022
1 parent 4152b17 commit 752ff50
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 62 deletions.
5 changes: 3 additions & 2 deletions common/namespace/handler.go
Expand Up @@ -301,8 +301,9 @@ func (d *HandlerImpl) ListNamespaces(
}

resp, err := d.metadataMgr.ListNamespaces(ctx, &persistence.ListNamespacesRequest{
PageSize: pageSize,
NextPageToken: listRequest.NextPageToken,
PageSize: pageSize,
NextPageToken: listRequest.NextPageToken,
IncludeDeleted: listRequest.GetNamespaceFilter().GetIncludeDeleted(),
})

if err != nil {
Expand Down
73 changes: 46 additions & 27 deletions common/persistence/cassandra/metadata_store.go
Expand Up @@ -307,46 +307,65 @@ func (m *MetadataStore) GetNamespace(request *p.GetNamespaceRequest) (*p.Interna
}, nil
}

func (m *MetadataStore) ListNamespaces(request *p.ListNamespacesRequest) (*p.InternalListNamespacesResponse, error) {
func (m *MetadataStore) ListNamespaces(request *p.InternalListNamespacesRequest) (*p.InternalListNamespacesResponse, error) {
query := m.session.Query(templateListNamespaceQueryV2, constNamespacePartition)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

pageSize := request.PageSize
nextPageToken := request.NextPageToken
response := &p.InternalListNamespacesResponse{}

for {
var name string
var detail []byte
var detailEncoding string
var notificationVersion int64
var isGlobal bool
if !iter.Scan(
nil,
&name,
&detail,
&detailEncoding,
&notificationVersion,
&isGlobal,
) {
// done iterating over all namespaces in this page
break
}
iter := query.PageSize(pageSize).PageState(nextPageToken).Iter()
skippedRows := 0

for {
var name string
var detail []byte
var detailEncoding string
var notificationVersion int64
var isGlobal bool
if !iter.Scan(
nil,
&name,
&detail,
&detailEncoding,
&notificationVersion,
&isGlobal,
) {
// done iterating over all namespaces in this page
break
}

// do not include the metadata record
if name != namespaceMetadataRecordName {
// do not include the metadata record
if name == namespaceMetadataRecordName {
skippedRows++
continue
}
response.Namespaces = append(response.Namespaces, &p.InternalGetNamespaceResponse{
Namespace: p.NewDataBlob(detail, detailEncoding),
IsGlobal: isGlobal,
NotificationVersion: notificationVersion,
})
}
}
if len(iter.PageState()) > 0 {
nextPageToken = iter.PageState()
} else {
nextPageToken = nil
}
if err := iter.Close(); err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("ListNamespaces operation failed. Error: %v", err))
}

if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
}
if err := iter.Close(); err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("ListNamespaces operation failed. Error: %v", err))
if len(nextPageToken) == 0 {
// No more records in DB.
break
}
if skippedRows == 0 {
break
}
pageSize = skippedRows
}

response.NextPageToken = nextPageToken
return response, nil
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/client/fault_injection.go
Expand Up @@ -741,7 +741,7 @@ func (m *FaultInjectionMetadataStore) DeleteNamespaceByName(request *persistence
return m.baseMetadataStore.DeleteNamespaceByName(request)
}

func (m *FaultInjectionMetadataStore) ListNamespaces(request *persistence.ListNamespacesRequest) (
func (m *FaultInjectionMetadataStore) ListNamespaces(request *persistence.InternalListNamespacesRequest) (
*persistence.InternalListNamespacesResponse,
error,
) {
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/dataInterfaces.go
Expand Up @@ -641,8 +641,9 @@ type (

// ListNamespacesRequest is used to list namespaces
ListNamespacesRequest struct {
PageSize int
NextPageToken []byte
PageSize int
NextPageToken []byte
IncludeDeleted bool
}

// ListNamespacesResponse is the response for GetNamespace
Expand Down
42 changes: 33 additions & 9 deletions common/persistence/metadata_manager.go
Expand Up @@ -195,21 +195,45 @@ func (m *metadataManagerImpl) ListNamespaces(
_ context.Context,
request *ListNamespacesRequest,
) (*ListNamespacesResponse, error) {
resp, err := m.persistence.ListNamespaces(request)
if err != nil {
return nil, err
}
namespaces := make([]*GetNamespaceResponse, 0, len(resp.Namespaces))
for _, d := range resp.Namespaces {
ret, err := m.ConvertInternalGetResponse(d)
var namespaces []*GetNamespaceResponse
nextPageToken := request.NextPageToken
pageSize := request.PageSize

for {
resp, err := m.persistence.ListNamespaces(&InternalListNamespacesRequest{
PageSize: pageSize,
NextPageToken: nextPageToken,
})
if err != nil {
return nil, err
}
namespaces = append(namespaces, ret)
deletedNamespacesCount := 0
for _, d := range resp.Namespaces {
ret, err := m.ConvertInternalGetResponse(d)
if err != nil {
return nil, err
}
if ret.Namespace.Info.State == enumspb.NAMESPACE_STATE_DELETED && !request.IncludeDeleted {
deletedNamespacesCount++
continue
}
namespaces = append(namespaces, ret)
}
nextPageToken = resp.NextPageToken
if len(nextPageToken) == 0 {
// Page wasn't full, no more namespaces in DB.
break
}
if deletedNamespacesCount == 0 {
break
}
// Page was full but few namespaces weren't added. Read number of deleted namespaces for DB again.
pageSize = deletedNamespacesCount
}

return &ListNamespacesResponse{
Namespaces: namespaces,
NextPageToken: resp.NextPageToken,
NextPageToken: nextPageToken,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

118 changes: 113 additions & 5 deletions common/persistence/persistence-tests/metadataPersistenceV2Test.go
Expand Up @@ -1084,27 +1084,135 @@ func (m *MetadataPersistenceSuiteV2) TestListNamespaces() {
}

var token []byte
pageSize := 1
const pageSize = 1
pageCount := 0
outputNamespaces := make(map[string]*p.GetNamespaceResponse)
ListLoop:
for {
resp, err := m.ListNamespaces(pageSize, token)
m.NoError(err)
token = resp.NextPageToken
for _, namespace := range resp.Namespaces {
outputNamespaces[string(namespace.Namespace.Info.Id)] = namespace
outputNamespaces[namespace.Namespace.Info.Id] = namespace
// global notification version is already tested, so here we make it 0
// so we can test == easily
namespace.NotificationVersion = 0
}
pageCount++
if len(token) == 0 {
break ListLoop
break
}
}

// 2 pages with data and 1 empty page which is unavoidable.
m.Equal(pageCount, 3)
m.Equal(len(inputNamespaces), len(outputNamespaces))
for _, namespace := range inputNamespaces {
m.Equal(namespace, outputNamespaces[string(namespace.Namespace.Info.Id)])
m.Equal(namespace, outputNamespaces[namespace.Namespace.Info.Id])
}
}

func (m *MetadataPersistenceSuiteV2) TestListNamespaces_DeletedNamespace() {
inputNamespaces := []*p.GetNamespaceResponse{
{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: uuid.New(),
Name: "list-namespace-test-name-1",
State: enumspb.NAMESPACE_STATE_REGISTERED,
},
Config: &persistencespb.NamespaceConfig{},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{},
},
},
{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: uuid.New(),
Name: "list-namespace-test-name-2",
State: enumspb.NAMESPACE_STATE_DELETED,
},
Config: &persistencespb.NamespaceConfig{},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{},
},
},
{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: uuid.New(),
Name: "list-namespace-test-name-3",
State: enumspb.NAMESPACE_STATE_REGISTERED,
},
Config: &persistencespb.NamespaceConfig{},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{},
},
},
{
Namespace: &persistencespb.NamespaceDetail{
Info: &persistencespb.NamespaceInfo{
Id: uuid.New(),
Name: "list-namespace-test-name-4",
State: enumspb.NAMESPACE_STATE_DELETED,
},
Config: &persistencespb.NamespaceConfig{},
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{},
},
},
}
for _, namespace := range inputNamespaces {
_, err := m.CreateNamespace(
namespace.Namespace.Info,
namespace.Namespace.Config,
namespace.Namespace.ReplicationConfig,
namespace.IsGlobalNamespace,
namespace.Namespace.ConfigVersion,
namespace.Namespace.FailoverVersion,
)
m.NoError(err)
}

var token []byte
var listNamespacesPageSize2 []*p.GetNamespaceResponse
pageCount := 0
for {
resp, err := m.ListNamespaces(2, token)
m.NoError(err)
token = resp.NextPageToken
for _, namespace := range resp.Namespaces {
listNamespacesPageSize2 = append(listNamespacesPageSize2, namespace)
}
pageCount++
if len(token) == 0 {
break
}
}

// 1 page with data and 1 empty page which is unavoidable.
m.Equal(2, pageCount)
m.Len(listNamespacesPageSize2, 2)
for _, namespace := range listNamespacesPageSize2 {
m.NotEqual(namespace.Namespace.Info.State, enumspb.NAMESPACE_STATE_DELETED)
}

pageCount = 0
var listNamespacesPageSize1 []*p.GetNamespaceResponse
for {
resp, err := m.ListNamespaces(1, token)
m.NoError(err)
token = resp.NextPageToken
for _, namespace := range resp.Namespaces {
listNamespacesPageSize1 = append(listNamespacesPageSize1, namespace)
}
pageCount++
if len(token) == 0 {
break
}
}

// 2 pages with data and 1 empty page which is unavoidable.
m.Equal(3, pageCount)
m.Len(listNamespacesPageSize1, 2)
for _, namespace := range listNamespacesPageSize1 {
m.NotEqual(namespace.Namespace.Info.State, enumspb.NAMESPACE_STATE_DELETED)
}
}

Expand Down
7 changes: 6 additions & 1 deletion common/persistence/persistenceInterface.go
Expand Up @@ -84,7 +84,7 @@ type (
RenameNamespace(request *InternalRenameNamespaceRequest) error
DeleteNamespace(request *DeleteNamespaceRequest) error
DeleteNamespaceByName(request *DeleteNamespaceByNameRequest) error
ListNamespaces(request *ListNamespacesRequest) (*InternalListNamespacesResponse, error)
ListNamespaces(request *InternalListNamespacesRequest) (*InternalListNamespacesResponse, error)
GetMetadata() (*GetMetadataResponse, error)
}

Expand Down Expand Up @@ -631,6 +631,11 @@ type (
PreviousName string
}

InternalListNamespacesRequest struct {
PageSize int
NextPageToken []byte
}

// InternalListNamespacesResponse is the response for GetNamespace
InternalListNamespacesResponse struct {
Namespaces []*InternalGetNamespaceResponse
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/metadata.go
Expand Up @@ -235,7 +235,7 @@ func (m *sqlMetadataManagerV2) GetMetadata() (*persistence.GetMetadataResponse,
return &persistence.GetMetadataResponse{NotificationVersion: row.NotificationVersion}, nil
}

func (m *sqlMetadataManagerV2) ListNamespaces(request *persistence.ListNamespacesRequest) (*persistence.InternalListNamespacesResponse, error) {
func (m *sqlMetadataManagerV2) ListNamespaces(request *persistence.InternalListNamespacesRequest) (*persistence.InternalListNamespacesResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
var pageToken *primitives.UUID
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Expand Up @@ -39,7 +39,7 @@ require (
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/export/metric v0.27.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.temporal.io/api v1.7.1-0.20220324004000-817724af565a
go.temporal.io/api v1.7.1-0.20220325233305-d1f0ee499b92
go.temporal.io/sdk v1.14.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand All @@ -56,6 +56,8 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

replace go.temporal.io/api v1.7.1-0.20220324004000-817724af565a => ../temporal-api-go

require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.2.0 // indirect
Expand Down Expand Up @@ -100,12 +102,12 @@ require (
go.opentelemetry.io/otel/trace v1.4.0 // indirect
go.uber.org/dig v1.13.0 // indirect
golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect
golang.org/x/net v0.0.0-20220325170049-de3da57026de // indirect
golang.org/x/sys v0.0.0-20220325203850-36772127a21f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220323144105-ec3c684e5b14 // indirect
google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)

0 comments on commit 752ff50

Please sign in to comment.