Skip to content

Commit

Permalink
Add JitterStart support to client
Browse files Browse the repository at this point in the history
  • Loading branch information
ZackLK committed Jul 14, 2022
1 parent 08bc387 commit 7a8ab81
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 14 deletions.
336 changes: 326 additions & 10 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -16,7 +16,7 @@ require (
github.com/stretchr/testify v1.4.0
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/uber-go/tally/v4 v4.1.1
github.com/uber/cadence-idl v0.0.0-20220223020740-f2f5b7fc2bbd
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/tchannel-go v1.16.0
go.uber.org/atomic v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -194,8 +194,8 @@ github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9P
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber-go/tally/v4 v4.1.1 h1:jhy6WOZp4nHyCqeV43x3Wz370LXUGBhgW2JmzOIHCWI=
github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/uwCIf/vM=
github.com/uber/cadence-idl v0.0.0-20220223020740-f2f5b7fc2bbd h1:KGLWWhlM4I1++coccaIqGJyC4x8Lyof+42lPZP05aaI=
github.com/uber/cadence-idl v0.0.0-20220223020740-f2f5b7fc2bbd/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e h1:N5yzwaoEzjQ7YlXU0UDJeOBrTt/EGLsrqmtu1YSbvdk=
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated 62 files
+6 −3 Makefile
+1,915 −0 go/proto/admin/v1/cluster.pb.go
+63 −0 go/proto/admin/v1/cluster.pb.yarpc.go
+620 −0 go/proto/admin/v1/history.pb.go
+47 −0 go/proto/admin/v1/history.pb.yarpc.go
+5,972 −0 go/proto/admin/v1/queue.pb.go
+673 −0 go/proto/admin/v1/queue.pb.yarpc.go
+5,004 −0 go/proto/admin/v1/replication.pb.go
+295 −0 go/proto/admin/v1/replication.pb.yarpc.go
+15,013 −0 go/proto/admin/v1/service.pb.go
+2,685 −0 go/proto/admin/v1/service.pb.yarpc.go
+54 −54 go/proto/api/v1/common.pb.go
+52 −51 go/proto/api/v1/common.pb.yarpc.go
+161 −99 go/proto/api/v1/decision.pb.go
+341 −338 go/proto/api/v1/decision.pb.yarpc.go
+5 −5 go/proto/api/v1/domain.pb.go
+58 −58 go/proto/api/v1/domain.pb.yarpc.go
+163 −31 go/proto/api/v1/error.pb.go
+30 −29 go/proto/api/v1/error.pb.yarpc.go
+290 −227 go/proto/api/v1/history.pb.go
+468 −466 go/proto/api/v1/history.pb.yarpc.go
+36 −36 go/proto/api/v1/query.pb.go
+276 −275 go/proto/api/v1/query.pb.yarpc.go
+66 −66 go/proto/api/v1/service_domain.pb.go
+122 −122 go/proto/api/v1/service_domain.pb.yarpc.go
+7 −7 go/proto/api/v1/service_meta.pb.go
+15 −15 go/proto/api/v1/service_meta.pb.yarpc.go
+53 −53 go/proto/api/v1/service_visibility.pb.go
+320 −318 go/proto/api/v1/service_visibility.pb.yarpc.go
+125 −125 go/proto/api/v1/service_worker.pb.go
+725 −721 go/proto/api/v1/service_worker.pb.yarpc.go
+200 −137 go/proto/api/v1/service_workflow.pb.go
+639 −636 go/proto/api/v1/service_workflow.pb.yarpc.go
+36 −36 go/proto/api/v1/tasklist.pb.go
+47 −47 go/proto/api/v1/tasklist.pb.yarpc.go
+30 −30 go/proto/api/v1/visibility.pb.go
+269 −268 go/proto/api/v1/visibility.pb.yarpc.go
+6 −6 go/proto/api/v1/workflow.pb.go
+242 −241 go/proto/api/v1/workflow.pb.yarpc.go
+62 −0 proto/uber/cadence/admin/v1/cluster.proto
+37 −0 proto/uber/cadence/admin/v1/history.proto
+182 −0 proto/uber/cadence/admin/v1/queue.proto
+161 −0 proto/uber/cadence/admin/v1/replication.proto
+437 −0 proto/uber/cadence/admin/v1/service.proto
+1 −1 proto/uber/cadence/api/v1/common.proto
+2 −1 proto/uber/cadence/api/v1/decision.proto
+1 −1 proto/uber/cadence/api/v1/domain.proto
+4 −1 proto/uber/cadence/api/v1/error.proto
+2 −1 proto/uber/cadence/api/v1/history.proto
+1 −1 proto/uber/cadence/api/v1/query.proto
+1 −1 proto/uber/cadence/api/v1/service_domain.proto
+1 −1 proto/uber/cadence/api/v1/service_meta.proto
+1 −1 proto/uber/cadence/api/v1/service_visibility.proto
+1 −1 proto/uber/cadence/api/v1/service_worker.proto
+2 −1 proto/uber/cadence/api/v1/service_workflow.proto
+1 −1 proto/uber/cadence/api/v1/tasklist.proto
+1 −1 proto/uber/cadence/api/v1/visibility.proto
+1 −1 proto/uber/cadence/api/v1/workflow.proto
+37 −1 thrift/admin.thrift
+8 −0 thrift/indexer.thrift
+12 −0 thrift/matching.thrift
+8 −0 thrift/shared.thrift
5 changes: 5 additions & 0 deletions internal/client.go
Expand Up @@ -413,6 +413,11 @@ type (
// The resolution is seconds.
// Optional: defaulted to 0 seconds
DelayStart time.Duration

// JitterStart - Seconds to jitter the workflow start. For example, if set to 10, the workflow will start some time between 0-10 seconds.
// This works with CronSchedule and with DelayStart.
// Optional: defaulted to 0 seconds
JitterStart time.Duration
}

// RetryPolicy defines the retry policy.
Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/proto/decision.go
Expand Up @@ -140,6 +140,7 @@ func Decision(d *shared.Decision) *apiv1.Decision {
Header: Header(attr.Header),
Memo: Memo(attr.Memo),
SearchAttributes: SearchAttributes(attr.SearchAttributes),
JitterStart: secondsToDuration(attr.JitterStartSeconds),
},
}
case shared.DecisionTypeStartChildWorkflowExecution:
Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/proto/history.go
Expand Up @@ -540,6 +540,7 @@ func StartChildWorkflowExecutionInitiatedEventAttributes(t *shared.StartChildWor
Memo: Memo(t.Memo),
SearchAttributes: SearchAttributes(t.SearchAttributes),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/compatibility/proto/request.go
Expand Up @@ -408,6 +408,7 @@ func SignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflowEx
SearchAttributes: SearchAttributes(t.SearchAttributes),
Header: Header(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
},
SignalName: t.GetSignalName(),
SignalInput: Payload(t.SignalInput),
Expand Down Expand Up @@ -451,6 +452,7 @@ func StartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *api
SearchAttributes: SearchAttributes(t.SearchAttributes),
Header: Header(t.Header),
DelayStart: secondsToDuration(t.DelayStartSeconds),
JitterStart: secondsToDuration(t.JitterStartSeconds),
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/testdata/decision.go
Expand Up @@ -105,6 +105,7 @@ var (
Header: &Header,
Memo: &Memo,
SearchAttributes: &SearchAttributes,
JitterStart: Duration4,
}
FailWorkflowExecutionDecisionAttributes = apiv1.FailWorkflowExecutionDecisionAttributes{
Failure: &Failure,
Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/thrift/decision.go
Expand Up @@ -134,6 +134,7 @@ func Decision(d *apiv1.Decision) *shared.Decision {
Header: Header(a.Header),
Memo: Memo(a.Memo),
SearchAttributes: SearchAttributes(a.SearchAttributes),
JitterStartSeconds: durationToSeconds(a.JitterStart),
}
case *apiv1.Decision_StartChildWorkflowExecutionDecisionAttributes:
decision.DecisionType = shared.DecisionTypeStartChildWorkflowExecution.Ptr()
Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/thrift/history.go
Expand Up @@ -589,6 +589,7 @@ func StartChildWorkflowExecutionInitiatedEventAttributes(t *apiv1.StartChildWork
Memo: Memo(t.Memo),
SearchAttributes: SearchAttributes(t.SearchAttributes),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/thrift/request.go
Expand Up @@ -461,6 +461,7 @@ func StartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *shar
SearchAttributes: SearchAttributes(t.SearchAttributes),
Header: Header(t.Header),
DelayStartSeconds: durationToSeconds(t.DelayStart),
JitterStartSeconds: durationToSeconds(t.JitterStart),
}
}

Expand Down
12 changes: 12 additions & 0 deletions internal/internal_workflow_client.go
Expand Up @@ -196,6 +196,11 @@ func (wc *workflowClient) StartWorkflow(
return nil, errors.New("Invalid DelayStart option")
}

jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds())
if jitterStartSeconds < 0 {
return nil, errors.New("Invalid JitterStart option")
}

// create a workflow start span and attach it to the context object.
// N.B. we need to finish this immediately as jaeger does not give us a way
// to recreate a span given a span context - which means we will run into
Expand Down Expand Up @@ -227,6 +232,7 @@ func (wc *workflowClient) StartWorkflow(
SearchAttributes: searchAttr,
Header: header,
DelayStartSeconds: common.Int32Ptr(delayStartSeconds),
JitterStartSeconds: common.Int32Ptr(jitterStartSeconds),
}

var response *s.StartWorkflowExecutionResponse
Expand Down Expand Up @@ -383,6 +389,11 @@ func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI
return nil, errors.New("Invalid DelayStart option")
}

jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds())
if jitterStartSeconds < 0 {
return nil, errors.New("Invalid JitterStart option")
}

// create a workflow start span and attach it to the context object. finish it immediately
ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("SignalWithStartWorkflow-%s", workflowType.Name), workflowID)
span.Finish()
Expand All @@ -409,6 +420,7 @@ func (wc *workflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI
WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toThriftPtr(),
Header: header,
DelayStartSeconds: common.Int32Ptr(delayStartSeconds),
JitterStartSeconds: common.Int32Ptr(jitterStartSeconds),
}

var response *s.StartWorkflowExecutionResponse
Expand Down

0 comments on commit 7a8ab81

Please sign in to comment.