Skip to content

Commit

Permalink
Rename namespace persistence API (#2595)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 10, 2022
1 parent 7d1d1c1 commit f994ea5
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 4 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -264,6 +264,8 @@ const (
PersistenceUpdateNamespaceScope
// PersistenceDeleteNamespaceScope tracks DeleteNamespace calls made by service to persistence layer
PersistenceDeleteNamespaceScope
// PersistenceRenameNamespaceScope tracks RenameNamespace calls made by service to persistence layer
PersistenceRenameNamespaceScope
// PersistenceDeleteNamespaceByNameScope tracks DeleteNamespaceByName calls made by service to persistence layer
PersistenceDeleteNamespaceByNameScope
// PersistenceListNamespaceScope tracks DeleteNamespaceByName calls made by service to persistence layer
Expand Down Expand Up @@ -1261,6 +1263,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetNamespaceScope: {operation: "GetNamespace"},
PersistenceUpdateNamespaceScope: {operation: "UpdateNamespace"},
PersistenceDeleteNamespaceScope: {operation: "DeleteNamespace"},
PersistenceRenameNamespaceScope: {operation: "RenameNamespace"},
PersistenceDeleteNamespaceByNameScope: {operation: "DeleteNamespaceByName"},
PersistenceListNamespaceScope: {operation: "ListNamespace"},
PersistenceGetMetadataScope: {operation: "GetMetadata"},
Expand Down
54 changes: 54 additions & 0 deletions common/persistence/cassandra/metadata_store.go
Expand Up @@ -88,6 +88,10 @@ const (
templateNamespaceColumns +
` FROM namespaces ` +
`WHERE namespaces_partition = ? `

templateUpdateNamespaceByIdQuery = `UPDATE namespaces_by_id ` +
`SET name = ? ` +
`WHERE id = ?`
)

type (
Expand Down Expand Up @@ -198,6 +202,56 @@ func (m *MetadataStore) UpdateNamespace(request *p.InternalUpdateNamespaceReques
return nil
}

// RenameNamespace should be used with caution.
// Not every namespace can be renamed because namespace name are stored in the database.
// It may leave database in inconsistent state and must be retried until success.
// Step 1. Update row in `namespaces_by_id` table with the new name.
// Step 2. Batch of:
// Insert row into `namespaces` table with new name and new `notification_version`.
// Delete row from `namespaces` table with old name.
// Update `notification_version` in metadata row.
//
// NOTE: `namespaces_by_id` is currently used only for `DescribeNamespace` API and namespace Id collision check.
func (m *MetadataStore) RenameNamespace(request *p.InternalRenameNamespaceRequest) error {
// Step 1.
if updateErr := m.session.Query(templateUpdateNamespaceByIdQuery,
request.Name,
request.Id,
).Exec(); updateErr != nil {
return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed to update 'namespaces_by_id' table. Error: %v", updateErr))
}

// Step 2.
batch := m.session.NewBatch(gocql.LoggedBatch)
batch.Query(templateCreateNamespaceByNameQueryWithinBatchV2,
constNamespacePartition,
request.Id,
request.Name,
request.Namespace.Data,
request.Namespace.EncodingType.String(),
request.NotificationVersion,
request.IsGlobal,
)
batch.Query(templateDeleteNamespaceByNameQueryV2,
constNamespacePartition,
request.PreviousName,
)
m.updateMetadataBatch(batch, request.NotificationVersion)

previous := make(map[string]interface{})
applied, iter, err := m.session.MapExecuteBatchCAS(batch, previous)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed. Error: %v", err))
}
defer func() { _ = iter.Close() }()

if !applied {
return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed because of conditional failure."))
}

return nil
}

func (m *MetadataStore) GetNamespace(request *p.GetNamespaceRequest) (*p.InternalGetNamespaceResponse, error) {
var query gocql.Query
var err error
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/client/fault_injection.go
Expand Up @@ -720,6 +720,13 @@ func (m *FaultInjectionMetadataStore) UpdateNamespace(request *persistence.Inter
return m.baseMetadataStore.UpdateNamespace(request)
}

func (m *FaultInjectionMetadataStore) RenameNamespace(request *persistence.InternalRenameNamespaceRequest) error {
if err := m.ErrorGenerator.Generate(); err != nil {
return err
}
return m.baseMetadataStore.RenameNamespace(request)
}

func (m *FaultInjectionMetadataStore) DeleteNamespace(request *persistence.DeleteNamespaceRequest) error {
if err := m.ErrorGenerator.Generate(); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/dataInterfaces.go
Expand Up @@ -623,6 +623,12 @@ type (
NotificationVersion int64
}

// RenameNamespaceRequest is used to rename namespace.
RenameNamespaceRequest struct {
PreviousName string
NewName string
}

// DeleteNamespaceRequest is used to delete namespace entry from namespaces table
DeleteNamespaceRequest struct {
ID string
Expand Down Expand Up @@ -1065,6 +1071,7 @@ type (
CreateNamespace(request *CreateNamespaceRequest) (*CreateNamespaceResponse, error)
GetNamespace(request *GetNamespaceRequest) (*GetNamespaceResponse, error)
UpdateNamespace(request *UpdateNamespaceRequest) error
RenameNamespace(request *RenameNamespaceRequest) error
DeleteNamespace(request *DeleteNamespaceRequest) error
DeleteNamespaceByName(request *DeleteNamespaceByNameRequest) error
ListNamespaces(request *ListNamespacesRequest) (*ListNamespacesResponse, error)
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/dataInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion common/persistence/metadata_manager.go
Expand Up @@ -49,7 +49,7 @@ type (

var _ MetadataManager = (*metadataManagerImpl)(nil)

//NewMetadataManagerImpl returns new MetadataManager
// NewMetadataManagerImpl returns new MetadataManager
func NewMetadataManagerImpl(
persistence MetadataStore,
serializer serialization.Serializer,
Expand Down Expand Up @@ -105,6 +105,41 @@ func (m *metadataManagerImpl) UpdateNamespace(request *UpdateNamespaceRequest) e
})
}

func (m *metadataManagerImpl) RenameNamespace(request *RenameNamespaceRequest) error {
ns, err := m.GetNamespace(&GetNamespaceRequest{
Name: request.PreviousName,
})
if err != nil {
return err
}

metadata, err := m.GetMetadata()
if err != nil {
return err
}

previousName := ns.Namespace.Info.Name
ns.Namespace.Info.Name = request.NewName

nsDataBlob, err := m.serializer.NamespaceDetailToBlob(ns.Namespace, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return err
}

renameRequest := &InternalRenameNamespaceRequest{
InternalUpdateNamespaceRequest: &InternalUpdateNamespaceRequest{
Id: ns.Namespace.Info.Id,
Name: ns.Namespace.Info.Name,
Namespace: nsDataBlob,
NotificationVersion: metadata.NotificationVersion,
IsGlobal: ns.IsGlobalNamespace,
},
PreviousName: previousName,
}

return m.persistence.RenameNamespace(renameRequest)
}

func (m *metadataManagerImpl) DeleteNamespace(request *DeleteNamespaceRequest) error {
return m.persistence.DeleteNamespace(request)
}
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions common/persistence/persistence-tests/metadataPersistenceV2Test.go
Expand Up @@ -777,6 +777,92 @@ func (m *MetadataPersistenceSuiteV2) TestUpdateNamespace() {
m.EqualTimes(time.Unix(0, 0).UTC(), *resp6.Namespace.FailoverEndTime)
}

func (m *MetadataPersistenceSuiteV2) TestRenameNamespace() {
id := uuid.New()
name := "rename-namespace-test-name"
newName := "rename-namespace-test-new-name"
newNewName := "rename-namespace-test-new-new-name"
state := enumspb.NAMESPACE_STATE_REGISTERED
description := "rename-namespace-test-description"
owner := "rename-namespace-test-owner"
data := map[string]string{"k1": "v1"}
retention := int32(10)
historyArchivalState := enumspb.ARCHIVAL_STATE_ENABLED
historyArchivalURI := "test://history/uri"
visibilityArchivalState := enumspb.ARCHIVAL_STATE_ENABLED
visibilityArchivalURI := "test://visibility/uri"

clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(10)
failoverVersion := int64(59)
isGlobalNamespace := true
clusters := []string{clusterActive, clusterStandby}

resp1, err1 := m.CreateNamespace(
&persistencespb.NamespaceInfo{
Id: id,
Name: name,
State: state,
Description: description,
Owner: owner,
Data: data,
},
&persistencespb.NamespaceConfig{
Retention: timestamp.DurationFromDays(retention),
HistoryArchivalState: historyArchivalState,
HistoryArchivalUri: historyArchivalURI,
VisibilityArchivalState: visibilityArchivalState,
VisibilityArchivalUri: visibilityArchivalURI,
},
&persistencespb.NamespaceReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: clusters,
},
isGlobalNamespace,
configVersion,
failoverVersion,
)
m.NoError(err1)
m.EqualValues(id, resp1.ID)

_, err2 := m.GetNamespace(id, "")
m.NoError(err2)

err3 := m.MetadataManager.RenameNamespace(&p.RenameNamespaceRequest{
PreviousName: name,
NewName: newName,
})
m.NoError(err3)

resp4, err4 := m.GetNamespace("", newName)
m.NoError(err4)
m.NotNil(resp4)
m.EqualValues(id, resp4.Namespace.Info.Id)
m.Equal(newName, resp4.Namespace.Info.Name)
m.Equal(isGlobalNamespace, resp4.IsGlobalNamespace)

resp5, err5 := m.GetNamespace(id, "")
m.NoError(err5)
m.NotNil(resp5)
m.EqualValues(id, resp5.Namespace.Info.Id)
m.Equal(newName, resp5.Namespace.Info.Name)
m.Equal(isGlobalNamespace, resp5.IsGlobalNamespace)

err6 := m.MetadataManager.RenameNamespace(&p.RenameNamespaceRequest{
PreviousName: newName,
NewName: newNewName,
})
m.NoError(err6)

resp6, err6 := m.GetNamespace(id, "")
m.NoError(err6)
m.NotNil(resp6)
m.EqualValues(id, resp6.Namespace.Info.Id)
m.Equal(newNewName, resp6.Namespace.Info.Name)
m.Equal(isGlobalNamespace, resp6.IsGlobalNamespace)
}

// TestDeleteNamespace test
func (m *MetadataPersistenceSuiteV2) TestDeleteNamespace() {
id := uuid.New()
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/persistenceInterface.go
Expand Up @@ -81,6 +81,7 @@ type (
CreateNamespace(request *InternalCreateNamespaceRequest) (*CreateNamespaceResponse, error)
GetNamespace(request *GetNamespaceRequest) (*InternalGetNamespaceResponse, error)
UpdateNamespace(request *InternalUpdateNamespaceRequest) error
RenameNamespace(request *InternalRenameNamespaceRequest) error
DeleteNamespace(request *DeleteNamespaceRequest) error
DeleteNamespaceByName(request *DeleteNamespaceByNameRequest) error
ListNamespaces(request *ListNamespacesRequest) (*InternalListNamespacesResponse, error)
Expand Down Expand Up @@ -625,6 +626,11 @@ type (
IsGlobal bool
}

InternalRenameNamespaceRequest struct {
*InternalUpdateNamespaceRequest
PreviousName string
}

// InternalListNamespacesResponse is the response for GetNamespace
InternalListNamespacesResponse struct {
Namespaces []*InternalGetNamespaceResponse
Expand Down
18 changes: 16 additions & 2 deletions common/persistence/persistenceMetricClients.go
Expand Up @@ -677,6 +677,20 @@ func (p *metadataPersistenceClient) UpdateNamespace(request *UpdateNamespaceRequ
return err
}

func (p *metadataPersistenceClient) RenameNamespace(request *RenameNamespaceRequest) error {
p.metricClient.IncCounter(metrics.PersistenceRenameNamespaceScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceRenameNamespaceScope, metrics.PersistenceLatency)
err := p.persistence.RenameNamespace(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceRenameNamespaceScope, err)
}

return err
}

func (p *metadataPersistenceClient) DeleteNamespace(request *DeleteNamespaceRequest) error {
p.metricClient.IncCounter(metrics.PersistenceDeleteNamespaceScope, metrics.PersistenceRequests)

Expand Down Expand Up @@ -1025,7 +1039,7 @@ func (c *clusterMetadataPersistenceClient) Close() {
}

func (c *clusterMetadataPersistenceClient) ListClusterMetadata(request *ListClusterMetadataRequest) (*ListClusterMetadataResponse, error) {
//This is a wrapper of GetClusterMetadata API, use the same scope here
// This is a wrapper of GetClusterMetadata API, use the same scope here
c.metricClient.IncCounter(metrics.PersistenceListClusterMetadataScope, metrics.PersistenceRequests)

sw := c.metricClient.StartTimer(metrics.PersistenceListClusterMetadataScope, metrics.PersistenceLatency)
Expand All @@ -1040,7 +1054,7 @@ func (c *clusterMetadataPersistenceClient) ListClusterMetadata(request *ListClus
}

func (c *clusterMetadataPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) {
//This is a wrapper of GetClusterMetadata API, use the same scope here
// This is a wrapper of GetClusterMetadata API, use the same scope here
c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceRequests)

sw := c.metricClient.StartTimer(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceLatency)
Expand Down
9 changes: 9 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Expand Up @@ -445,6 +445,15 @@ func (p *metadataRateLimitedPersistenceClient) UpdateNamespace(request *UpdateNa
return err
}

func (p *metadataRateLimitedPersistenceClient) RenameNamespace(request *RenameNamespaceRequest) error {
if ok := p.rateLimiter.Allow(); !ok {
return ErrPersistenceLimitExceeded
}

err := p.persistence.RenameNamespace(request)
return err
}

func (p *metadataRateLimitedPersistenceClient) DeleteNamespace(request *DeleteNamespaceRequest) error {
if ok := p.rateLimiter.Allow(); !ok {
return ErrPersistenceLimitExceeded
Expand Down

0 comments on commit f994ea5

Please sign in to comment.