From f994ea5e60c51be29d7e47d72a7e580fb7778ffe Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 10 Mar 2022 09:40:14 -0800 Subject: [PATCH] Rename namespace persistence API (#2595) --- common/metrics/defs.go | 3 + .../persistence/cassandra/metadata_store.go | 54 ++++++++++++ common/persistence/client/fault_injection.go | 7 ++ common/persistence/dataInterfaces.go | 7 ++ common/persistence/dataInterfaces_mock.go | 14 +++ common/persistence/metadata_manager.go | 37 +++++++- common/persistence/mock/store_mock.go | 14 +++ .../metadataPersistenceV2Test.go | 86 +++++++++++++++++++ common/persistence/persistenceInterface.go | 6 ++ .../persistence/persistenceMetricClients.go | 18 +++- .../persistenceRateLimitedClients.go | 9 ++ common/persistence/sql/metadata.go | 10 ++- 12 files changed, 261 insertions(+), 4 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 55cbb6bd552..37327f95dfb 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -264,6 +264,8 @@ const ( PersistenceUpdateNamespaceScope // PersistenceDeleteNamespaceScope tracks DeleteNamespace calls made by service to persistence layer PersistenceDeleteNamespaceScope + // PersistenceRenameNamespaceScope tracks RenameNamespace calls made by service to persistence layer + PersistenceRenameNamespaceScope // PersistenceDeleteNamespaceByNameScope tracks DeleteNamespaceByName calls made by service to persistence layer PersistenceDeleteNamespaceByNameScope // PersistenceListNamespaceScope tracks DeleteNamespaceByName calls made by service to persistence layer @@ -1261,6 +1263,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceGetNamespaceScope: {operation: "GetNamespace"}, PersistenceUpdateNamespaceScope: {operation: "UpdateNamespace"}, PersistenceDeleteNamespaceScope: {operation: "DeleteNamespace"}, + PersistenceRenameNamespaceScope: {operation: "RenameNamespace"}, PersistenceDeleteNamespaceByNameScope: {operation: "DeleteNamespaceByName"}, PersistenceListNamespaceScope: {operation: "ListNamespace"}, PersistenceGetMetadataScope: {operation: "GetMetadata"}, diff --git a/common/persistence/cassandra/metadata_store.go b/common/persistence/cassandra/metadata_store.go index 71d1bce71ed..2d1716ff4cb 100644 --- a/common/persistence/cassandra/metadata_store.go +++ b/common/persistence/cassandra/metadata_store.go @@ -88,6 +88,10 @@ const ( templateNamespaceColumns + ` FROM namespaces ` + `WHERE namespaces_partition = ? ` + + templateUpdateNamespaceByIdQuery = `UPDATE namespaces_by_id ` + + `SET name = ? ` + + `WHERE id = ?` ) type ( @@ -198,6 +202,56 @@ func (m *MetadataStore) UpdateNamespace(request *p.InternalUpdateNamespaceReques return nil } +// RenameNamespace should be used with caution. +// Not every namespace can be renamed because namespace name are stored in the database. +// It may leave database in inconsistent state and must be retried until success. +// Step 1. Update row in `namespaces_by_id` table with the new name. +// Step 2. Batch of: +// Insert row into `namespaces` table with new name and new `notification_version`. +// Delete row from `namespaces` table with old name. +// Update `notification_version` in metadata row. +// +// NOTE: `namespaces_by_id` is currently used only for `DescribeNamespace` API and namespace Id collision check. +func (m *MetadataStore) RenameNamespace(request *p.InternalRenameNamespaceRequest) error { + // Step 1. + if updateErr := m.session.Query(templateUpdateNamespaceByIdQuery, + request.Name, + request.Id, + ).Exec(); updateErr != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed to update 'namespaces_by_id' table. Error: %v", updateErr)) + } + + // Step 2. + batch := m.session.NewBatch(gocql.LoggedBatch) + batch.Query(templateCreateNamespaceByNameQueryWithinBatchV2, + constNamespacePartition, + request.Id, + request.Name, + request.Namespace.Data, + request.Namespace.EncodingType.String(), + request.NotificationVersion, + request.IsGlobal, + ) + batch.Query(templateDeleteNamespaceByNameQueryV2, + constNamespacePartition, + request.PreviousName, + ) + m.updateMetadataBatch(batch, request.NotificationVersion) + + previous := make(map[string]interface{}) + applied, iter, err := m.session.MapExecuteBatchCAS(batch, previous) + if err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed. Error: %v", err)) + } + defer func() { _ = iter.Close() }() + + if !applied { + return serviceerror.NewUnavailable(fmt.Sprintf("RenameNamespace operation failed because of conditional failure.")) + } + + return nil +} + func (m *MetadataStore) GetNamespace(request *p.GetNamespaceRequest) (*p.InternalGetNamespaceResponse, error) { var query gocql.Query var err error diff --git a/common/persistence/client/fault_injection.go b/common/persistence/client/fault_injection.go index 51af13804cc..75184a729b6 100644 --- a/common/persistence/client/fault_injection.go +++ b/common/persistence/client/fault_injection.go @@ -720,6 +720,13 @@ func (m *FaultInjectionMetadataStore) UpdateNamespace(request *persistence.Inter return m.baseMetadataStore.UpdateNamespace(request) } +func (m *FaultInjectionMetadataStore) RenameNamespace(request *persistence.InternalRenameNamespaceRequest) error { + if err := m.ErrorGenerator.Generate(); err != nil { + return err + } + return m.baseMetadataStore.RenameNamespace(request) +} + func (m *FaultInjectionMetadataStore) DeleteNamespace(request *persistence.DeleteNamespaceRequest) error { if err := m.ErrorGenerator.Generate(); err != nil { return err diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 196ceae71c6..10e5625bb02 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -623,6 +623,12 @@ type ( NotificationVersion int64 } + // RenameNamespaceRequest is used to rename namespace. + RenameNamespaceRequest struct { + PreviousName string + NewName string + } + // DeleteNamespaceRequest is used to delete namespace entry from namespaces table DeleteNamespaceRequest struct { ID string @@ -1065,6 +1071,7 @@ type ( CreateNamespace(request *CreateNamespaceRequest) (*CreateNamespaceResponse, error) GetNamespace(request *GetNamespaceRequest) (*GetNamespaceResponse, error) UpdateNamespace(request *UpdateNamespaceRequest) error + RenameNamespace(request *RenameNamespaceRequest) error DeleteNamespace(request *DeleteNamespaceRequest) error DeleteNamespaceByName(request *DeleteNamespaceByNameRequest) error ListNamespaces(request *ListNamespacesRequest) (*ListNamespacesResponse, error) diff --git a/common/persistence/dataInterfaces_mock.go b/common/persistence/dataInterfaces_mock.go index b7a62a534c4..38876b1e848 100644 --- a/common/persistence/dataInterfaces_mock.go +++ b/common/persistence/dataInterfaces_mock.go @@ -940,6 +940,20 @@ func (mr *MockMetadataManagerMockRecorder) ListNamespaces(request interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNamespaces", reflect.TypeOf((*MockMetadataManager)(nil).ListNamespaces), request) } +// RenameNamespace mocks base method. +func (m *MockMetadataManager) RenameNamespace(request *RenameNamespaceRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RenameNamespace", request) + ret0, _ := ret[0].(error) + return ret0 +} + +// RenameNamespace indicates an expected call of RenameNamespace. +func (mr *MockMetadataManagerMockRecorder) RenameNamespace(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RenameNamespace", reflect.TypeOf((*MockMetadataManager)(nil).RenameNamespace), request) +} + // UpdateNamespace mocks base method. func (m *MockMetadataManager) UpdateNamespace(request *UpdateNamespaceRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/metadata_manager.go b/common/persistence/metadata_manager.go index 85bdb460043..dc831ac9d1b 100644 --- a/common/persistence/metadata_manager.go +++ b/common/persistence/metadata_manager.go @@ -49,7 +49,7 @@ type ( var _ MetadataManager = (*metadataManagerImpl)(nil) -//NewMetadataManagerImpl returns new MetadataManager +// NewMetadataManagerImpl returns new MetadataManager func NewMetadataManagerImpl( persistence MetadataStore, serializer serialization.Serializer, @@ -105,6 +105,41 @@ func (m *metadataManagerImpl) UpdateNamespace(request *UpdateNamespaceRequest) e }) } +func (m *metadataManagerImpl) RenameNamespace(request *RenameNamespaceRequest) error { + ns, err := m.GetNamespace(&GetNamespaceRequest{ + Name: request.PreviousName, + }) + if err != nil { + return err + } + + metadata, err := m.GetMetadata() + if err != nil { + return err + } + + previousName := ns.Namespace.Info.Name + ns.Namespace.Info.Name = request.NewName + + nsDataBlob, err := m.serializer.NamespaceDetailToBlob(ns.Namespace, enumspb.ENCODING_TYPE_PROTO3) + if err != nil { + return err + } + + renameRequest := &InternalRenameNamespaceRequest{ + InternalUpdateNamespaceRequest: &InternalUpdateNamespaceRequest{ + Id: ns.Namespace.Info.Id, + Name: ns.Namespace.Info.Name, + Namespace: nsDataBlob, + NotificationVersion: metadata.NotificationVersion, + IsGlobal: ns.IsGlobalNamespace, + }, + PreviousName: previousName, + } + + return m.persistence.RenameNamespace(renameRequest) +} + func (m *metadataManagerImpl) DeleteNamespace(request *DeleteNamespaceRequest) error { return m.persistence.DeleteNamespace(request) } diff --git a/common/persistence/mock/store_mock.go b/common/persistence/mock/store_mock.go index c085555df8e..40518b35d50 100644 --- a/common/persistence/mock/store_mock.go +++ b/common/persistence/mock/store_mock.go @@ -446,6 +446,20 @@ func (mr *MockMetadataStoreMockRecorder) ListNamespaces(request interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNamespaces", reflect.TypeOf((*MockMetadataStore)(nil).ListNamespaces), request) } +// RenameNamespace mocks base method. +func (m *MockMetadataStore) RenameNamespace(request *persistence.InternalRenameNamespaceRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RenameNamespace", request) + ret0, _ := ret[0].(error) + return ret0 +} + +// RenameNamespace indicates an expected call of RenameNamespace. +func (mr *MockMetadataStoreMockRecorder) RenameNamespace(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RenameNamespace", reflect.TypeOf((*MockMetadataStore)(nil).RenameNamespace), request) +} + // UpdateNamespace mocks base method. func (m *MockMetadataStore) UpdateNamespace(request *persistence.InternalUpdateNamespaceRequest) error { m.ctrl.T.Helper() diff --git a/common/persistence/persistence-tests/metadataPersistenceV2Test.go b/common/persistence/persistence-tests/metadataPersistenceV2Test.go index 404247e4049..e816b8a73ab 100644 --- a/common/persistence/persistence-tests/metadataPersistenceV2Test.go +++ b/common/persistence/persistence-tests/metadataPersistenceV2Test.go @@ -777,6 +777,92 @@ func (m *MetadataPersistenceSuiteV2) TestUpdateNamespace() { m.EqualTimes(time.Unix(0, 0).UTC(), *resp6.Namespace.FailoverEndTime) } +func (m *MetadataPersistenceSuiteV2) TestRenameNamespace() { + id := uuid.New() + name := "rename-namespace-test-name" + newName := "rename-namespace-test-new-name" + newNewName := "rename-namespace-test-new-new-name" + state := enumspb.NAMESPACE_STATE_REGISTERED + description := "rename-namespace-test-description" + owner := "rename-namespace-test-owner" + data := map[string]string{"k1": "v1"} + retention := int32(10) + historyArchivalState := enumspb.ARCHIVAL_STATE_ENABLED + historyArchivalURI := "test://history/uri" + visibilityArchivalState := enumspb.ARCHIVAL_STATE_ENABLED + visibilityArchivalURI := "test://visibility/uri" + + clusterActive := "some random active cluster name" + clusterStandby := "some random standby cluster name" + configVersion := int64(10) + failoverVersion := int64(59) + isGlobalNamespace := true + clusters := []string{clusterActive, clusterStandby} + + resp1, err1 := m.CreateNamespace( + &persistencespb.NamespaceInfo{ + Id: id, + Name: name, + State: state, + Description: description, + Owner: owner, + Data: data, + }, + &persistencespb.NamespaceConfig{ + Retention: timestamp.DurationFromDays(retention), + HistoryArchivalState: historyArchivalState, + HistoryArchivalUri: historyArchivalURI, + VisibilityArchivalState: visibilityArchivalState, + VisibilityArchivalUri: visibilityArchivalURI, + }, + &persistencespb.NamespaceReplicationConfig{ + ActiveClusterName: clusterActive, + Clusters: clusters, + }, + isGlobalNamespace, + configVersion, + failoverVersion, + ) + m.NoError(err1) + m.EqualValues(id, resp1.ID) + + _, err2 := m.GetNamespace(id, "") + m.NoError(err2) + + err3 := m.MetadataManager.RenameNamespace(&p.RenameNamespaceRequest{ + PreviousName: name, + NewName: newName, + }) + m.NoError(err3) + + resp4, err4 := m.GetNamespace("", newName) + m.NoError(err4) + m.NotNil(resp4) + m.EqualValues(id, resp4.Namespace.Info.Id) + m.Equal(newName, resp4.Namespace.Info.Name) + m.Equal(isGlobalNamespace, resp4.IsGlobalNamespace) + + resp5, err5 := m.GetNamespace(id, "") + m.NoError(err5) + m.NotNil(resp5) + m.EqualValues(id, resp5.Namespace.Info.Id) + m.Equal(newName, resp5.Namespace.Info.Name) + m.Equal(isGlobalNamespace, resp5.IsGlobalNamespace) + + err6 := m.MetadataManager.RenameNamespace(&p.RenameNamespaceRequest{ + PreviousName: newName, + NewName: newNewName, + }) + m.NoError(err6) + + resp6, err6 := m.GetNamespace(id, "") + m.NoError(err6) + m.NotNil(resp6) + m.EqualValues(id, resp6.Namespace.Info.Id) + m.Equal(newNewName, resp6.Namespace.Info.Name) + m.Equal(isGlobalNamespace, resp6.IsGlobalNamespace) +} + // TestDeleteNamespace test func (m *MetadataPersistenceSuiteV2) TestDeleteNamespace() { id := uuid.New() diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 6f989709247..45f9a38c14e 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -81,6 +81,7 @@ type ( CreateNamespace(request *InternalCreateNamespaceRequest) (*CreateNamespaceResponse, error) GetNamespace(request *GetNamespaceRequest) (*InternalGetNamespaceResponse, error) UpdateNamespace(request *InternalUpdateNamespaceRequest) error + RenameNamespace(request *InternalRenameNamespaceRequest) error DeleteNamespace(request *DeleteNamespaceRequest) error DeleteNamespaceByName(request *DeleteNamespaceByNameRequest) error ListNamespaces(request *ListNamespacesRequest) (*InternalListNamespacesResponse, error) @@ -625,6 +626,11 @@ type ( IsGlobal bool } + InternalRenameNamespaceRequest struct { + *InternalUpdateNamespaceRequest + PreviousName string + } + // InternalListNamespacesResponse is the response for GetNamespace InternalListNamespacesResponse struct { Namespaces []*InternalGetNamespaceResponse diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 99ec6c84a9f..f1e67e6b858 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -677,6 +677,20 @@ func (p *metadataPersistenceClient) UpdateNamespace(request *UpdateNamespaceRequ return err } +func (p *metadataPersistenceClient) RenameNamespace(request *RenameNamespaceRequest) error { + p.metricClient.IncCounter(metrics.PersistenceRenameNamespaceScope, metrics.PersistenceRequests) + + sw := p.metricClient.StartTimer(metrics.PersistenceRenameNamespaceScope, metrics.PersistenceLatency) + err := p.persistence.RenameNamespace(request) + sw.Stop() + + if err != nil { + p.updateErrorMetric(metrics.PersistenceRenameNamespaceScope, err) + } + + return err +} + func (p *metadataPersistenceClient) DeleteNamespace(request *DeleteNamespaceRequest) error { p.metricClient.IncCounter(metrics.PersistenceDeleteNamespaceScope, metrics.PersistenceRequests) @@ -1025,7 +1039,7 @@ func (c *clusterMetadataPersistenceClient) Close() { } func (c *clusterMetadataPersistenceClient) ListClusterMetadata(request *ListClusterMetadataRequest) (*ListClusterMetadataResponse, error) { - //This is a wrapper of GetClusterMetadata API, use the same scope here + // This is a wrapper of GetClusterMetadata API, use the same scope here c.metricClient.IncCounter(metrics.PersistenceListClusterMetadataScope, metrics.PersistenceRequests) sw := c.metricClient.StartTimer(metrics.PersistenceListClusterMetadataScope, metrics.PersistenceLatency) @@ -1040,7 +1054,7 @@ func (c *clusterMetadataPersistenceClient) ListClusterMetadata(request *ListClus } func (c *clusterMetadataPersistenceClient) GetCurrentClusterMetadata() (*GetClusterMetadataResponse, error) { - //This is a wrapper of GetClusterMetadata API, use the same scope here + // This is a wrapper of GetClusterMetadata API, use the same scope here c.metricClient.IncCounter(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceRequests) sw := c.metricClient.StartTimer(metrics.PersistenceGetClusterMetadataScope, metrics.PersistenceLatency) diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index 2207baa4723..daf70a8d97e 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -445,6 +445,15 @@ func (p *metadataRateLimitedPersistenceClient) UpdateNamespace(request *UpdateNa return err } +func (p *metadataRateLimitedPersistenceClient) RenameNamespace(request *RenameNamespaceRequest) error { + if ok := p.rateLimiter.Allow(); !ok { + return ErrPersistenceLimitExceeded + } + + err := p.persistence.RenameNamespace(request) + return err +} + func (p *metadataRateLimitedPersistenceClient) DeleteNamespace(request *DeleteNamespaceRequest) error { if ok := p.rateLimiter.Allow(); !ok { return ErrPersistenceLimitExceeded diff --git a/common/persistence/sql/metadata.go b/common/persistence/sql/metadata.go index e31cb5725f0..163e3c6ebb9 100644 --- a/common/persistence/sql/metadata.go +++ b/common/persistence/sql/metadata.go @@ -149,6 +149,14 @@ func (m *sqlMetadataManagerV2) namespaceRowToGetNamespaceResponse(row *sqlplugin } func (m *sqlMetadataManagerV2) UpdateNamespace(request *persistence.InternalUpdateNamespaceRequest) error { + return m.updateNamespace(request, "UpdateNamespace") +} + +func (m *sqlMetadataManagerV2) RenameNamespace(request *persistence.InternalRenameNamespaceRequest) error { + return m.updateNamespace(request.InternalUpdateNamespaceRequest, "RenameNamespace") +} + +func (m *sqlMetadataManagerV2) updateNamespace(request *persistence.InternalUpdateNamespaceRequest, operationName string) error { ctx, cancel := newExecutionContext() defer cancel() idBytes, err := primitives.ParseUUID(request.Id) @@ -156,7 +164,7 @@ func (m *sqlMetadataManagerV2) UpdateNamespace(request *persistence.InternalUpda return err } - return m.txExecute(ctx, "UpdateNamespace", func(tx sqlplugin.Tx) error { + return m.txExecute(ctx, operationName, func(tx sqlplugin.Tx) error { metadata, err := lockMetadata(ctx, tx) if err != nil { return err