Skip to content

Commit

Permalink
Use new history builder (#1434)
Browse files Browse the repository at this point in the history
* Wire mutable state with new history builder
* Remove old history builder implementation
  • Loading branch information
wxing1292 committed Apr 6, 2021
1 parent 5767244 commit b536952
Show file tree
Hide file tree
Showing 21 changed files with 567 additions and 3,592 deletions.
1,437 changes: 0 additions & 1,437 deletions service/history/historyBuilder.go

This file was deleted.

1,124 changes: 0 additions & 1,124 deletions service/history/historyBuilder_test.go

This file was deleted.

4 changes: 0 additions & 4 deletions service/history/historyEngine2_test.go
Expand Up @@ -858,10 +858,6 @@ func (s *engine2Suite) createExecutionStartedState(we commonpb.WorkflowExecution
return msBuilder
}

func (s *engine2Suite) printHistory(builder mutableState) string {
return builder.GetHistoryBuilder().GetHistory().String()
}

func (s *engine2Suite) TestRespondWorkflowTaskCompletedRecordMarkerCommand() {
namespaceID := testNamespaceID
we := commonpb.WorkflowExecution{
Expand Down
39 changes: 14 additions & 25 deletions service/history/historyEngine_test.go
Expand Up @@ -1137,7 +1137,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedConflictOnUpdate() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
s.NotNil(updatedWorkflowMutation)
s.Equal(int64(16), updatedWorkflowMutation.NextEventID)
s.Equal(workflowTaskStartedEvent2.EventId, updatedWorkflowMutation.ExecutionInfo.LastProcessedEvent)
Expand Down Expand Up @@ -1587,7 +1587,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledAtt
})

if !iVar.expectWorkflowTaskFail {
s.NoError(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -1737,7 +1737,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledWor
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -1933,7 +1933,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedCompleteWorkflowSuccess()
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -1986,7 +1986,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedFailWorkflowSuccess() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2044,7 +2044,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSignalExternalWorkflowSucc
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2100,7 +2100,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedStartChildWorkflowWithAban
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2164,7 +2164,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedStartChildWorkflowWithTerm
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(6), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2643,7 +2643,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(11), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2750,7 +2750,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(9), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -2811,7 +2811,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(9), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -3186,7 +3186,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() {
Identity: identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
s.NoError(err)
executionBuilder := s.getBuilder(testNamespaceID, we)
s.Equal(int64(12), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
Expand Down Expand Up @@ -4979,10 +4979,6 @@ func (s *engineSuite) getActivityScheduledEvent(msBuilder mutableState,
return event
}

func (s *engineSuite) printHistory(builder mutableState) string {
return builder.GetHistoryBuilder().GetHistory().String()
}

func addWorkflowExecutionStartedEventWithParent(builder mutableState, workflowExecution commonpb.WorkflowExecution,
workflowType, taskQueue string, input *commonpb.Payloads, executionTimeout, runTimeout, taskTimeout time.Duration,
parentInfo *workflowspb.ParentExecutionInfo, identity string) *historypb.HistoryEvent {
Expand Down Expand Up @@ -5273,7 +5269,7 @@ func newMutableStateBuilderWithVersionHistoriesForTest(

func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState {
builder := ms.(*mutableStateBuilder)
_ = builder.FlushBufferedEvents() // nolint:errcheck
builder.FlushBufferedEvents()
info := copyWorkflowExecutionInfo(builder.executionInfo)
state := copyWorkflowExecutionState(builder.executionState)
info.ExecutionStats = &persistencespb.ExecutionStats{}
Expand Down Expand Up @@ -5301,13 +5297,6 @@ func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState {

// FlushBuffer will also be called within the CloseTransactionAsMutation
_, _, _ = builder.CloseTransactionAsMutation(time.Now().UTC(), transactionPolicyActive)
var bufferedEvents []*historypb.HistoryEvent
if len(builder.bufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.bufferedEvents...)
}
if len(builder.updateBufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.updateBufferedEvents...)
}
if builder.executionInfo.VersionHistories != nil {
info.VersionHistories = versionhistory.CopyVersionHistories(builder.executionInfo.VersionHistories)
}
Expand All @@ -5318,7 +5307,7 @@ func createMutableState(ms mutableState) *persistencespb.WorkflowMutableState {
NextEventId: builder.GetNextEventID(),
ActivityInfos: activityInfos,
TimerInfos: timerInfos,
BufferedEvents: bufferedEvents,
BufferedEvents: builder.bufferEventsInDB,
SignalInfos: signalInfos,
RequestCancelInfos: cancellationInfos,
ChildExecutionInfos: childInfos,
Expand Down
13 changes: 5 additions & 8 deletions service/history/mutableState.go
Expand Up @@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/mutablestate"
)

type (
Expand Down Expand Up @@ -95,9 +96,9 @@ type (
AddExternalWorkflowExecutionSignaled(int64, string, string, string, string) (*historypb.HistoryEvent, error)
AddFailWorkflowEvent(int64, enumspb.RetryState, *commandpb.FailWorkflowExecutionCommandAttributes) (*historypb.HistoryEvent, error)
AddRecordMarkerEvent(int64, *commandpb.RecordMarkerCommandAttributes) (*historypb.HistoryEvent, error)
AddRequestCancelExternalWorkflowExecutionFailedEvent(int64, int64, string, string, string, enumspb.CancelExternalWorkflowExecutionFailedCause) (*historypb.HistoryEvent, error)
AddRequestCancelExternalWorkflowExecutionFailedEvent(int64, string, string, string, enumspb.CancelExternalWorkflowExecutionFailedCause) (*historypb.HistoryEvent, error)
AddRequestCancelExternalWorkflowExecutionInitiatedEvent(int64, string, *commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes) (*historypb.HistoryEvent, *persistencespb.RequestCancelInfo, error)
AddSignalExternalWorkflowExecutionFailedEvent(int64, int64, string, string, string, string, enumspb.SignalExternalWorkflowExecutionFailedCause) (*historypb.HistoryEvent, error)
AddSignalExternalWorkflowExecutionFailedEvent(int64, string, string, string, string, enumspb.SignalExternalWorkflowExecutionFailedCause) (*historypb.HistoryEvent, error)
AddSignalExternalWorkflowExecutionInitiatedEvent(int64, string, *commandpb.SignalExternalWorkflowExecutionCommandAttributes) (*historypb.HistoryEvent, *persistencespb.SignalInfo, error)
AddSignalRequested(requestID string)
AddStartChildWorkflowExecutionFailedEvent(int64, enumspb.StartChildWorkflowExecutionFailedCause, *historypb.StartChildWorkflowExecutionInitiatedEventAttributes) (*historypb.HistoryEvent, error)
Expand All @@ -116,13 +117,11 @@ type (
CheckResettable() error
CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
CreateNewHistoryEvent(eventType enumspb.EventType) *historypb.HistoryEvent
CreateNewHistoryEventWithTime(eventType enumspb.EventType, time time.Time) *historypb.HistoryEvent
CreateTransientWorkflowTaskEvents(di *workflowTaskInfo, identity string) (*historypb.HistoryEvent, *historypb.HistoryEvent)
DeleteWorkflowTask()
DeleteSignalRequested(requestID string)
FailWorkflowTask(bool)
FlushBufferedEvents() error
FlushBufferedEvents()
GetActivityByActivityID(string) (*persistencespb.ActivityInfo, bool)
GetActivityInfo(int64) (*persistencespb.ActivityInfo, bool)
GetActivityInfoWithTimerHeartbeat(scheduleEventID int64) (*persistencespb.ActivityInfo, time.Time, bool)
Expand All @@ -137,7 +136,6 @@ type (
GetCurrentVersion() int64
GetExecutionInfo() *persistencespb.WorkflowExecutionInfo
GetExecutionState() *persistencespb.WorkflowExecutionState
GetHistoryBuilder() *historyBuilder
GetInFlightWorkflowTask() (*workflowTaskInfo, bool)
GetPendingWorkflowTask() (*workflowTaskInfo, bool)
GetLastFirstEventID() int64
Expand Down Expand Up @@ -214,9 +212,8 @@ type (
ReplicateWorkflowExecutionTerminatedEvent(int64, *historypb.HistoryEvent) error
ReplicateWorkflowExecutionTimedoutEvent(int64, *historypb.HistoryEvent) error
SetCurrentBranchToken(branchToken []byte) error
SetHistoryBuilder(hBuilder *historyBuilder)
SetHistoryBuilder(hBuilder *mutablestate.HistoryBuilder)
SetHistoryTree(treeID string) error
SetNextEventID(nextEventID int64)
UpdateActivity(*persistencespb.ActivityInfo) error
UpdateActivityWithTimerHeartbeat(*persistencespb.ActivityInfo, time.Time) error
UpdateActivityProgress(ai *persistencespb.ActivityInfo, request *workflowservice.RecordActivityTaskHeartbeatRequest)
Expand Down

0 comments on commit b536952

Please sign in to comment.