Skip to content

Commit

Permalink
Improve deletenamespace workflow logging (#2667)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 29, 2022
1 parent 9469618 commit 7bff5ee
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -2214,7 +2214,9 @@ const (
ReadNamespaceFailuresCount
ListExecutionsFailuresCount
TerminateExecutionFailuresCount
TerminateExecutionNotFoundCount
DeleteExecutionFailuresCount
DeleteExecutionNotFoundCount
RateLimiterFailuresCount

NumWorkerMetrics
Expand Down Expand Up @@ -2683,7 +2685,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReadNamespaceFailuresCount: NewCounterDef("read_namespace_failures"),
ListExecutionsFailuresCount: NewCounterDef("list_executions_failures"),
TerminateExecutionFailuresCount: NewCounterDef("terminate_executions_failures"),
TerminateExecutionNotFoundCount: NewCounterDef("terminate_executions_not_found"),
DeleteExecutionFailuresCount: NewCounterDef("delete_execution_failures"),
DeleteExecutionNotFoundCount: NewCounterDef("delete_execution_not_found"),
RateLimiterFailuresCount: NewCounterDef("rate_limiter_failures"),
},
Server: {
Expand Down
11 changes: 8 additions & 3 deletions service/worker/deletenamespace/deleteexecutions/activities.go
Expand Up @@ -116,14 +116,14 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to list all workflows.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err))
a.logger.Error("Unable to list all workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err))
return result, err
}
for _, execution := range resp.Executions {
err = rateLimiter.Wait(ctx)
if err != nil {
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.RateLimiterFailuresCount)
a.logger.Error("Workflow execution deletion rate limiter error.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err))
a.logger.Error("Workflow executions delete rate limiter error.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err))
return result, err
}
if execution.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
Expand All @@ -136,7 +136,10 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
},
})
switch err.(type) {
case nil, *serviceerror.NotFound: // Workflow execution has already completed or doesn't exist.
case nil:
case *serviceerror.NotFound: // Workflow execution has already completed or doesn't exist.
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionNotFoundCount)
a.logger.Info("Workflow execution is not found or not running.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
default:
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.TerminateExecutionFailuresCount)
a.logger.Error("Unable to terminate workflow execution.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()), tag.Error(err))
Expand All @@ -153,6 +156,8 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete
result.SuccessCount++
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionsSuccessCount)
case *serviceerror.NotFound: // Workflow execution doesn't exist. Do nothing.
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionNotFoundCount)
a.logger.Info("Workflow execution is not found.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()))
default:
result.ErrorCount++
a.metricsClient.IncCounter(metrics.DeleteExecutionsWorkflowScope, metrics.DeleteExecutionFailuresCount)
Expand Down
42 changes: 22 additions & 20 deletions service/worker/deletenamespace/reclaimresources/activities.go
Expand Up @@ -27,13 +27,14 @@ package reclaimresources
import (
"context"

"go.temporal.io/sdk/activity"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/worker/deletenamespace/errors"
)

type (
Expand Down Expand Up @@ -61,6 +62,7 @@ func NewActivities(

func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
// TODO: remove this check after CountWorkflowExecutions is implemented in standard visibility.
count := int64(0)
if a.visibilityManager.GetName() == "elasticsearch" {
req := &manager.CountWorkflowExecutionsRequest{
NamespaceID: nsID,
Expand All @@ -69,30 +71,30 @@ func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namesp
resp, err := a.visibilityManager.CountWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflows.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
a.logger.Error("Unable to count workflow executions.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

if resp.Count > 0 {
return errors.ErrExecutionsStillExist
count = resp.Count
} else {
req := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: nsID,
Namespace: nsName,
PageSize: 1,
}
return nil
}

req := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: nsID,
Namespace: nsName,
PageSize: 1,
}
resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflows using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflow executions using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}
// If not 0, it will always be 1 due to PageSize set to 1.
count = int64(len(resp.Executions))
}

if len(resp.Executions) > 0 {
return errors.ErrExecutionsStillExist
if count > 0 {
a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count)))
activity.RecordHeartbeat(ctx, count)
}
return nil
}
Expand All @@ -105,7 +107,7 @@ func (a *Activities) DeleteNamespaceActivity(ctx context.Context, nsID namespace
err := a.metadataManager.DeleteNamespaceByName(ctx, deleteNamespaceRequest)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.DeleteNamespaceFailuresCount)
a.logger.Error("Unable delete namespace from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
a.logger.Error("Unable to delete namespace from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

Expand Down

0 comments on commit 7bff5ee

Please sign in to comment.