Skip to content

Commit

Permalink
Decouple Client from worker part of SDK (#1100)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius committed Jun 4, 2021
1 parent 16d20b1 commit ca6e026
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 24 deletions.
3 changes: 2 additions & 1 deletion internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ type ServiceInvoker interface {
// without detail.
BackgroundHeartbeat() error
Close(flushBufferedHeartbeat bool)
GetClient(domain string, options *ClientOptions) Client

SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error
}

// WithActivityTask adds activity specific information into context.
Expand Down
32 changes: 30 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,8 +1812,8 @@ func (i *cadenceInvoker) Close(flushBufferedHeartbeat bool) {
}
}

func (i *cadenceInvoker) GetClient(domain string, options *ClientOptions) Client {
return NewClient(i.service, domain, options)
func (i *cadenceInvoker) SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error {
return signalWorkflow(ctx, i.service, i.identity, domain, workflowID, runID, signalName, signalInput)
}

func newServiceInvoker(
Expand Down Expand Up @@ -1968,6 +1968,34 @@ func createNewDecision(decisionType s.DecisionType) *s.Decision {
DecisionType: common.DecisionTypePtr(decisionType),
}
}
func signalWorkflow(
ctx context.Context,
service workflowserviceclient.Interface,
identity string,
domain string,
workflowID string,
runID string,
signalName string,
signalInput []byte,
) error {
request := &s.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domain),
WorkflowExecution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: getRunID(runID),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
Identity: common.StringPtr(identity),
}

return backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx)
defer cancel()
return service.SignalWorkflowExecution(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}

func recordActivityHeartbeat(
ctx context.Context,
Expand Down
19 changes: 1 addition & 18 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,24 +326,7 @@ func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string,
if err != nil {
return err
}

request := &s.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(wc.domain),
WorkflowExecution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: getRunID(runID),
},
SignalName: common.StringPtr(signalName),
Input: input,
Identity: common.StringPtr(wc.identity),
}

return backoff.Retry(ctx,
func() error {
tchCtx, cancel, opt := newChannelContext(ctx)
defer cancel()
return wc.workflowService.SignalWorkflowExecution(tchCtx, request, opt...)
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
return signalWorkflow(ctx, wc.workflowService, wc.identity, wc.domain, workflowID, runID, signalName, input)
}

// SignalWithStartWorkflow sends a signal to a running workflow.
Expand Down
17 changes: 14 additions & 3 deletions internal/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,20 @@ func (env *sessionEnvironmentImpl) AddSessionToken() {

func (env *sessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error {
activityEnv := getActivityEnv(ctx)
client := activityEnv.serviceInvoker.GetClient(activityEnv.workflowDomain, &ClientOptions{})
return client.SignalWorkflow(ctx, activityEnv.workflowExecution.ID, activityEnv.workflowExecution.RunID,
sessionID, env.getCreationResponse())

signalInput, err := encodeArg(getDefaultDataConverter(), env.getCreationResponse())
if err != nil {
return err
}

return activityEnv.serviceInvoker.SignalWorkflow(
ctx,
activityEnv.workflowDomain,
activityEnv.workflowExecution.ID,
activityEnv.workflowExecution.RunID,
sessionID,
signalInput,
)
}

func (env *sessionEnvironmentImpl) getCreationResponse() *sessionCreationResponse {
Expand Down

0 comments on commit ca6e026

Please sign in to comment.