Skip to content

Commit

Permalink
Add log and metrics to workflow termination events
Browse files Browse the repository at this point in the history
**What changed?**
Added logs to workflow termination events; each log entry contains the workflowID and runID and is tagged with the domainName.
Added metrics to workflow termination events, using a counter per domain `WorkflowTerminateCounterPerDomain` under the `HistoryTerminateWorkflowExecutionScope` scope.

**Why?**
Improve workflow termination visibility, allowing Cadence and clients to easily find terminated workflows. This is particularly important to provide better information for workflows terminated during failovers.

**How did you test it?**
Unit tests.

**Potential risks**
The risks are associated with the changes to the function parameters being passed. We need to ensure that the parameters are correct and that they do not contain `nil` values.

**Release notes**

**Documentation Changes**
  • Loading branch information
fimanishi committed Jun 25, 2024
1 parent 83ebf7a commit f3a7f5d
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 10 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,7 @@ const (
TaskQueueLatencyPerDomain
TransferTaskMissingEventCounterPerDomain
ReplicationTasksAppliedPerDomain
WorkflowTerminateCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

Expand Down Expand Up @@ -2878,6 +2879,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
ReplicationTasksAppliedPerDomain: {metricName: "replication_tasks_applied_per_domain", metricRollupName: "replication_tasks_applied", metricType: Counter},
WorkflowTerminateCounterPerDomain: {metricName: "workflow_terminate_counter_per_domain", metricRollupName: "workflow_terminate_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskBatchCompleteFailure: {metricName: "task_batch_complete_error", metricType: Counter},
Expand Down
2 changes: 2 additions & 0 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ Update_History_Loop:
common.FailureReasonTransactionSizeExceedsLimit,
[]byte(updateErr.Error()),
execution.IdentityHistoryService,
handler.logger,
handler.metricsClient,
); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions service/history/engine/engineimpl/reapply_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (e *historyEngineImpl) ReapplyEvents(
wfContext,
mutableState,
execution.NoopReleaseFn,
e.logger,
e.metricsClient,
),
ndc.EventsReapplicationResetWorkflowReason,
toReapplyEvents,
Expand Down
2 changes: 2 additions & 0 deletions service/history/engine/engineimpl/reset_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
currentContext,
currentMutableState,
currentReleaseFn,
e.logger,
e.metricsClient,
),
request.GetReason(),
nil,
Expand Down
2 changes: 2 additions & 0 deletions service/history/engine/engineimpl/start_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ UpdateWorkflowLoop:
TerminateIfRunningReason,
getTerminateIfRunningDetails(workflowExecution.GetRunID()),
execution.IdentityHistoryService,
e.logger,
e.metricsClient,
); err != nil {
if err == workflow.ErrStaleState {
// Handler detected that cached workflow mutable could potentially be stale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
request.GetReason(),
request.GetDetails(),
request.GetIdentity(),
e.logger,
e.metricsClient,
)
})
}
41 changes: 33 additions & 8 deletions service/history/execution/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"fmt"

"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -56,10 +59,12 @@ type (
workflowImpl struct {
clusterMetadata cluster.Metadata

ctx context.Context
context Context
mutableState MutableState
releaseFn ReleaseFunc
ctx context.Context
context Context
mutableState MutableState
releaseFn ReleaseFunc
logger log.Logger
metricsClient metrics.Client
}
)

Expand All @@ -70,15 +75,19 @@ func NewWorkflow(
context Context,
mutableState MutableState,
releaseFn ReleaseFunc,
logger log.Logger,
metricsClient metrics.Client,
) Workflow {

return &workflowImpl{
ctx: ctx,
clusterMetadata: clusterMetadata,

context: context,
mutableState: mutableState,
releaseFn: releaseFn,
context: context,
mutableState: mutableState,
releaseFn: releaseFn,
logger: logger,
metricsClient: metricsClient,
}
}

Expand Down Expand Up @@ -189,7 +198,7 @@ func (r *workflowImpl) SuppressBy(
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion)
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, r.logger, r.metricsClient)
}
return TransactionPolicyPassive, r.zombiefyWorkflow()
}
Expand Down Expand Up @@ -251,6 +260,8 @@ func (r *workflowImpl) failDecision(
func (r *workflowImpl) terminateWorkflow(
lastWriteVersion int64,
incomingLastWriteVersion int64,
logger log.Logger,
metricsClient metrics.Client,
) error {

eventBatchFirstEventID := r.GetMutableState().GetNextEventID()
Expand All @@ -270,6 +281,20 @@ func (r *workflowImpl) terminateWorkflow(
WorkflowTerminationIdentity,
)

domainName := r.mutableState.GetDomainEntry().GetInfo().Name

logger.Info(
fmt.Sprintf(
"Workflow with WorkflowID %v and RunID %v is terminated.",
r.mutableState.GetExecutionInfo().WorkflowID,
r.mutableState.GetExecutionInfo().RunID,
),
tag.WorkflowDomainName(domainName),
)

scopeWithDomainTag := metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).Tagged(metrics.DomainTag(domainName))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return err
}

Expand Down
26 changes: 25 additions & 1 deletion service/history/execution/workflow_execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@

package execution

import "github.com/uber/cadence/common/types"
import (
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)

// TerminateWorkflow is a helper function to terminate workflow
func TerminateWorkflow(
Expand All @@ -29,6 +36,8 @@ func TerminateWorkflow(
terminateReason string,
terminateDetails []byte,
terminateIdentity string,
logger log.Logger,
metricsClient metrics.Client,
) error {

if decision, ok := mutableState.GetInFlightDecision(); ok {
Expand All @@ -47,5 +56,20 @@ func TerminateWorkflow(
terminateDetails,
terminateIdentity,
)

domainName := mutableState.GetDomainEntry().GetInfo().Name

logger.Info(
fmt.Sprintf(
"Workflow with WorkflowID %v and RunID %v is terminated.",
mutableState.GetExecutionInfo().WorkflowID,
mutableState.GetExecutionInfo().RunID,
),
tag.WorkflowDomainName(domainName),
)

scopeWithDomainTag := metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).Tagged(metrics.DomainTag(domainName))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return err
}
51 changes: 51 additions & 0 deletions service/history/execution/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package execution

import (
"context"
"fmt"
"reflect"
"runtime"
"testing"
Expand All @@ -30,10 +31,16 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/constants"
)

type (
Expand All @@ -44,6 +51,9 @@ type (
controller *gomock.Controller
mockContext *MockContext
mockMutableState *MockMutableState
logger log.Logger
metricsClient metrics.Client
testScope tally.TestScope

domainID string
workflowID string
Expand All @@ -62,6 +72,9 @@ func (s *workflowSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockContext = NewMockContext(s.controller)
s.mockMutableState = NewMockMutableState(s.controller)
s.logger = log.NewNoop()
s.testScope = tally.NewTestScope("test", nil)
s.metricsClient = metrics.NewClient(s.testScope, metrics.History)

s.domainID = uuid.New()
s.workflowID = "some random workflow ID"
Expand Down Expand Up @@ -89,6 +102,8 @@ func (s *workflowSuite) TestGetMethods() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

s.Equal(s.mockContext, nDCWorkflow.GetContext())
Expand Down Expand Up @@ -168,6 +183,8 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingMockContext := NewMockContext(s.controller)
Expand All @@ -178,6 +195,8 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Error() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

// cannot suppress by older workflow
Expand Down Expand Up @@ -218,12 +237,23 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
RunID: s.runID,
LastEventTaskID: lastEventTaskID,
}).AnyTimes()
domainCacheEntry := cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{Name: constants.TestDomainName},
&persistence.DomainConfig{},
false,
&persistence.DomainReplicationConfig{},
1,
common.Int64Ptr(1),
)
s.mockMutableState.EXPECT().GetDomainEntry().Return(domainCacheEntry).Times(1)
nDCWorkflow := NewWorkflow(
context.Background(),
cluster.TestActiveClusterMetadata,
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingRunID := uuid.New()
Expand All @@ -237,6 +267,8 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
Expand Down Expand Up @@ -282,6 +314,9 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
policy, err = nDCWorkflow.SuppressBy(incomingNDCWorkflow)
s.NoError(err)
s.Equal(TransactionPolicyActive, policy)
countersSnapshot := s.testScope.Snapshot().Counters()
s.Contains(countersSnapshot, fmt.Sprintf("test.workflow_terminate_counter_per_domain+domain=%v,operation=TerminateWorkflowExecution", constants.TestDomainName))
s.Equal(int64(1), countersSnapshot[fmt.Sprintf("test.workflow_terminate_counter_per_domain+domain=%v,operation=TerminateWorkflowExecution", constants.TestDomainName)].Value())
}

func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
Expand All @@ -303,6 +338,8 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)

incomingRunID := uuid.New()
Expand All @@ -316,6 +353,8 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Zombiefy() {
incomingMockContext,
incomingMockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
incomingMockMutableState.EXPECT().GetLastWriteVersion().Return(incomingLastEventVersion, nil).AnyTimes()
incomingMockMutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{
Expand Down Expand Up @@ -350,6 +389,8 @@ func (s *workflowSuite) TestRevive_Zombie_Error() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.Error(err)
Expand All @@ -366,6 +407,8 @@ func (s *workflowSuite) TestRevive_Zombie_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.NoError(err)
Expand All @@ -380,6 +423,8 @@ func (s *workflowSuite) TestRevive_NonZombie_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.Revive()
s.NoError(err)
Expand Down Expand Up @@ -410,6 +455,8 @@ func (s *workflowSuite) TestFlushBufferedEvents_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand All @@ -425,6 +472,8 @@ func (s *workflowSuite) TestFlushBufferedEvents_NoBuffer_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand All @@ -447,6 +496,8 @@ func (s *workflowSuite) TestFlushBufferedEvents_NoDecision_Success() {
s.mockContext,
s.mockMutableState,
NoopReleaseFn,
s.logger,
s.metricsClient,
)
err := nDCWorkflow.FlushBufferedEvents()
s.NoError(err)
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/branch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (r *branchManagerImpl) flushBufferedEvents(
r.context,
r.mutableState,
execution.NoopReleaseFn,
r.logger,
r.shard.GetMetricsClient(),
)
if err := targetWorkflow.FlushBufferedEvents(); err != nil {
return 0, nil, err
Expand Down
Loading

0 comments on commit f3a7f5d

Please sign in to comment.