From 01c17dd1b175d411483a62e0471c854129bc7a0f Mon Sep 17 00:00:00 2001 From: yiminc Date: Thu, 3 Aug 2017 10:18:01 -0700 Subject: [PATCH] add client side metrics (#197) * add client side metrics * address comments --- client.go | 3 + common/metrics/constants.go | 56 +++++++++++++----- internal_public.go | 1 - internal_task_handlers.go | 61 ++++++++++---------- internal_task_pollers.go | 110 +++++++++++++++++++----------------- internal_worker.go | 5 ++ internal_workflow.go | 28 ++++----- internal_workflow_client.go | 3 +- 8 files changed, 152 insertions(+), 115 deletions(-) diff --git a/client.go b/client.go index fb4580e74..d25fd9b0d 100644 --- a/client.go +++ b/client.go @@ -200,6 +200,9 @@ func NewClient(service m.TChanWorkflowService, domain string, options *ClientOpt if options != nil { metricScope = options.MetricsScope } + if metricScope == nil { + metricScope = tally.NoopScope + } return &workflowClient{ workflowService: service, domain: domain, diff --git a/common/metrics/constants.go b/common/metrics/constants.go index fb0376b89..8c6e1c834 100644 --- a/common/metrics/constants.go +++ b/common/metrics/constants.go @@ -22,19 +22,45 @@ package metrics // Workflow Creation metrics const ( - WorkflowsStartTotalCounter = "workflows-start-total" - ActivitiesTotalCounter = "activities-total" - DecisionsTotalCounter = "decisions-total" - DecisionsTimeoutCounter = "decisions-timeout" - WorkflowsCompletionTotalCounter = "workflows-completion-total" - WorkflowEndToEndLatency = "workflows-endtoend-latency" - ActivityEndToEndLatency = "activities-endtoend-latency" - DecisionsEndToEndLatency = "decisions-endtoend-latency" - ActivityPollLatency = "activities-poll-latency" - DecisionsPollLatency = "decisions-poll-latency" - ActivityExecutionLatency = "activities-execution-latency" - DecisionsExecutionLatency = "decisions-execution-latency" - ActivityResponseLatency = "activities-response-latency" - DecisionsResponseLatency = "decisions-response-latency" - 
UnhandledSignalsTotalCounter = "unhandled-signals-total" + WorkflowStartCounter = "workflow-start" + WorkflowCompletedCounter = "workflow-completed" + WorkflowCanceledCounter = "workflow-canceled" + WorkflowFailedCounter = "workflow-failed" + WorkflowContinueAsNewCounter = "workflow-continue-as-new" + WorkflowEndToEndLatency = "workflow-endtoend-latency" // measure workflow execution from start to close + WorkflowGetHistoryCounter = "workflow-get-history-total" + WorkflowGetHistoryFailedCounter = "workflow-get-history-failed" + WorkflowGetHistorySucceedCounter = "workflow-get-history-succeed" + WorkflowGetHistoryLatency = "workflow-get-history-latency" + DecisionTimeoutCounter = "decision-timeout" + + DecisionPollCounter = "decision-poll-total" + DecisionPollFailedCounter = "decision-poll-failed" + DecisionPollNoTaskCounter = "decision-poll-no-task" + DecisionPollSucceedCounter = "decision-poll-succeed" + DecisionPollLatency = "decision-poll-latency" // measure succeed poll request latency + DecisionExecutionFailedCounter = "decision-execution-failed" + DecisionExecutionLatency = "decision-execution-latency" + DecisionResponseFailedCounter = "decision-response-failed" + DecisionResponseLatency = "decision-response-latency" + DecisionEndToEndLatency = "decision-endtoend-latency" // measure from poll request start to response completed + DecisionTaskPanicCounter = "decision-task-panic" + DecisionTaskCompletedCounter = "decision-task-completed" + + ActivityPollCounter = "activity-poll-total" + ActivityPollFailedCounter = "activity-poll-failed" + ActivityPollNoTaskCounter = "activity-poll-no-task" + ActivityPollSucceedCounter = "activity-poll-succeed" + ActivityPollLatency = "activity-poll-latency" + ActivityExecutionFailedCounter = "activity-execution-failed" + ActivityExecutionLatency = "activity-execution-latency" + ActivityResponseLatency = "activity-response-latency" + ActivityResponseFailedCounter = "activity-response-failed" + ActivityEndToEndLatency = 
"activity-endtoend-latency" + ActivityTaskPanicCounter = "activity-task-panic" + ActivityTaskCompletedCounter = "activity-task-completed" + ActivityTaskFailedCounter = "activity-task-failed" + ActivityTaskCanceledCounter = "activity-task-canceled" + + UnhandledSignalsCounter = "unhandled-signals" ) diff --git a/internal_public.go b/internal_public.go index 344e67a9d..a0ebd0e8a 100644 --- a/internal_public.go +++ b/internal_public.go @@ -118,7 +118,6 @@ func NewWorkflowTaskHandler(domain string, identity string, logger *zap.Logger) Identity: identity, Logger: logger, } - ensureRequiredParams(&params) return newWorkflowTaskHandler( getWorkflowDefinitionFactory(newRegisteredWorkflowFactory()), domain, diff --git a/internal_task_handlers.go b/internal_task_handlers.go index aceb2e0b5..f7c8b4f6a 100644 --- a/internal_task_handlers.go +++ b/internal_task_handlers.go @@ -61,11 +61,13 @@ type ( workflowTask struct { task *s.PollForDecisionTaskResponse getHistoryPageFunc GetHistoryPage + pollStartTime time.Time } // activityTask wraps a activity task. activityTask struct { - task *s.PollForActivityTaskResponse + task *s.PollForActivityTaskResponse + pollStartTime time.Time } ) @@ -117,13 +119,13 @@ func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandler return result } -// Get workflow start attributes. -func (eh *history) GetWorkflowStartedAttr() (*s.WorkflowExecutionStartedEventAttributes, error) { +// Get workflow start event. +func (eh *history) GetWorkflowStartedEvent() (*s.HistoryEvent, error) { events := eh.workflowTask.task.History.Events if len(events) == 0 || events[0].GetEventType() != s.EventType_WorkflowExecutionStarted { return nil, errors.New("unable to find WorkflowExecutionStartedEventAttributes in the history") } - return events[0].WorkflowExecutionStartedEventAttributes, nil + return events[0], nil } // Get last non replayed event ID.
@@ -292,6 +294,7 @@ OrderEvents: // newWorkflowTaskHandler returns an implementation of workflow task handler. func newWorkflowTaskHandler(factory workflowDefinitionFactory, domain string, params workerExecutionParameters, ppMgr pressurePointMgr) WorkflowTaskHandler { + ensureRequiredParams(&params) return &workflowTaskHandlerImpl{ workflowDefFactory: factory, domain: domain, @@ -309,14 +312,6 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( getHistoryPage GetHistoryPage, emitStack bool, ) (result *s.RespondDecisionTaskCompletedRequest, stackTrace string, err error) { - currentTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(currentTime) - if wth.metricsScope != nil { - wth.metricsScope.Timer(metrics.DecisionsExecutionLatency).Record(deltaTime) - } - }() - if task == nil { return nil, "", errors.New("nil workflowtask provided") } @@ -376,8 +371,6 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( replayDecisions := []*s.Decision{} respondEvents := []*s.HistoryEvent{} - startTime := time.Now() - // Process events ProcessEvents: for { @@ -438,25 +431,40 @@ ProcessEvents: return nil, "", err } - startAttributes, err := reorderedHistory.GetWorkflowStartedAttr() + startEvent, err := reorderedHistory.GetWorkflowStartedEvent() if err != nil { wth.logger.Error("Unable to read workflow start attributes.", zap.Error(err)) return nil, "", err } - if _, ok := failure.(*PanicError); ok { + if panicErr, ok := failure.(*PanicError); ok { // Timeout the Decision instead of failing workflow. // TODO: Pump this stack trace on to workflow history for debuggability by exposing decision type fail to client.
+ wth.metricsScope.Counter(metrics.DecisionTaskPanicCounter).Inc(1) + wth.logger.Error("Workflow panic.", + zap.String("PanicError", panicErr.Error()), + zap.String("PanicStack", panicErr.StackTrace())) + return nil, "", failure } - + startAttributes := startEvent.WorkflowExecutionStartedEventAttributes closeDecision := wth.completeWorkflow(isWorkflowCompleted, completionResult, failure, startAttributes) if closeDecision != nil { decisions = append(decisions, closeDecision) - if wth.metricsScope != nil { - wth.metricsScope.Counter(metrics.WorkflowsCompletionTotalCounter).Inc(1) - elapsed := time.Now().Sub(startTime) - wth.metricsScope.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed) + + wfStartTime := time.Unix(0, startEvent.GetTimestamp()) + elapsed := time.Now().Sub(wfStartTime) + wth.metricsScope.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed) + + switch closeDecision.GetDecisionType() { + case s.DecisionType_CompleteWorkflowExecution: + wth.metricsScope.Counter(metrics.WorkflowCompletedCounter).Inc(1) + case s.DecisionType_FailWorkflowExecution: + wth.metricsScope.Counter(metrics.WorkflowFailedCounter).Inc(1) + case s.DecisionType_CancelWorkflowExecution: + wth.metricsScope.Counter(metrics.WorkflowCanceledCounter).Inc(1) + case s.DecisionType_ContinueAsNewWorkflowExecution: + wth.metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1) } } @@ -757,7 +765,7 @@ func (wth *workflowTaskHandlerImpl) reportAnyMetrics(event *s.HistoryEvent, isIn if wth.metricsScope != nil && !isInReplay { switch event.GetEventType() { case s.EventType_DecisionTaskTimedOut: - wth.metricsScope.Counter(metrics.DecisionsTimeoutCounter).Inc(1) + wth.metricsScope.Counter(metrics.DecisionTimeoutCounter).Inc(1) } } } @@ -900,14 +908,6 @@ func newServiceInvoker( // Execute executes an implementation of the activity. 
func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (result interface{}, err error) { - startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if ath.metricsScope != nil { - ath.metricsScope.Timer(metrics.ActivityExecutionLatency).Record(deltaTime) - } - }() - traceLog(func() { ath.logger.Debug("Processing new activity task", zap.String(tagWorkflowID, t.GetWorkflowExecution().GetWorkflowId()), @@ -938,6 +938,7 @@ func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (r ath.logger.Error("Activity panic.", zap.String("PanicError", fmt.Sprintf("%v", p)), zap.String("PanicStack", st)) + ath.metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1) panicErr := newPanicError(p, st) result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr), nil } diff --git a/internal_task_pollers.go b/internal_task_pollers.go index 87187f59a..39b326d07 100644 --- a/internal_task_pollers.go +++ b/internal_task_pollers.go @@ -129,20 +129,12 @@ func newWorkflowTaskPoller(taskHandler WorkflowTaskHandler, service m.TChanWorkf // PollTask polls a new task func (wtp *workflowTaskPoller) PollTask() (interface{}, error) { - startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if wtp.metricsScope != nil { - wtp.metricsScope.Counter(metrics.DecisionsTotalCounter).Inc(1) - wtp.metricsScope.Timer(metrics.DecisionsEndToEndLatency).Record(deltaTime) - } - }() - // Get the task. workflowTask, err := wtp.poll() if err != nil { return nil, err } + return workflowTask, nil } @@ -157,20 +149,16 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { return nil } + executionStartTime := time.Now() // Process the task. 
completedRequest, _, err := wtp.taskHandler.ProcessWorkflowTask(workflowTask.task, workflowTask.getHistoryPageFunc, false) if err != nil { + wtp.metricsScope.Counter(metrics.DecisionExecutionFailedCounter).Inc(1) return err } + wtp.metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(executionStartTime)) responseStartTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(responseStartTime) - if wtp.metricsScope != nil { - wtp.metricsScope.Timer(metrics.DecisionsResponseLatency).Record(deltaTime) - } - }() - // Respond task completion. err = backoff.Retry( func() error { @@ -186,21 +174,21 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { + wtp.metricsScope.Counter(metrics.DecisionResponseFailedCounter).Inc(1) return err } - return nil + wtp.metricsScope.Timer(metrics.DecisionResponseLatency).Record(time.Now().Sub(responseStartTime)) + wtp.metricsScope.Timer(metrics.DecisionEndToEndLatency).Record(time.Now().Sub(workflowTask.pollStartTime)) + wtp.metricsScope.Counter(metrics.DecisionTaskCompletedCounter).Inc(1) + + return nil } // Poll for a single workflow task from the service func (wtp *workflowTaskPoller) poll() (*workflowTask, error) { startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if wtp.metricsScope != nil { - wtp.metricsScope.Timer(metrics.DecisionsPollLatency).Record(deltaTime) - } - }() + wtp.metricsScope.Counter(metrics.DecisionPollCounter).Inc(1) traceLog(func() { wtp.logger.Debug("workflowTaskPoller::Poll") @@ -216,15 +204,20 @@ func (wtp *workflowTaskPoller) poll() (*workflowTask, error) { response, err := wtp.service.PollForDecisionTask(ctx, request) if err != nil { + wtp.metricsScope.Counter(metrics.DecisionPollFailedCounter).Inc(1) return nil, err } + if response == nil || len(response.GetTaskToken()) == 0 { + wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1) return 
&workflowTask{}, nil } execution := response.GetWorkflowExecution() - iterator := newGetHistoryPageFunc(wtp.service, wtp.domain, execution, math.MaxInt64) - task := &workflowTask{task: response, getHistoryPageFunc: iterator} + iterator := newGetHistoryPageFunc(wtp.service, wtp.domain, execution, math.MaxInt64, wtp.metricsScope) + task := &workflowTask{task: response, getHistoryPageFunc: iterator, pollStartTime: startTime} + wtp.metricsScope.Counter(metrics.DecisionPollSucceedCounter).Inc(1) + wtp.metricsScope.Timer(metrics.DecisionPollLatency).Record(time.Now().Sub(startTime)) return task, nil } @@ -232,8 +225,12 @@ func newGetHistoryPageFunc( service m.TChanWorkflowService, domain string, execution *s.WorkflowExecution, - atDecisionTaskCompletedEventID int64) func(nextPageToken []byte) (*s.History, []byte, error) { + atDecisionTaskCompletedEventID int64, + metricsScope tally.Scope, +) func(nextPageToken []byte) (*s.History, []byte, error) { return func(nextPageToken []byte) (*s.History, []byte, error) { + metricsScope.Counter(metrics.WorkflowGetHistoryCounter).Inc(1) + startTime := time.Now() var resp *s.GetWorkflowExecutionHistoryResponse err := backoff.Retry( func() error { @@ -249,8 +246,12 @@ func newGetHistoryPageFunc( return err1 }, serviceOperationRetryPolicy, isServiceTransientError) if err != nil { + metricsScope.Counter(metrics.WorkflowGetHistoryFailedCounter).Inc(1) return nil, nil, err } + + metricsScope.Counter(metrics.WorkflowGetHistorySucceedCounter).Inc(1) + metricsScope.Timer(metrics.WorkflowGetHistoryLatency).Record(time.Now().Sub(startTime)) h := resp.GetHistory() size := len(h.Events) if size > 0 && atDecisionTaskCompletedEventID > 0 && @@ -282,12 +283,8 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service m.TChanWorkf // Poll for a single activity task from the service func (atp *activityTaskPoller) poll() (*activityTask, error) { startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if 
atp.metricsScope != nil { - atp.metricsScope.Timer(metrics.ActivityPollLatency).Record(deltaTime) - } - }() + + atp.metricsScope.Counter(metrics.ActivityPollCounter).Inc(1) traceLog(func() { atp.logger.Debug("activityTaskPoller::Poll") @@ -303,25 +300,21 @@ func (atp *activityTaskPoller) poll() (*activityTask, error) { response, err := atp.service.PollForActivityTask(ctx, request) if err != nil { + atp.metricsScope.Counter(metrics.ActivityPollFailedCounter).Inc(1) return nil, err } if response == nil || len(response.GetTaskToken()) == 0 { + atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1) return &activityTask{}, nil } - return &activityTask{task: response}, nil + + atp.metricsScope.Counter(metrics.ActivityPollSucceedCounter).Inc(1) + atp.metricsScope.Timer(metrics.ActivityPollLatency).Record(time.Now().Sub(startTime)) + return &activityTask{task: response, pollStartTime: startTime}, nil } // PollTask polls a new task func (atp *activityTaskPoller) PollTask() (interface{}, error) { - startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if atp.metricsScope != nil { - atp.metricsScope.Counter(metrics.ActivitiesTotalCounter).Inc(1) - atp.metricsScope.Timer(metrics.ActivityEndToEndLatency).Record(deltaTime) - } - }() - // Get the task. activityTask, err := atp.poll() if err != nil { @@ -341,20 +334,33 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { return nil } + executionStartTime := time.Now() // Process the activity task. request, err := atp.taskHandler.Execute(activityTask.task) if err != nil { + atp.metricsScope.Counter(metrics.ActivityExecutionFailedCounter).Inc(1) return err } + atp.metricsScope.Timer(metrics.ActivityExecutionLatency).Record(time.Now().Sub(executionStartTime)) + + if request == nil { + // this could be true when activity returns ErrActivityResultPending. 
+ return nil + } + responseStartTime := time.Now() reportErr := reportActivityComplete(atp.service, request, atp.metricsScope) if reportErr != nil { + atp.metricsScope.Counter(metrics.ActivityResponseFailedCounter).Inc(1) traceLog(func() { atp.logger.Debug("reportActivityComplete failed", zap.Error(reportErr)) }) + return reportErr } - return reportErr + atp.metricsScope.Timer(metrics.ActivityResponseLatency).Record(time.Now().Sub(responseStartTime)) + atp.metricsScope.Timer(metrics.ActivityEndToEndLatency).Record(time.Now().Sub(activityTask.pollStartTime)) + return nil } func reportActivityComplete(service m.TChanWorkflowService, request interface{}, metricsScope tally.Scope) error { @@ -363,14 +369,6 @@ func reportActivityComplete(service m.TChanWorkflowService, request interface{}, return nil } - startTime := time.Now() - defer func() { - deltaTime := time.Now().Sub(startTime) - if metricsScope != nil { - metricsScope.Timer(metrics.ActivityResponseLatency).Record(deltaTime) - } - }() - ctx, cancel := newTChannelContext() defer cancel() var reportErr error @@ -391,6 +389,16 @@ func reportActivityComplete(service m.TChanWorkflowService, request interface{}, return service.RespondActivityTaskCompleted(ctx, request) }, serviceOperationRetryPolicy, isServiceTransientError) } + if reportErr == nil { + switch request.(type) { + case *s.RespondActivityTaskCanceledRequest: + metricsScope.Counter(metrics.ActivityTaskCanceledCounter).Inc(1) + case *s.RespondActivityTaskFailedRequest: + metricsScope.Counter(metrics.ActivityTaskFailedCounter).Inc(1) + case *s.RespondActivityTaskCompletedRequest: + metricsScope.Counter(metrics.ActivityTaskCompletedCounter).Inc(1) + } + } return reportErr } diff --git a/internal_worker.go b/internal_worker.go index 7f34979c3..e4334ed46 100644 --- a/internal_worker.go +++ b/internal_worker.go @@ -159,6 +159,7 @@ func ensureRequiredParams(params *workerExecutionParameters) { if params.MetricsScope == nil { params.MetricsScope = tally.NoopScope + 
params.Logger.Info("No metrics scope configured for cadence worker. Use NoopScope as default.") } } @@ -845,6 +846,10 @@ func newAggregatedWorker( } ensureRequiredParams(&workerParams) + tags := map[string]string{ + tagDomain: domain, + } + workerParams.MetricsScope = workerParams.MetricsScope.Tagged(tags) workerParams.Logger = workerParams.Logger.With( zapcore.Field{Key: tagDomain, Type: zapcore.StringType, String: domain}, zapcore.Field{Key: tagTaskList, Type: zapcore.StringType, String: taskList}, diff --git a/internal_workflow.go b/internal_workflow.go index 6c2671c34..91abf8d02 100644 --- a/internal_workflow.go +++ b/internal_workflow.go @@ -34,6 +34,7 @@ import ( "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/common" + "go.uber.org/cadence/common/metrics" "go.uber.org/zap" ) @@ -410,33 +411,26 @@ func getDispatcher(ctx Context) dispatcher { // executeDispatcher executed coroutines in the calling thread and calls workflow completion callbacks // if root workflow function returned func executeDispatcher(ctx Context, dispatcher dispatcher) { - checkUnhandledSigFn := func(ctx Context) { - us := getWorkflowEnvOptions(ctx).getUnhandledSignals() - if len(us) > 0 { - getWorkflowEnvironment(ctx).GetLogger().Warn("Workflow has unhandled signals", - zap.Strings("SignalNames", us)) - // TODO: We don't have a metrics added to workflow environment yet, - // this need to be reported as a metric. 
- } - } - + env := getWorkflowEnvironment(ctx) panicErr := dispatcher.ExecuteUntilAllBlocked() if panicErr != nil { - env := getWorkflowEnvironment(ctx) - env.GetLogger().Error("Dispatcher panic.", - zap.String("PanicError", panicErr.Error()), - zap.String("PanicStack", panicErr.StackTrace())) - checkUnhandledSigFn(ctx) env.Complete(nil, panicErr) return } + rp := *getWorkflowResultPointerPointer(ctx) if rp == nil { // Result is not set, so workflow is still executing return } - checkUnhandledSigFn(ctx) - getWorkflowEnvironment(ctx).Complete(rp.workflowResult, rp.error) + + us := getWorkflowEnvOptions(ctx).getUnhandledSignals() + if len(us) > 0 { + env.GetLogger().Warn("Workflow has unhandled signals", zap.Strings("SignalNames", us)) + env.GetMetricsScope().Counter(metrics.UnhandledSignalsCounter).Inc(1) + } + + env.Complete(rp.workflowResult, rp.error) } // For troubleshooting stack pretty printing only. diff --git a/internal_workflow_client.go b/internal_workflow_client.go index 3f3b0fa45..3f99f4f44 100644 --- a/internal_workflow_client.go +++ b/internal_workflow_client.go @@ -131,7 +131,7 @@ func (wc *workflowClient) StartWorkflow( } if wc.metricsScope != nil { - wc.metricsScope.Counter(metrics.WorkflowsStartTotalCounter).Inc(1) + wc.metricsScope.Counter(metrics.WorkflowStartCounter).Inc(1) } executionInfo := &WorkflowExecution{ @@ -256,6 +256,7 @@ func (wc *workflowClient) GetWorkflowStackTrace(workflowID string, runID string, wc.domain, &s.WorkflowExecution{WorkflowId: common.StringPtr(workflowID), RunId: common.StringPtr(runID)}, atDecisionTaskCompletedEventID, + wc.metricsScope, ) return getWorkflowStackTraceImpl(workflowID, runID, getHistoryPage) }