Skip to content

Commit

Permalink
Merge branch 'master' into mutable-state-size-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner committed May 17, 2023
2 parents 68d2f1f + ab2f6b9 commit 3041542
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 85 deletions.
1 change: 0 additions & 1 deletion service/history/api/refreshworkflow/api.go
Expand Up @@ -63,7 +63,6 @@ func Invoke(
shard,
shard.GetConfig(),
shard.GetNamespaceRegistry(),
shard.GetEventsCache(),
shard.GetLogger(),
)

Expand Down
4 changes: 4 additions & 0 deletions service/history/api/updateworkflow/api.go
Expand Up @@ -117,6 +117,10 @@ func Invoke(
return consts.ErrWorkflowCompleted
}

// wfKey built from request may have blank RunID so assign a fully
// populated version
wfKey = ms.GetWorkflowKey()

if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() {
return consts.ErrWorkflowExecutionNotFound
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/configs/config.go
Expand Up @@ -518,7 +518,7 @@ func NewConfig(
SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024),
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 100),
ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 1),
ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 2),
// Should not be greater than number of visibility task queue workers VisibilityProcessorSchedulerWorkerCount (default 512)
// Otherwise, visibility queue processors won't be able to fill up bulk with documents (even under heavy load) and bulk will flush due to interval, not number of actions.
ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 500),
Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/history_replicator.go
Expand Up @@ -370,7 +370,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState(
return err
}

taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.shard.GetEventsCache(), r.logger)
taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.logger)
err = taskRefresh.RefreshTasks(ctx, mutableState)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion service/history/ndc/state_rebuilder.go
Expand Up @@ -99,7 +99,6 @@ func NewStateRebuilder(
shard,
shard.GetConfig(),
shard.GetNamespaceRegistry(),
shard.GetEventsCache(),
logger,
),
rebuiltHistorySize: 0,
Expand Down
86 changes: 5 additions & 81 deletions service/history/workflow/task_refresher.go
Expand Up @@ -38,7 +38,6 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
)

Expand All @@ -51,7 +50,6 @@ type (
shard shard.Context
config *configs.Config
namespaceRegistry namespace.Registry
eventsCache events.Cache
logger log.Logger
}
)
Expand All @@ -60,15 +58,13 @@ func NewTaskRefresher(
shard shard.Context,
config *configs.Config,
namespaceRegistry namespace.Registry,
eventsCache events.Cache,
logger log.Logger,
) *TaskRefresherImpl {

return &TaskRefresherImpl{
shard: shard,
config: config,
namespaceRegistry: namespaceRegistry,
eventsCache: eventsCache,
logger: logger,
}
}
Expand Down Expand Up @@ -269,15 +265,8 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
taskGenerator TaskGenerator,
) error {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
pendingActivityInfos := mutableState.GetPendingActivityInfos()

currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

Loop:
for _, activityInfo := range pendingActivityInfos {
// clear all activity timer task mask for later activity timer task re-generation
Expand All @@ -294,18 +283,7 @@ Loop:
continue Loop
}

scheduleEvent, err := r.eventsCache.GetEvent(
ctx,
events.EventKey{
NamespaceID: namespace.ID(executionInfo.NamespaceId),
WorkflowID: executionInfo.WorkflowId,
RunID: executionState.RunId,
EventID: activityInfo.ScheduledEventId,
Version: activityInfo.Version,
},
activityInfo.ScheduledEventBatchId,
currentBranchToken,
)
scheduleEvent, err := mutableState.GetActivityScheduledEvent(ctx, activityInfo.ScheduledEventId)
if err != nil {
return err
}
Expand Down Expand Up @@ -359,33 +337,14 @@ func (r *TaskRefresherImpl) refreshTasksForChildWorkflow(
taskGenerator TaskGenerator,
) error {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
pendingChildWorkflowInfos := mutableState.GetPendingChildExecutionInfos()

currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

Loop:
for _, childWorkflowInfo := range pendingChildWorkflowInfos {
if childWorkflowInfo.StartedEventId != common.EmptyEventID {
continue Loop
}

scheduleEvent, err := r.eventsCache.GetEvent(
ctx,
events.EventKey{
NamespaceID: namespace.ID(executionInfo.NamespaceId),
WorkflowID: executionInfo.WorkflowId,
RunID: executionState.RunId,
EventID: childWorkflowInfo.InitiatedEventId,
Version: childWorkflowInfo.Version,
},
childWorkflowInfo.InitiatedEventBatchId,
currentBranchToken,
)
scheduleEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, childWorkflowInfo.InitiatedEventId)
if err != nil {
return err
}
Expand All @@ -406,28 +365,10 @@ func (r *TaskRefresherImpl) refreshTasksForRequestCancelExternalWorkflow(
taskGenerator TaskGenerator,
) error {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
pendingRequestCancelInfos := mutableState.GetPendingRequestCancelExternalInfos()

currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

for _, requestCancelInfo := range pendingRequestCancelInfos {
initiateEvent, err := r.eventsCache.GetEvent(
ctx,
events.EventKey{
NamespaceID: namespace.ID(executionInfo.NamespaceId),
WorkflowID: executionInfo.WorkflowId,
RunID: executionState.RunId,
EventID: requestCancelInfo.GetInitiatedEventId(),
Version: requestCancelInfo.GetVersion(),
},
requestCancelInfo.GetInitiatedEventBatchId(),
currentBranchToken,
)
initiateEvent, err := mutableState.GetRequesteCancelExternalInitiatedEvent(ctx, requestCancelInfo.GetInitiatedEventId())
if err != nil {
return err
}
Expand All @@ -448,28 +389,11 @@ func (r *TaskRefresherImpl) refreshTasksForSignalExternalWorkflow(
taskGenerator TaskGenerator,
) error {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
pendingSignalInfos := mutableState.GetPendingSignalExternalInfos()

currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

for _, signalInfo := range pendingSignalInfos {
initiateEvent, err := r.eventsCache.GetEvent(
ctx,
events.EventKey{
NamespaceID: namespace.ID(executionInfo.NamespaceId),
WorkflowID: executionInfo.WorkflowId,
RunID: executionState.RunId,
EventID: signalInfo.GetInitiatedEventId(),
Version: signalInfo.GetVersion(),
},
signalInfo.GetInitiatedEventBatchId(),
currentBranchToken,
)

initiateEvent, err := mutableState.GetSignalExternalInitiatedEvent(ctx, signalInfo.GetInitiatedEventId())
if err != nil {
return err
}
Expand Down

0 comments on commit 3041542

Please sign in to comment.