Skip to content

Commit

Permalink
Persistence Context Part 1: Execution Manager (#2622)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 21, 2022
1 parent b7100ae commit f4c0852
Show file tree
Hide file tree
Showing 110 changed files with 2,109 additions and 1,548 deletions.
3 changes: 2 additions & 1 deletion common/archiver/historyIterator.go
Expand Up @@ -28,6 +28,7 @@ package archiver

import (
"bytes"
"context"
"encoding/json"
"errors"

Expand Down Expand Up @@ -220,7 +221,7 @@ func (i *historyIterator) readHistory(firstEventID int64) ([]*historypb.History,
PageSize: i.historyPageSize,
ShardID: i.request.ShardID,
}
historyBatches, _, _, err := persistence.ReadFullPageEventsByBatch(i.executionManager, req)
historyBatches, _, _, err := persistence.ReadFullPageEventsByBatch(context.TODO(), i.executionManager, req)
return historyBatches, err
}

Expand Down
10 changes: 5 additions & 5 deletions common/archiver/historyIterator_test.go
Expand Up @@ -106,7 +106,7 @@ func (s *HistoryIteratorSuite) TearDownTest() {
}

func (s *HistoryIteratorSuite) TestReadHistory_Failed_EventsV2() {
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any()).Return(nil, errors.New("got error reading history branch"))
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), gomock.Any()).Return(nil, errors.New("got error reading history branch"))
itr := s.constructTestHistoryIterator(s.mockExecutionMgr, testDefaultTargetHistoryBlobSize, nil)
history, err := itr.readHistory(common.FirstEventID)
s.Error(err)
Expand All @@ -118,7 +118,7 @@ func (s *HistoryIteratorSuite) TestReadHistory_Success_EventsV2() {
History: []*historypb.History{},
NextPageToken: []byte{},
}
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any()).Return(&resp, nil)
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), gomock.Any()).Return(&resp, nil)
itr := s.constructTestHistoryIterator(s.mockExecutionMgr, testDefaultTargetHistoryBlobSize, nil)
history, err := itr.readHistory(common.FirstEventID)
s.NoError(err)
Expand Down Expand Up @@ -628,14 +628,14 @@ func (s *HistoryIteratorSuite) initMockExecutionManager(batchInfo []int, returnE
ShardID: testShardId,
}
if returnErrorOnPage == i {
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(req).Return(nil, errors.New("got error getting workflow execution history"))
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), req).Return(nil, errors.New("got error getting workflow execution history"))
return
}

resp := &persistence.ReadHistoryBranchByBatchResponse{
History: s.constructHistoryBatches(batchInfo, p, firstEventIDs[p.firstbatchIdx]),
}
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(req).Return(resp, nil).MaxTimes(2)
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), req).Return(resp, nil).MaxTimes(2)
}

if addNotExistCall {
Expand All @@ -646,7 +646,7 @@ func (s *HistoryIteratorSuite) initMockExecutionManager(batchInfo []int, returnE
PageSize: testDefaultPersistencePageSize,
ShardID: testShardId,
}
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(req).Return(nil, serviceerror.NewNotFound("Reach the end"))
s.mockExecutionMgr.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), req).Return(nil, serviceerror.NewNotFound("Reach the end"))
}
}

Expand Down
57 changes: 28 additions & 29 deletions common/persistence/dataInterfaces.go
Expand Up @@ -985,62 +985,61 @@ type (
Closeable
GetName() string

CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error)
ConflictResolveWorkflowExecution(request *ConflictResolveWorkflowExecutionRequest) (*ConflictResolveWorkflowExecutionResponse, error)
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
SetWorkflowExecution(request *SetWorkflowExecutionRequest) (*SetWorkflowExecutionResponse, error)
CreateWorkflowExecution(ctx context.Context, request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error)
ConflictResolveWorkflowExecution(ctx context.Context, request *ConflictResolveWorkflowExecutionRequest) (*ConflictResolveWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(ctx context.Context, request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(ctx context.Context, request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
GetWorkflowExecution(ctx context.Context, request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
SetWorkflowExecution(ctx context.Context, request *SetWorkflowExecutionRequest) (*SetWorkflowExecutionResponse, error)

// Scan operations

ListConcreteExecutions(request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error)
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error)

// Tasks related APIs

AddHistoryTasks(request *AddHistoryTasksRequest) error
GetHistoryTask(request *GetHistoryTaskRequest) (*GetHistoryTaskResponse, error)
GetHistoryTasks(request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
CompleteHistoryTask(request *CompleteHistoryTaskRequest) error
RangeCompleteHistoryTasks(request *RangeCompleteHistoryTasksRequest) error
AddHistoryTasks(ctx context.Context, request *AddHistoryTasksRequest) error
GetHistoryTask(ctx context.Context, request *GetHistoryTaskRequest) (*GetHistoryTaskResponse, error)
GetHistoryTasks(ctx context.Context, request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
CompleteHistoryTask(ctx context.Context, request *CompleteHistoryTaskRequest) error
RangeCompleteHistoryTasks(ctx context.Context, request *RangeCompleteHistoryTasksRequest) error

PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetHistoryTasksResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*GetHistoryTasksResponse, error)
DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) error

// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts
// For Temporal, treeID is new runID, except for fork(reset), treeID will be the runID that it forks from.

// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
ReadHistoryBranchByBatch(request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
ReadHistoryBranchByBatch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranchReverse(request *ReadHistoryBranchReverseRequest) (*ReadHistoryBranchReverseResponse, error)
ReadHistoryBranchReverse(ctx context.Context, request *ReadHistoryBranchReverseRequest) (*ReadHistoryBranchReverseResponse, error)
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
// NOTE: this API should only be used by 3+DC
ReadRawHistoryBranch(request *ReadHistoryBranchRequest) (*ReadRawHistoryBranchResponse, error)
ReadRawHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadRawHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from a old branch
ForkHistoryBranch(request *ForkHistoryBranchRequest) (*ForkHistoryBranchResponse, error)
ForkHistoryBranch(ctx context.Context, request *ForkHistoryBranchRequest) (*ForkHistoryBranchResponse, error)
// DeleteHistoryBranch removes a branch
// If this is the last branch to delete, it will also remove the root node
DeleteHistoryBranch(request *DeleteHistoryBranchRequest) error
DeleteHistoryBranch(ctx context.Context, request *DeleteHistoryBranchRequest) error
// TrimHistoryBranch validate & trim a history branch
TrimHistoryBranch(request *TrimHistoryBranchRequest) (*TrimHistoryBranchResponse, error)
TrimHistoryBranch(ctx context.Context, request *TrimHistoryBranchRequest) (*TrimHistoryBranchResponse, error)
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
GetHistoryTree(ctx context.Context, request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
}

// TaskManager is used to manage tasks
// TODO: consider change the range for GetTasks and CompleteTasks to be [inclusive, exclusive)
TaskManager interface {
Closeable
GetName() string
Expand Down

0 comments on commit f4c0852

Please sign in to comment.