Skip to content

Commit

Permalink
Delete namespace workflow (#2569)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 23, 2022
1 parent e60d6bd commit b6de7d5
Show file tree
Hide file tree
Showing 17 changed files with 2,014 additions and 7 deletions.
18 changes: 13 additions & 5 deletions common/log/tag/tags.go
Expand Up @@ -50,7 +50,7 @@ const (
errorPrefix = "*"
)

/////////////////// Common tags defined here ///////////////////
// ========== Common tags defined here ==========

// Operation returns tag for Operation
func Operation(operation string) ZapTag {
Expand Down Expand Up @@ -87,7 +87,7 @@ func TimestampPtr(t *time.Time) ZapTag {
return NewTimeTag("timestamp", timestamp.TimeValue(t))
}

/////////////////// Workflow tags defined here: ( wf is short for workflow) ///////////////////
// ========== Workflow tags defined here: ( wf is short for workflow) ==========

// WorkflowAction returns tag for WorkflowAction
func workflowAction(action string) ZapTag {
Expand Down Expand Up @@ -326,7 +326,7 @@ func WorkflowEventCount(eventCount int) ZapTag {
return NewInt("wf-event-count", eventCount)
}

/////////////////// System tags defined here: ///////////////////
// ========== System tags defined here: ==========
// Tags with pre-define values

// Component returns tag for Component
Expand Down Expand Up @@ -672,7 +672,7 @@ func TokenLastEventID(id int64) ZapTag {
return NewInt64("token-last-event-id", id)
}

/////////////////// XDC tags defined here: xdc- ///////////////////
// ========== XDC tags defined here: xdc- ==========

// SourceCluster returns tag for SourceCluster
func SourceCluster(sourceCluster string) ZapTag {
Expand Down Expand Up @@ -719,7 +719,7 @@ func TokenLastEventVersion(version int64) ZapTag {
return NewInt64("xdc-token-last-event-version", version)
}

/////////////////// Archival tags defined here: archival- ///////////////////
// ========== Archival tags defined here: archival- ==========
// archival request tags

// ArchivalCallerServiceName returns tag for the service name calling archival client
Expand Down Expand Up @@ -884,3 +884,11 @@ func TLSCertFiles(filePaths []string) ZapTag {
func Timeout(timeoutValue string) ZapTag {
return NewStringTag("timeout", timeoutValue)
}

func DeletedExecutionsCount(count int) ZapTag {
return NewInt("deleted-executions-count", count)
}

func DeletedExecutionsErrorCount(count int) ZapTag {
return NewInt("delete-executions-error-count", count)
}
31 changes: 31 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -1190,6 +1190,10 @@ const (
// MigrationWorkflowScope is scope used by metrics emitted by migration related workflows
MigrationWorkflowScope

DeleteNamespaceWorkflowScope
ReclaimResourcesWorkflowScope
DeleteExecutionsWorkflowScope

NumWorkerScopes
)

Expand Down Expand Up @@ -1730,6 +1734,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ParentClosePolicyProcessorScope: {operation: "ParentClosePolicyProcessor"},
AddSearchAttributesWorkflowScope: {operation: "AddSearchAttributesWorkflow"},
MigrationWorkflowScope: {operation: "MigrationWorkflow"},
DeleteNamespaceWorkflowScope: {operation: "DeleteNamespaceWorkflow"},
ReclaimResourcesWorkflowScope: {operation: "ReclaimResourcesWorkflow"},
DeleteExecutionsWorkflowScope: {operation: "DeleteExecutionsWorkflow"},
},
Server: {
ServerTlsScope: {operation: "ServerTls"},
Expand Down Expand Up @@ -2193,6 +2200,18 @@ const (
CatchUpReadyShardCountGauge
HandoverReadyShardCountGauge

DeleteNamespaceSuccessCount
RenameNamespaceSuccessCount
DeleteExecutionsSuccessCount
DeleteNamespaceFailuresCount
UpdateNamespaceFailuresCount
RenameNamespaceFailuresCount
ReadNamespaceFailuresCount
ListExecutionsFailuresCount
TerminateExecutionFailuresCount
DeleteExecutionFailuresCount
RateLimiterFailuresCount

NumWorkerMetrics
)

Expand Down Expand Up @@ -2646,6 +2665,18 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
AddSearchAttributesFailuresCount: NewCounterDef("add_search_attributes_failures"),
CatchUpReadyShardCountGauge: NewGaugeDef("catchup_ready_shard_count"),
HandoverReadyShardCountGauge: NewGaugeDef("handover_ready_shard_count"),

DeleteNamespaceSuccessCount: NewCounterDef("delete_namespace_success"),
RenameNamespaceSuccessCount: NewCounterDef("rename_namespace_success"),
DeleteExecutionsSuccessCount: NewCounterDef("delete_executions_success"),
DeleteNamespaceFailuresCount: NewCounterDef("delete_namespace_failures"),
UpdateNamespaceFailuresCount: NewCounterDef("update_namespace_failures"),
RenameNamespaceFailuresCount: NewCounterDef("rename_namespace_failures"),
ReadNamespaceFailuresCount: NewCounterDef("read_namespace_failures"),
ListExecutionsFailuresCount: NewCounterDef("list_executions_failures"),
TerminateExecutionFailuresCount: NewCounterDef("terminate_executions_failures"),
DeleteExecutionFailuresCount: NewCounterDef("delete_execution_failures"),
RateLimiterFailuresCount: NewCounterDef("rate_limiter_failures"),
},
Server: {
TlsCertsExpired: NewGaugeDef("certificates_expired"),
Expand Down
1 change: 1 addition & 0 deletions host/onebox.go
Expand Up @@ -708,6 +708,7 @@ func (c *temporalImpl) startWorker(hosts map[string][]string, startWG *sync.Wait
fx.Provide(func() dynamicconfig.Client { return newIntegrationConfigClient(dynamicconfig.NewNoopClient()) }),
fx.Provide(func() log.Logger { return c.logger }),
fx.Provide(func() esclient.Client { return c.esClient }),
fx.Provide(func() *esclient.Config { return c.esConfig }),

worker.Module,
fx.Populate(&workerService, &clientBean, &namespaceRegistry),
Expand Down
162 changes: 162 additions & 0 deletions service/worker/deletenamespace/activities.go
@@ -0,0 +1,162 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package deletenamespace

import (
"context"
"fmt"
"math/rand"
"time"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
)

type (
activities struct {
metadataManager persistence.MetadataManager
metricsClient metrics.Client
logger log.Logger
}
)

func NewActivities(
metadataManager persistence.MetadataManager,
metricsClient metrics.Client,
logger log.Logger,
) *activities {
return &activities{
metadataManager: metadataManager,
metricsClient: metricsClient,
logger: logger,
}
}

func (a *activities) GetNamespaceIDActivity(_ context.Context, nsName namespace.Name) (namespace.ID, error) {
getNamespaceRequest := &persistence.GetNamespaceRequest{
Name: nsName.String(),
}

getNamespaceResponse, err := a.metadataManager.GetNamespace(getNamespaceRequest)
if err != nil {
return namespace.EmptyID, err
}

if getNamespaceResponse.Namespace == nil || getNamespaceResponse.Namespace.Info == nil || getNamespaceResponse.Namespace.Info.Id == "" {
return namespace.EmptyID, serviceerror.NewInternal("namespace info is corrupted")
}

return namespace.ID(getNamespaceResponse.Namespace.Info.Id), nil
}

func (a *activities) MarkNamespaceDeletedActivity(_ context.Context, nsName namespace.Name) error {
getNamespaceRequest := &persistence.GetNamespaceRequest{
Name: nsName.String(),
}

metadata, err := a.metadataManager.GetMetadata()
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.ReadNamespaceFailuresCount)
a.logger.Error("Unable to get cluster metadata.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

ns, err := a.metadataManager.GetNamespace(getNamespaceRequest)
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.ReadNamespaceFailuresCount)
a.logger.Error("Unable to get namespace details.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

ns.Namespace.Info.State = enumspb.NAMESPACE_STATE_DELETED

updateRequest := &persistence.UpdateNamespaceRequest{
Namespace: ns.Namespace,
IsGlobalNamespace: ns.IsGlobalNamespace,
NotificationVersion: metadata.NotificationVersion,
}

err = a.metadataManager.UpdateNamespace(updateRequest)
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.UpdateNamespaceFailuresCount)
a.logger.Error("Unable to update namespace state to Deleted.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}
return nil
}

func (a *activities) GenerateDeletedNamespaceNameActivity(_ context.Context, nsName namespace.Name) (namespace.Name, error) {
var letters = []rune("1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
const suffixLength = 5
rand.Seed(time.Now().UnixNano())

for { // Just in case. Generated name should always be unique after first attempt.
b := make([]rune, suffixLength)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}

newName := fmt.Sprintf("%s-deleted-%s", nsName, string(b))

_, err := a.metadataManager.GetNamespace(&persistence.GetNamespaceRequest{
Name: newName,
})
switch err.(type) {
case nil:
a.logger.Warn("Regenerate namespace name due to collision.", tag.WorkflowNamespace(nsName.String()), tag.WorkflowNamespace(newName))
case *serviceerror.NotFound:
a.logger.Info("Generated new name for deleted namespace.", tag.WorkflowNamespace(nsName.String()), tag.WorkflowNamespace(newName))
return namespace.Name(newName), nil
default:
a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.ReadNamespaceFailuresCount)
a.logger.Error("Unable to get namespace details.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return namespace.EmptyName, err
}
}
}

func (a *activities) RenameNamespaceActivity(_ context.Context, previousName namespace.Name, newName namespace.Name) error {
renameNamespaceRequest := &persistence.RenameNamespaceRequest{
PreviousName: previousName.String(),
NewName: newName.String(),
}

err := a.metadataManager.RenameNamespace(renameNamespaceRequest)
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.RenameNamespaceFailuresCount)
a.logger.Error("Unable to rename namespace.", tag.WorkflowNamespace(previousName.String()), tag.Error(err))
return err
}

a.metricsClient.IncCounter(metrics.DeleteNamespaceWorkflowScope, metrics.RenameNamespaceSuccessCount)
a.logger.Info("Namespace renamed successfully.", tag.WorkflowNamespace(previousName.String()), tag.WorkflowNamespace(newName.String()))
return nil
}

0 comments on commit b6de7d5

Please sign in to comment.