From f74966f1569450fea0aa192c1e74e5a1acc14586 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 16 May 2023 12:50:19 -0700 Subject: [PATCH 1/3] Increase default number of Elasticsearch bulk processor workers to 2 (#3738) --- service/history/configs/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 1eeb1bcff38..9eaad3b5088 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -514,7 +514,7 @@ func NewConfig( SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024), SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 100), - ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 1), + ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 2), // Should not be greater than number of visibility task queue workers VisibilityProcessorSchedulerWorkerCount (default 512) // Otherwise, visibility queue processors won't be able to fill up bulk with documents (even under heavy load) and bulk will flush due to interval, not number of actions. ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 500), From b9a9029e761dbd06bcfb7424f7415700187443b0 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 16 May 2023 15:52:51 -0700 Subject: [PATCH 2/3] Use the correct event version for task refresh (#4349) * Use the correct event version for task refresh --- service/history/api/refreshworkflow/api.go | 1 - service/history/ndc/history_replicator.go | 2 +- service/history/ndc/state_rebuilder.go | 1 - service/history/workflow/task_refresher.go | 86 ++-------------------- 4 files changed, 6 insertions(+), 84 deletions(-) diff --git a/service/history/api/refreshworkflow/api.go b/service/history/api/refreshworkflow/api.go index f21cb33b2d4..a1440094c70 100644 --- a/service/history/api/refreshworkflow/api.go +++ b/service/history/api/refreshworkflow/api.go @@ -63,7 +63,6 @@ func Invoke( shard, shard.GetConfig(), shard.GetNamespaceRegistry(), - shard.GetEventsCache(), shard.GetLogger(), ) diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 5bbe7dadfe9..e8dc3e6aaa0 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -370,7 +370,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( return err } - taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.shard.GetEventsCache(), r.logger) + taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.logger) err = taskRefresh.RefreshTasks(ctx, mutableState) if err != nil { return err diff --git a/service/history/ndc/state_rebuilder.go b/service/history/ndc/state_rebuilder.go index 8fa5cb1155e..80b5a84176f 100644 --- a/service/history/ndc/state_rebuilder.go +++ b/service/history/ndc/state_rebuilder.go @@ -99,7 +99,6 @@ func NewStateRebuilder( shard, shard.GetConfig(), shard.GetNamespaceRegistry(), - shard.GetEventsCache(), logger, ), rebuiltHistorySize: 0, diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 43740360b04..6d0166502f4 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -38,7 +38,6 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" ) @@ -51,7 +50,6 @@ type ( shard shard.Context config *configs.Config namespaceRegistry namespace.Registry - eventsCache events.Cache logger log.Logger } ) @@ -60,7 +58,6 @@ func NewTaskRefresher( shard shard.Context, config *configs.Config, namespaceRegistry namespace.Registry, - eventsCache events.Cache, logger log.Logger, ) *TaskRefresherImpl { @@ -68,7 +65,6 @@ func NewTaskRefresher( shard: shard, config: config, namespaceRegistry: namespaceRegistry, - eventsCache: eventsCache, logger: logger, } } @@ -269,15 +265,8 @@ func (r *TaskRefresherImpl) refreshTasksForActivity( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingActivityInfos := mutableState.GetPendingActivityInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - Loop: for _, activityInfo := range pendingActivityInfos { // clear all activity timer task mask for later activity timer task re-generation @@ -294,18 +283,7 @@ Loop: continue Loop } - scheduleEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: activityInfo.ScheduledEventId, - Version: activityInfo.Version, - }, - activityInfo.ScheduledEventBatchId, - currentBranchToken, - ) + scheduleEvent, err := mutableState.GetActivityScheduledEvent(ctx, activityInfo.ScheduledEventId) if err != nil { return err } @@ -359,33 +337,14 @@ func (r *TaskRefresherImpl) refreshTasksForChildWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingChildWorkflowInfos := mutableState.GetPendingChildExecutionInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - Loop: for _, childWorkflowInfo := range pendingChildWorkflowInfos { if childWorkflowInfo.StartedEventId != common.EmptyEventID { continue Loop } - - scheduleEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: childWorkflowInfo.InitiatedEventId, - Version: childWorkflowInfo.Version, - }, - childWorkflowInfo.InitiatedEventBatchId, - currentBranchToken, - ) + scheduleEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, childWorkflowInfo.InitiatedEventId) if err != nil { return err } @@ -406,28 +365,10 @@ func (r *TaskRefresherImpl) refreshTasksForRequestCancelExternalWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingRequestCancelInfos := mutableState.GetPendingRequestCancelExternalInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - for _, requestCancelInfo := range pendingRequestCancelInfos { - initiateEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: requestCancelInfo.GetInitiatedEventId(), - Version: requestCancelInfo.GetVersion(), - }, - requestCancelInfo.GetInitiatedEventBatchId(), - currentBranchToken, - ) + initiateEvent, err := mutableState.GetRequesteCancelExternalInitiatedEvent(ctx, requestCancelInfo.GetInitiatedEventId()) if err != nil { return err } @@ -448,28 +389,11 @@ func (r *TaskRefresherImpl) refreshTasksForSignalExternalWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingSignalInfos := mutableState.GetPendingSignalExternalInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - for _, signalInfo := range pendingSignalInfos { - initiateEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: signalInfo.GetInitiatedEventId(), - Version: signalInfo.GetVersion(), - }, - signalInfo.GetInitiatedEventBatchId(), - currentBranchToken, - ) + + initiateEvent, err := mutableState.GetSignalExternalInitiatedEvent(ctx, signalInfo.GetInitiatedEventId()) if err != nil { return err } From ab2f6b9b3757fae7cf3328056eddcf41920e3b1b Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Tue, 16 May 2023 23:03:30 -0400 Subject: [PATCH 3/3] Ensure workflow key is fully specified (#4351) --- service/history/api/updateworkflow/api.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index b79326e84ce..68dba676be0 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -117,6 +117,10 @@ func Invoke( return consts.ErrWorkflowCompleted } + // wfKey built from request may have blank RunID so assign a fully + // populated version + wfKey = ms.GetWorkflowKey() + if req.GetRequest().GetFirstExecutionRunId() != "" && ms.GetExecutionInfo().GetFirstExecutionRunId() != req.GetRequest().GetFirstExecutionRunId() { return consts.ErrWorkflowExecutionNotFound }