Skip to content

Commit

Permalink
Move history API logic into its own package (#2778)
Browse files Browse the repository at this point in the history
* Move workflow creation logic util into api package
* Simplify history engine impl
  • Loading branch information
wxing1292 committed Apr 28, 2022
1 parent 5540505 commit 130d36e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 102 deletions.
128 changes: 128 additions & 0 deletions service/history/api/start_workflow_uti.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package api

import (
commonpb "go.temporal.io/api/common/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func NewWorkflowWithSignal(
shard shard.Context,
namespaceEntry *namespace.Namespace,
execution commonpb.WorkflowExecution,
startRequest *historyservice.StartWorkflowExecutionRequest,
signalWithStartRequest *workflowservice.SignalWithStartWorkflowExecutionRequest,
) (workflow.Context, workflow.MutableState, error) {
newMutableState, err := CreateMutableState(shard, namespaceEntry, execution.GetRunId())
if err != nil {
return nil, nil, err
}

startEvent, err := newMutableState.AddWorkflowExecutionStartedEvent(
execution,
startRequest,
)
if err != nil {
return nil, nil, err
}

if signalWithStartRequest != nil {
if signalWithStartRequest.GetRequestId() != "" {
newMutableState.AddSignalRequested(signalWithStartRequest.GetRequestId())
}
if _, err := newMutableState.AddWorkflowExecutionSignaled(
signalWithStartRequest.GetSignalName(),
signalWithStartRequest.GetSignalInput(),
signalWithStartRequest.GetIdentity(),
signalWithStartRequest.GetHeader(),
); err != nil {
return nil, nil, err
}
}

// Generate first workflow task event if not child WF and no first workflow task backoff
if err := GenerateFirstWorkflowTask(
newMutableState,
startRequest.ParentExecutionInfo,
startEvent,
); err != nil {
return nil, nil, err
}

newWorkflowContext := workflow.NewContext(
shard,
definition.NewWorkflowKey(
namespaceEntry.ID().String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
shard.GetLogger(),
)
return newWorkflowContext, newMutableState, nil
}

func CreateMutableState(
shard shard.Context,
namespaceEntry *namespace.Namespace,
runID string,
) (workflow.MutableState, error) {
newMutableState := workflow.NewMutableState(
shard,
shard.GetEventsCache(),
shard.GetLogger(),
namespaceEntry,
shard.GetTimeSource().Now(),
)
if err := newMutableState.SetHistoryTree(runID); err != nil {
return nil, err
}
return newMutableState, nil
}

func GenerateFirstWorkflowTask(
mutableState workflow.MutableState,
parentInfo *workflowspb.ParentExecutionInfo,
startEvent *historypb.HistoryEvent,
) error {

if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
if err := mutableState.AddFirstWorkflowTaskScheduled(
startEvent,
); err != nil {
return err
}
}
return nil
}
104 changes: 5 additions & 99 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/tasks"

Expand All @@ -58,7 +59,6 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/client/admin"
"go.temporal.io/server/client/history"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -464,46 +464,6 @@ func (e *historyEngineImpl) handleClusterMetadataUpdate(
}
}

func createMutableState(
shard shard.Context,
namespaceEntry *namespace.Namespace,
runID string,
) (workflow.MutableState, error) {

var newMutableState workflow.MutableState
// version history applies to both local and global namespace
newMutableState = workflow.NewMutableState(
shard,
shard.GetEventsCache(),
shard.GetLogger(),
namespaceEntry,
shard.GetTimeSource().Now(),
)

if err := newMutableState.SetHistoryTree(runID); err != nil {
return nil, err
}

return newMutableState, nil
}

func (e *historyEngineImpl) generateFirstWorkflowTask(
mutableState workflow.MutableState,
parentInfo *workflowspb.ParentExecutionInfo,
startEvent *historypb.HistoryEvent,
) error {

if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
if err := mutableState.AddFirstWorkflowTaskScheduled(
startEvent,
); err != nil {
return err
}
}
return nil
}

// StartWorkflowExecution starts a workflow execution
// Consistency guarantee: always write
func (e *historyEngineImpl) StartWorkflowExecution(
Expand Down Expand Up @@ -542,7 +502,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(
RunId: uuid.New(),
}

weContext, mutableState, err := e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, nil)
weContext, mutableState, err := api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -629,7 +589,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(
),
prevExecutionUpdateAction,
func() (workflow.Context, workflow.MutableState, error) {
return e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, nil)
return api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, nil)
},
)
switch err {
Expand Down Expand Up @@ -662,60 +622,6 @@ func (e *historyEngineImpl) StartWorkflowExecution(
}, nil
}

func (e *historyEngineImpl) newWorkflowWithSignal(
namespaceEntry *namespace.Namespace,
execution commonpb.WorkflowExecution,
startRequest *historyservice.StartWorkflowExecutionRequest,
signalWithStartRequest *workflowservice.SignalWithStartWorkflowExecutionRequest,
) (workflow.Context, workflow.MutableState, error) {
newMutableState, err := createMutableState(e.shard, namespaceEntry, execution.GetRunId())
if err != nil {
return nil, nil, err
}

startEvent, err := newMutableState.AddWorkflowExecutionStartedEvent(
execution,
startRequest,
)
if err != nil {
return nil, nil, err
}

if signalWithStartRequest != nil {
if signalWithStartRequest.GetRequestId() != "" {
newMutableState.AddSignalRequested(signalWithStartRequest.GetRequestId())
}
if _, err := newMutableState.AddWorkflowExecutionSignaled(
signalWithStartRequest.GetSignalName(),
signalWithStartRequest.GetSignalInput(),
signalWithStartRequest.GetIdentity(),
signalWithStartRequest.GetHeader(),
); err != nil {
return nil, nil, err
}
}

// Generate first workflow task event if not child WF and no first workflow task backoff
if err := e.generateFirstWorkflowTask(
newMutableState,
startRequest.ParentExecutionInfo,
startEvent,
); err != nil {
return nil, nil, err
}

newWorkflowContext := workflow.NewContext(
e.shard,
definition.NewWorkflowKey(
namespaceEntry.ID().String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
e.logger,
)
return newWorkflowContext, newMutableState, nil
}

// GetMutableState retrieves the mutable state of the workflow execution
func (e *historyEngineImpl) GetMutableState(
ctx context.Context,
Expand Down Expand Up @@ -2201,7 +2107,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
newWorkflowContext(prevContext, release, prevMutableState),
prevExecutionUpdateAction,
func() (workflow.Context, workflow.MutableState, error) {
return e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, sRequest)
return api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, sRequest)
},
)
switch err {
Expand All @@ -2219,7 +2125,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
}

now := e.timeSource.Now()
context, mutableState, err := e.newWorkflowWithSignal(namespaceEntry, execution, startRequest, sRequest)
context, mutableState, err := api.NewWorkflowWithSignal(e.shard, namespaceEntry, execution, startRequest, sRequest)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -557,7 +558,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask(
}
startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes()

newMutableState, err := createMutableState(
newMutableState, err := api.CreateMutableState(
t.shard,
mutableState.GetNamespaceEntry(),
newRunID,
Expand Down
5 changes: 3 additions & 2 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
Expand Down Expand Up @@ -1031,7 +1032,7 @@ func (handler *workflowTaskHandlerImpl) handleRetry(
}
startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes()

newStateBuilder, err := createMutableState(
newStateBuilder, err := api.CreateMutableState(
handler.shard,
handler.mutableState.GetNamespaceEntry(),
newRunID,
Expand Down Expand Up @@ -1075,7 +1076,7 @@ func (handler *workflowTaskHandlerImpl) handleCron(
lastCompletionResult = startAttr.LastCompletionResult
}

newStateBuilder, err := createMutableState(
newStateBuilder, err := api.CreateMutableState(
handler.shard,
handler.mutableState.GetNamespaceEntry(),
newRunID,
Expand Down

0 comments on commit 130d36e

Please sign in to comment.