Skip to content

Commit

Permalink
Update mutable state to generate workflow requests (#5821)
Browse files Browse the repository at this point in the history
What changed?
This PR is built on top of #5826.
In this PR, we generate workflow requests from external API requests and replication events and store them in database to detect duplicated requests. If a duplicated requests is detected, a DuplicateRequestError is returned by persistence layer with the run_id telling upper layer which run the request has been applied to. And when this error is returned, the API does no-op and return the run_id to the caller.

Why?
To improve idempotency of Cadence APIs

How did you test it?
unit tests

Potential risks
We have a feature flag to turn on/off this feature. And we'll rollout this feature at domain level.
  • Loading branch information
Shaddoll committed Apr 17, 2024
1 parent 6d8466c commit baac621
Show file tree
Hide file tree
Showing 37 changed files with 1,292 additions and 100 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -2003,6 +2003,13 @@ const (
// Allowed filters: DomainName
EnableRetryForChecksumFailure

// EnableStrongIdempotency enables strong idempotency for APIs
// KeyName: history.enableStrongIdempotency
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableStrongIdempotency

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4310,6 +4317,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails",
DefaultValue: false,
},
EnableStrongIdempotency: DynamicBool{
KeyName: "history.enableStrongIdempotency",
Filters: []Filter{DomainName},
Description: "EnableStrongIdempotency enables strong idempotency for APIs",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -123,6 +123,11 @@ func WorkflowSignalName(signalName string) Tag {
return newStringTag("wf-signal-name", signalName)
}

// WorkflowRequestID returns tag for WorkflowRequestID
func WorkflowRequestID(requestID string) Tag {
return newStringTag("wf-request-id", requestID)
}

// WorkflowState returns tag for WorkflowState
func WorkflowState(s int) Tag {
return newInt("wf-state", s)
Expand Down
3 changes: 3 additions & 0 deletions config/dynamicconfig/development.yaml
Expand Up @@ -8,6 +8,9 @@ history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
history.useNewInitialFailoverVersion:
- value: true
constraints: {}
history.enableStrongIdempotency:
- value: true
constraints: {}
frontend.validSearchAttributes:
Expand Down
5 changes: 5 additions & 0 deletions service/history/config/config.go
Expand Up @@ -338,6 +338,8 @@ type Config struct {
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn

EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter

// HostName for machine running the service
HostName string
}
Expand Down Expand Up @@ -596,6 +598,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),

EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),

HostName: hostname,
}

Expand Down Expand Up @@ -637,6 +641,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
}))
panicIfErr(inMem.UpdateValue(dynamicconfig.QueueProcessorRandomSplitProbability, 0.5))
panicIfErr(inMem.UpdateValue(dynamicconfig.ReplicationTaskFetcherEnableGracefulSyncShutdown, true))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnableStrongIdempotency, true))

dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false, "")
Expand Down
36 changes: 36 additions & 0 deletions service/history/engine/engineimpl/historyEngine.go
Expand Up @@ -787,7 +787,17 @@ func (e *historyEngineImpl) startWorkflowHelper(
createMode,
prevRunID,
prevLastWriteVersion,
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
// handle already started error
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {

Expand Down Expand Up @@ -853,7 +863,17 @@ func (e *historyEngineImpl) startWorkflowHelper(
createMode,
prevRunID,
t.LastWriteVersion,
persistence.CreateWorkflowRequestModeNew,
)
if t, ok := err.(*persistence.DuplicateRequestError); ok {
if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2620,6 +2640,13 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
if t.RequestType == persistence.WorkflowRequestTypeSignal {
return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
if execution.IsConflictError(err) {
continue Just_Signal_Loop
}
Expand Down Expand Up @@ -3014,6 +3041,15 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
nil,
request.GetSkipSignalReapply(),
); err != nil {
if t, ok := err.(*persistence.DuplicateRequestError); ok {
if t.RequestType == persistence.WorkflowRequestTypeReset {
return &types.ResetWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
e.logger.Error("A bug is detected for idempotency improvement", tag.Dynamic("request-type", t.RequestType))
return nil, t
}
return nil, err
}
return &types.ResetWorkflowExecutionResponse{
Expand Down

0 comments on commit baac621

Please sign in to comment.