Skip to content

Commit

Permalink
Add SignalExternalWorkflow Decision
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Jan 5, 2018
1 parent 060aa91 commit 4d5ebf0
Show file tree
Hide file tree
Showing 14 changed files with 1,915 additions and 112 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

1,746 changes: 1,650 additions & 96 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Expand Up @@ -79,6 +79,7 @@ enum DecisionType {
RecordMarker,
ContinueAsNewWorkflowExecution,
StartChildWorkflowExecution,
SignalExternalWorkflowExecution,
}

enum EventType {
Expand Down Expand Up @@ -109,6 +110,9 @@ enum EventType {
RequestCancelExternalWorkflowExecutionFailed,
ExternalWorkflowExecutionCancelRequested,
MarkerRecorded,
SignalExternalWorkflowExecutionInitiated,
SignalExternalWorkflowExecutionFailed,
ExternalWorkflowExecutionSignalRequested,
WorkflowExecutionSignaled,
WorkflowExecutionTerminated,
WorkflowExecutionContinuedAsNew,
Expand All @@ -132,6 +136,7 @@ enum DecisionTaskFailedCause {
BAD_COMPLETE_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_FAIL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_CANCEL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_CONTINUE_AS_NEW_ATTRIBUTES,
START_TIMER_DUPLICATE_ID,
Expand All @@ -143,6 +148,10 @@ enum CancelExternalWorkflowExecutionFailedCause {
UNKNOWN_EXTERNAL_WORKFLOW_EXECUTION,
}

enum SignalExternalWorkflowExecutionFailedCause {
UNKNOWN_EXTERNAL_WORKFLOW_EXECUTION,
}

enum ChildWorkflowExecutionFailedCause {
WORKFLOW_ALREADY_RUNNING,
}
Expand Down Expand Up @@ -254,6 +263,15 @@ struct RequestCancelExternalWorkflowExecutionDecisionAttributes {
40: optional binary control
}

struct SignalExternalWorkflowExecutionDecisionAttributes {
10: optional string domain
20: optional string workflowId
30: optional string runId
40: optional string signalName
50: optional binary input
60: optional binary control
}

struct RecordMarkerDecisionAttributes {
10: optional string markerName
20: optional binary details
Expand Down Expand Up @@ -292,6 +310,7 @@ struct Decision {
80: optional RecordMarkerDecisionAttributes recordMarkerDecisionAttributes
90: optional ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewWorkflowExecutionDecisionAttributes
100: optional StartChildWorkflowExecutionDecisionAttributes startChildWorkflowExecutionDecisionAttributes
110: optional SignalExternalWorkflowExecutionDecisionAttributes signalExternalWorkflowExecutionDecisionAttributes
}

struct WorkflowExecutionStartedEventAttributes {
Expand Down Expand Up @@ -498,6 +517,31 @@ struct ExternalWorkflowExecutionCancelRequestedEventAttributes {
30: optional WorkflowExecution workflowExecution
}

struct SignalExternalWorkflowExecutionInitiatedEventAttributes {
10: optional i64 (js.type = "Long") decisionTaskCompletedEventId
20: optional string domain
30: optional WorkflowExecution workflowExecution
40: optional string signalName
50: optional binary input
60: optional binary control
}

struct SignalExternalWorkflowExecutionFailedEventAttributes {
10: optional SignalExternalWorkflowExecutionFailedCause cause,
20: optional i64 (js.type = "Long") decisionTaskCompletedEventId
30: optional string domain
40: optional WorkflowExecution workflowExecution
50: optional i64 (js.type = "Long") initiatedEventId
60: optional binary control
}

struct ExternalWorkflowExecutionSignalRequestedEventAttributes {
10: optional i64 (js.type = "Long") initiatedEventId
20: optional string domain
30: optional WorkflowExecution workflowExecution
40: optional binary control
}

struct StartChildWorkflowExecutionInitiatedEventAttributes {
10: optional string domain
20: optional string workflowId
Expand Down Expand Up @@ -615,6 +659,9 @@ struct HistoryEvent {
390: optional ChildWorkflowExecutionCanceledEventAttributes childWorkflowExecutionCanceledEventAttributes
400: optional ChildWorkflowExecutionTimedOutEventAttributes childWorkflowExecutionTimedOutEventAttributes
410: optional ChildWorkflowExecutionTerminatedEventAttributes childWorkflowExecutionTerminatedEventAttributes
420: optional SignalExternalWorkflowExecutionInitiatedEventAttributes signalExternalWorkflowExecutionInitiatedEventAttributes
430: optional SignalExternalWorkflowExecutionFailedEventAttributes signalExternalWorkflowExecutionFailedEventAttributes
440: optional ExternalWorkflowExecutionSignalRequestedEventAttributes externalWorkflowExecutionSignalRequestedEventAttributes
}

struct History {
Expand Down Expand Up @@ -840,6 +887,14 @@ struct SignalWorkflowExecutionRequest {
30: optional string signalName
40: optional binary input
50: optional string identity
60: optional string requestId
}

struct DeleteWorkflowExecutionSignalRequest {
10: optional string domain
20: optional WorkflowExecution workflowExecution
30: optional string identity
40: optional string requestId
}

struct TerminateWorkflowExecutionRequest {
Expand Down
64 changes: 64 additions & 0 deletions internal/internal_decision_state_machine.go
Expand Up @@ -23,6 +23,7 @@ package internal
import (
"fmt"

"github.com/pborman/uuid"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/util"
Expand Down Expand Up @@ -92,6 +93,10 @@ type (
*naiveDecisionStateMachine
}

signalExternalWorkflowDecisionStateMachine struct {
*naiveDecisionStateMachine
}

// only possible state transition is: CREATED->SENT->COMPLETED
markerDecisionStateMachine struct {
*naiveDecisionStateMachine
Expand Down Expand Up @@ -124,6 +129,7 @@ const (
decisionTypeExternalWorkflow decisionType = 2
decisionTypeMarker decisionType = 3
decisionTypeTimer decisionType = 4
decisionTypeSignal decisionType = 5
)

const (
Expand Down Expand Up @@ -251,6 +257,14 @@ func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWo
}
}

func newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine {
d := createNewDecision(s.DecisionTypeSignalExternalWorkflowExecution)
d.SignalExternalWorkflowExecutionDecisionAttributes = attributes
return &signalExternalWorkflowDecisionStateMachine{
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d),
}
}

func (d *decisionStateMachineBase) getState() decisionState {
return d.state
}
Expand Down Expand Up @@ -585,6 +599,24 @@ func (d *cancelExternalWorkflowDecisionStateMachine) handleCompletionEvent() {
}
}

func (d *signalExternalWorkflowDecisionStateMachine) handleInitiatedEvent() {
switch d.state {
case decisionStateDecisionSent:
d.moveState(decisionStateInitiated, eventInitiated)
default:
d.failStateTransition(eventInitiated)
}
}

func (d *signalExternalWorkflowDecisionStateMachine) handleCompletionEvent() {
switch d.state {
case decisionStateInitiated:
d.moveState(decisionStateCompleted, eventCompletion)
default:
d.failStateTransition(eventCompletion)
}
}

func (d *markerDecisionStateMachine) handleCompletionEvent() {
// Marker decision transit from SENT to COMPLETED on EventType_MarkerRecorded event
switch d.state {
Expand Down Expand Up @@ -788,6 +820,38 @@ func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionFailed(wor
}
}

func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, runID, signalName string, input []byte) decisionStateMachine {
signalID := uuid.New()
attributes := &s.SignalExternalWorkflowExecutionDecisionAttributes{
Domain: common.StringPtr(domain),
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
SignalName: common.StringPtr(signalName),
Input: input,
Control: []byte(signalID),
}
decision := newSignalExternalWorkflowStateMachine(attributes, signalID)
h.addDecision(decision)
return decision
}

func (h *decisionsHelper) handleSignalExternalWorkflowExecutionInitiated(signalID string) {
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
decision.handleInitiatedEvent()
}

func (h *decisionsHelper) handleSignalExternalWorkflowExecutionRequested(signalID string) decisionStateMachine {
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
decision.handleCompletionEvent()
return decision
}

func (h *decisionsHelper) handleSignalExternalWorkflowExecutionFailed(signalID string) decisionStateMachine {
decision := h.getDecision(makeDecisionID(decisionTypeSignal, signalID))
decision.handleCompletionEvent()
return decision
}

func (h *decisionsHelper) startTimer(attributes *s.StartTimerDecisionAttributes) decisionStateMachine {
decision := newTimerDecisionStateMachine(attributes)
h.addDecision(decision)
Expand Down
68 changes: 67 additions & 1 deletion internal/internal_event_handlers.go
Expand Up @@ -68,6 +68,11 @@ type (
handled bool
}

scheduledSignal struct {
callback resultHandler
handled bool
}

// workflowEnvironmentImpl an implementation of workflowEnvironment represents a environment for workflow execution.
workflowEnvironmentImpl struct {
workflowInfo *WorkflowInfo
Expand Down Expand Up @@ -172,6 +177,14 @@ func (s *scheduledChildWorkflow) handle(result []byte, err error) {
s.resultCallback(result, err)
}

func (s *scheduledSignal) handle(result []byte, err error) {
if s.handled {
panic(fmt.Sprintf("signal already handled %v", s))
}
s.handled = true
s.callback(result, err)
}

func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
return wc.workflowInfo
}
Expand Down Expand Up @@ -203,6 +216,22 @@ func (wc *workflowEnvironmentImpl) RequestCancelWorkflow(domainName, workflowID,
return nil
}

func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(domainName, workflowID, runID, signalName string,
input []byte, callback resultHandler) {

if domainName == "" {
callback(nil, errors.New("domain is empty"))
return
}
if workflowID == "" {
callback(nil, errors.New("workflow ID is empty"))
return
}

decision := wc.decisionsHelper.signalExternalWorkflowExecution(domainName, workflowID, runID, signalName, input)
decision.setData(&scheduledSignal{callback: callback})
}

func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
wc.cancelHandler = handler
}
Expand Down Expand Up @@ -405,7 +434,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() ([]byte, error), callback
panic(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
sideEffectID, keys))
}
wc.logger.Debug("SideEffect returning already caclulated result.",
wc.logger.Debug("SideEffect returning already calculated result.",
zap.Int32(tagSideEffectID, sideEffectID))
details = result
} else {
Expand Down Expand Up @@ -528,6 +557,16 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case m.EventTypeWorkflowExecutionSignaled:
weh.handleWorkflowExecutionSignaled(event.WorkflowExecutionSignaledEventAttributes)

case m.EventTypeSignalExternalWorkflowExecutionInitiated:
weh.decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(
event.SignalExternalWorkflowExecutionInitiatedEventAttributes.WorkflowExecution.GetWorkflowId())

case m.EventTypeSignalExternalWorkflowExecutionFailed:
weh.handleSignalExternalWorkflowExecutionFailed(event)

case m.EventTypeExternalWorkflowExecutionSignalRequested:
weh.handleSignalExternalWorkflowExecutionRequested(event)

case m.EventTypeMarkerRecorded:
err = weh.handleMarkerRecorded(event.GetEventId(), event.MarkerRecordedEventAttributes)

Expand Down Expand Up @@ -818,3 +857,30 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTermin

return nil
}

func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionRequested(event *m.HistoryEvent) error {
attributes := event.ExternalWorkflowExecutionSignalRequestedEventAttributes
signalID := string(attributes.Control)
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionRequested(signalID)
signal := decision.getData().(*scheduledSignal)
if signal.handled {
return nil
}
signal.handle(nil, nil)

return nil
}

func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) error {
attributes := event.SignalExternalWorkflowExecutionFailedEventAttributes
signalID := string(attributes.Control)
decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionFailed(signalID)
signal := decision.getData().(*scheduledSignal)
if signal.handled {
return nil
}
err := fmt.Errorf("signal external workflow failed, %v", attributes.GetCause())
signal.handle(nil, err)

return nil
}
4 changes: 2 additions & 2 deletions internal/internal_task_handlers.go
Expand Up @@ -535,7 +535,7 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(task *s.PollForDe
return
}

// ProcessWorkflowTask processes each all the events of the workflow task.
// ProcessWorkflowTask processes all the events of the workflow task.
func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
task *s.PollForDecisionTaskResponse,
historyIterator HistoryIterator,
Expand Down Expand Up @@ -572,7 +572,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
defer func() {
if err != nil || workflowClosed {
// TODO: in case of error, ideally, we should notify server to clear the stickiness.
// TODO: in case of closed, it asumes the close decision always succeed. need server side change to return
// TODO: in case of closed, it assumes the close decision always succeed. need server side change to return
// error to indicate the close failure case. This should be rear case. For now, always remove the cache, and
// if the close decision failed, the next decision will have to rebuild the state.
workflowContext.destroyCachedState()
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_worker.go
Expand Up @@ -691,7 +691,7 @@ func (th *hostEnvImpl) getWorkflowDefinition(wt WorkflowType) (workflowDefinitio
wf, ok := th.getWorkflowFn(lookup)
if !ok {
supported := strings.Join(th.getRegisteredWorkflowTypes(), ", ")
return nil, fmt.Errorf("Unable to find workflow type: %v. Supported types: [%v]", lookup, supported)
return nil, fmt.Errorf("unable to find workflow type: %v. Supported types: [%v]", lookup, supported)
}
wd := &workflowExecutor{name: lookup, fn: wf}
return newWorkflowDefinition(wd), nil
Expand Down Expand Up @@ -888,7 +888,7 @@ func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {
decoded, err := getHostEnvironment().decodeArgs(fnType, input)
if err != nil {
return nil, fmt.Errorf(
"Unable to decode the workflow function input bytes with error: %v, function name: %v",
"unable to decode the workflow function input bytes with error: %v, function name: %v",
err, we.name)
}
args = append(args, decoded...)
Expand Down Expand Up @@ -929,7 +929,7 @@ func (ae *activityExecutor) Execute(ctx context.Context, input []byte) ([]byte,
decoded, err := getHostEnvironment().decodeArgs(fnType, input)
if err != nil {
return nil, fmt.Errorf(
"Unable to decode the activity function input bytes with error: %v for function name: %v",
"unable to decode the activity function input bytes with error: %v for function name: %v",
err, ae.name)
}
args = append(args, decoded...)
Expand Down
1 change: 1 addition & 0 deletions internal/internal_worker_base.go
Expand Up @@ -63,6 +63,7 @@ type (
GetLogger() *zap.Logger
GetMetricsScope() tally.Scope
RegisterSignalHandler(handler func(name string, input []byte))
SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, callback resultHandler)
RegisterQueryHandler(handler func(queryType string, queryArgs []byte) ([]byte, error))
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow.go
Expand Up @@ -936,7 +936,7 @@ func (s *selectorImpl) Select(ctx Context) {
}
}

// NewWorkflowDefinition creates a WorkflowDefinition from a Workflow
// NewWorkflowDefinition creates a WorkflowDefinition from a Workflow
func newWorkflowDefinition(workflow workflow) workflowDefinition {
return &syncWorkflowDefinition{workflow: workflow}
}
Expand Down

0 comments on commit 4d5ebf0

Please sign in to comment.