Skip to content

Commit

Permalink
Merge branch 'master' into tchan
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Aug 3, 2017
2 parents ebe7c5c + 01c17dd commit 6e592f3
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 115 deletions.
3 changes: 3 additions & 0 deletions client.go
Expand Up @@ -200,6 +200,9 @@ func NewClient(service m.TChanWorkflowService, domain string, options *ClientOpt
if options != nil {
metricScope = options.MetricsScope
}
if metricScope == nil {
metricScope = tally.NoopScope
}
return &workflowClient{
workflowService: service,
domain: domain,
Expand Down
56 changes: 41 additions & 15 deletions common/metrics/constants.go
Expand Up @@ -22,19 +22,45 @@ package metrics

// Metric names (counters and latency timers) for workflows, decision tasks, activity tasks, and signals.
const (
WorkflowsStartTotalCounter = "workflows-start-total"
ActivitiesTotalCounter = "activities-total"
DecisionsTotalCounter = "decisions-total"
DecisionsTimeoutCounter = "decisions-timeout"
WorkflowsCompletionTotalCounter = "workflows-completion-total"
WorkflowEndToEndLatency = "workflows-endtoend-latency"
ActivityEndToEndLatency = "activities-endtoend-latency"
DecisionsEndToEndLatency = "decisions-endtoend-latency"
ActivityPollLatency = "activities-poll-latency"
DecisionsPollLatency = "decisions-poll-latency"
ActivityExecutionLatency = "activities-execution-latency"
DecisionsExecutionLatency = "decisions-execution-latency"
ActivityResponseLatency = "activities-response-latency"
DecisionsResponseLatency = "decisions-response-latency"
UnhandledSignalsTotalCounter = "unhandled-signals-total"
WorkflowStartCounter = "workflow-start"
WorkflowCompletedCounter = "workflow-completed"
WorkflowCanceledCounter = "workflow-canceled"
WorkflowFailedCounter = "workflow-failed"
WorkflowContinueAsNewCounter = "workflow-continue-as-new"
WorkflowEndToEndLatency = "workflow-endtoend-latency" // measure workflow execution from start to close
WorkflowGetHistoryCounter = "workflow-get-history-total"
WorkflowGetHistoryFailedCounter = "workflow-get-history-failed"
WorkflowGetHistorySucceedCounter = "workflow-get-history-succeed"
WorkflowGetHistoryLatency = "workflow-get-history-latency"
DecisionTimeoutCounter = "decision-timeout"

DecisionPollCounter = "decision-poll-total"
DecisionPollFailedCounter = "decision-poll-failed"
DecisionPollNoTaskCounter = "decision-poll-no-task"
DecisionPollSucceedCounter = "decision-poll-succeed"
DecisionPollLatency = "decision-poll-latency" // measure succeed poll request latency
DecisionExecutionFailedCounter = "decision-execution-failed"
DecisionExecutionLatency = "decision-execution-latency"
DecisionResponseFailedCounter = "decision-response-failed"
DecisionResponseLatency = "decision-response-latency"
DecisionEndToEndLatency = "decision-endtoend-latency" // measure from poll request start to response completed
DecisionTaskPanicCounter = "decision-task-panic"
DecisionTaskCompletedCounter = "decision-task-completed"

ActivityPollCounter = "activity-poll-total"
ActivityPollFailedCounter = "activity-poll-failed"
ActivityPollNoTaskCounter = "activity-poll-no-task"
ActivityPollSucceedCounter = "activity-poll-succeed"
ActivityPollLatency = "activity-poll-latency"
ActivityExecutionFailedCounter = "activity-execution-failed"
ActivityExecutionLatency = "activity-execution-latency"
ActivityResponseLatency = "activity-response-latency"
ActivityResponseFailedCounter = "activity-response-failed"
ActivityEndToEndLatency = "activity-endtoend-latency"
ActivityTaskPanicCounter = "activity-task-panic"
ActivityTaskCompletedCounter = "activity-task-completed"
ActivityTaskFailedCounter = "activity-task-failed"
ActivityTaskCanceledCounter = "activity-task-canceled"

UnhandledSignalsCounter = "unhandled-signals"
)
1 change: 0 additions & 1 deletion internal_public.go
Expand Up @@ -118,7 +118,6 @@ func NewWorkflowTaskHandler(domain string, identity string, logger *zap.Logger)
Identity: identity,
Logger: logger,
}
ensureRequiredParams(&params)
return newWorkflowTaskHandler(
getWorkflowDefinitionFactory(newRegisteredWorkflowFactory()),
domain,
Expand Down
61 changes: 31 additions & 30 deletions internal_task_handlers.go
Expand Up @@ -61,11 +61,13 @@ type (
workflowTask struct {
task *s.PollForDecisionTaskResponse
getHistoryPageFunc GetHistoryPage
pollStartTime time.Time
}

// activityTask wraps an activity task.
activityTask struct {
task *s.PollForActivityTaskResponse
task *s.PollForActivityTaskResponse
pollStartTime time.Time
}
)

Expand Down Expand Up @@ -117,13 +119,13 @@ func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandler
return result
}

// Get workflow start attributes.
func (eh *history) GetWorkflowStartedAttr() (*s.WorkflowExecutionStartedEventAttributes, error) {
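// GetWorkflowStartedEvent returns the first event in the task's history, or an error if the history is empty or that event is not of type WorkflowExecutionStarted.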
func (eh *history) GetWorkflowStartedEvent() (*s.HistoryEvent, error) {
events := eh.workflowTask.task.History.Events
if len(events) == 0 || events[0].GetEventType() != s.EventType_WorkflowExecutionStarted {
return nil, errors.New("unable to find WorkflowExecutionStartedEventAttributes in the history")
}
return events[0].WorkflowExecutionStartedEventAttributes, nil
return events[0], nil
}

// Get last non replayed event ID.
Expand Down Expand Up @@ -292,6 +294,7 @@ OrderEvents:
// newWorkflowTaskHandler returns an implementation of workflow task handler.
func newWorkflowTaskHandler(factory workflowDefinitionFactory, domain string,
params workerExecutionParameters, ppMgr pressurePointMgr) WorkflowTaskHandler {
ensureRequiredParams(&params)
return &workflowTaskHandlerImpl{
workflowDefFactory: factory,
domain: domain,
Expand All @@ -309,14 +312,6 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
getHistoryPage GetHistoryPage,
emitStack bool,
) (result *s.RespondDecisionTaskCompletedRequest, stackTrace string, err error) {
currentTime := time.Now()
defer func() {
deltaTime := time.Now().Sub(currentTime)
if wth.metricsScope != nil {
wth.metricsScope.Timer(metrics.DecisionsExecutionLatency).Record(deltaTime)
}
}()

if task == nil {
return nil, "", errors.New("nil workflowtask provided")
}
Expand Down Expand Up @@ -376,8 +371,6 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
replayDecisions := []*s.Decision{}
respondEvents := []*s.HistoryEvent{}

startTime := time.Now()

// Process events
ProcessEvents:
for {
Expand Down Expand Up @@ -438,25 +431,40 @@ ProcessEvents:
return nil, "", err
}

startAttributes, err := reorderedHistory.GetWorkflowStartedAttr()
startEvent, err := reorderedHistory.GetWorkflowStartedEvent()
if err != nil {
wth.logger.Error("Unable to read workflow start attributes.", zap.Error(err))
return nil, "", err
}

if _, ok := failure.(*PanicError); ok {
if panicErr, ok := failure.(*PanicError); ok {
// Timeout the Decision instead of failing workflow.
// TODO: Pump this stack trace on to workflow history for debuggability by exposing decision type fail to client.
wth.metricsScope.Counter(metrics.DecisionTaskPanicCounter).Inc(1)
wth.logger.Error("Workflow panic.",
zap.String("PanicError", panicErr.Error()),
zap.String("PanicStack", panicErr.StackTrace()))

return nil, "", failure
}

startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
closeDecision := wth.completeWorkflow(isWorkflowCompleted, completionResult, failure, startAttributes)
if closeDecision != nil {
decisions = append(decisions, closeDecision)
if wth.metricsScope != nil {
wth.metricsScope.Counter(metrics.WorkflowsCompletionTotalCounter).Inc(1)
elapsed := time.Now().Sub(startTime)
wth.metricsScope.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)

wfStartTime := time.Unix(0, startEvent.GetTimestamp())
elapsed := time.Now().Sub(wfStartTime)
wth.metricsScope.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)

switch closeDecision.GetDecisionType() {
case s.DecisionType_CompleteWorkflowExecution:
wth.metricsScope.Counter(metrics.WorkflowCompletedCounter).Inc(1)
case s.DecisionType_FailWorkflowExecution:
wth.metricsScope.Counter(metrics.WorkflowFailedCounter).Inc(1)
case s.DecisionType_CancelWorkflowExecution:
wth.metricsScope.Counter(metrics.WorkflowCanceledCounter).Inc(1)
case s.DecisionType_ContinueAsNewWorkflowExecution:
wth.metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
}
}

Expand Down Expand Up @@ -757,7 +765,7 @@ func (wth *workflowTaskHandlerImpl) reportAnyMetrics(event *s.HistoryEvent, isIn
if wth.metricsScope != nil && !isInReplay {
switch event.GetEventType() {
case s.EventType_DecisionTaskTimedOut:
wth.metricsScope.Counter(metrics.DecisionsTimeoutCounter).Inc(1)
wth.metricsScope.Counter(metrics.DecisionTimeoutCounter).Inc(1)
}
}
}
Expand Down Expand Up @@ -900,14 +908,6 @@ func newServiceInvoker(

// Execute executes an implementation of the activity.
func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (result interface{}, err error) {
startTime := time.Now()
defer func() {
deltaTime := time.Now().Sub(startTime)
if ath.metricsScope != nil {
ath.metricsScope.Timer(metrics.ActivityExecutionLatency).Record(deltaTime)
}
}()

traceLog(func() {
ath.logger.Debug("Processing new activity task",
zap.String(tagWorkflowID, t.GetWorkflowExecution().GetWorkflowId()),
Expand Down Expand Up @@ -938,6 +938,7 @@ func (ath *activityTaskHandlerImpl) Execute(t *s.PollForActivityTaskResponse) (r
ath.logger.Error("Activity panic.",
zap.String("PanicError", fmt.Sprintf("%v", p)),
zap.String("PanicStack", st))
ath.metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
panicErr := newPanicError(p, st)
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr), nil
}
Expand Down

0 comments on commit 6e592f3

Please sign in to comment.