Skip to content

Commit

Permalink
Split history tree insert from history node append (#4147)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberthu committed Apr 13, 2023
1 parent 87dbab0 commit cdd1a21
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 143 deletions.
64 changes: 27 additions & 37 deletions common/persistence/cassandra/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,49 +85,39 @@ func NewHistoryStore(
}
}

func (h *HistoryStore) InsertHistoryTree(
ctx context.Context,
request *p.InternalInsertHistoryTreeRequest,
) error {
query := h.Session.Query(v2templateInsertTree,
request.BranchInfo.TreeId,
request.BranchInfo.BranchId,
request.TreeInfo.Data,
request.TreeInfo.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
return convertTimeoutError(gocql.ConvertError("InsertHistoryTree", err))
}
return nil

}

// AppendHistoryNodes upsert a batch of events as a single node to a history branch
// Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (h *HistoryStore) AppendHistoryNodes(
ctx context.Context,
request *p.InternalAppendHistoryNodesRequest,
) error {
branchInfo := request.BranchInfo
node := request.Node

if !request.IsNewBranch {
query := h.Session.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err))
}
return nil
}

treeInfoDataBlob := request.TreeInfo
batch := h.Session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
branchInfo.TreeId,
branchInfo.BranchId,
treeInfoDataBlob.Data,
treeInfoDataBlob.EncodingType.String(),
)
batch.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
)
if err := h.Session.ExecuteBatch(batch); err != nil {
query := h.Session.Query(v2templateUpsertHistoryNode,
request.BranchInfo.TreeId,
request.BranchInfo.BranchId,
request.Node.NodeID,
request.Node.PrevTransactionID,
request.Node.TransactionID,
request.Node.Events.Data,
request.Node.Events.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err))
}
return nil
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/client/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,16 @@ func (e *FaultInjectionExecutionStore) RangeDeleteReplicationTaskFromDLQ(
return e.baseExecutionStore.RangeDeleteReplicationTaskFromDLQ(ctx, request)
}

func (e *FaultInjectionExecutionStore) InsertHistoryTree(
ctx context.Context,
request *persistence.InternalInsertHistoryTreeRequest,
) error {
if err := e.ErrorGenerator.Generate(); err != nil {
return err
}
return e.baseExecutionStore.InsertHistoryTree(ctx, request)
}

func (e *FaultInjectionExecutionStore) AppendHistoryNodes(
ctx context.Context,
request *persistence.InternalAppendHistoryNodesRequest,
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ type (
Msg string
}

// AppendHistoryTimeoutError represents a failed insert to history tree / node request
// InsertHistoryTimeoutError represents a failed insert to history tree request
InsertHistoryTimeoutError struct {
Msg string
}

// AppendHistoryTimeoutError represents a failed insert to history node request
AppendHistoryTimeoutError struct {
Msg string
}
Expand Down Expand Up @@ -898,7 +903,7 @@ type (
// A UUID of a tree
TreeID string
// Get data from this shard
ShardID *int32
ShardID int32
}

// HistoryBranchDetail contains detailed information of a branch
Expand Down Expand Up @@ -1147,6 +1152,10 @@ func (e *InvalidPersistenceRequestError) Error() string {
return e.Msg
}

func (e *InsertHistoryTimeoutError) Error() string {
return e.Msg
}

func (e *AppendHistoryTimeoutError) Error() string {
return e.Msg
}
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (m *executionManagerImpl) serializeWorkflowEvents(
request.Info = BuildHistoryGarbageCleanupInfo(workflowEvents.NamespaceID, workflowEvents.WorkflowID, workflowEvents.RunID)
}

return m.serializeAppendHistoryNodesRequest(ctx, request)
return m.serializeAppendHistoryNodesRequest(request)
}

func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
Expand Down
90 changes: 51 additions & 39 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch(
// Get the entire history tree, so we know if any part of the target branch is referenced by other branches.
historyTreeResp, err := m.GetHistoryTree(ctx, &GetHistoryTreeRequest{
TreeID: branch.TreeId,
ShardID: &request.ShardID,
ShardID: request.ShardID,
})
if err != nil {
return err
Expand Down Expand Up @@ -369,8 +369,35 @@ func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataB
return treeInfo, nil
}

func (m *executionManagerImpl) serializeInsertHistoryTreeRequest(
shardID int32,
info string,
branchToken []byte,
) (*InternalInsertHistoryTreeRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken)
if err != nil {
return nil, err
}

// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: branchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}

return &InternalInsertHistoryTreeRequest{
BranchInfo: branch,
TreeInfo: treeInfoBlob,
ShardID: shardID,
}, nil
}

func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
ctx context.Context,
request *AppendHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
Expand Down Expand Up @@ -423,7 +450,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(

req := &InternalAppendHistoryNodesRequest{
BranchToken: request.BranchToken,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
Expand All @@ -435,20 +461,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
ShardID: request.ShardID,
}

if req.IsNewBranch {
// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: request.BranchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
req.TreeInfo = treeInfoBlob
}

if nodeID < GetBeginNodeID(branch) {
return nil, &InvalidPersistenceRequestError{
Msg: "cannot append to ancestors' nodes",
Expand All @@ -459,7 +471,6 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
}

func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
Expand Down Expand Up @@ -491,7 +502,6 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(

req := &InternalAppendHistoryNodesRequest{
BranchToken: request.BranchToken,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
Expand All @@ -503,20 +513,6 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
ShardID: request.ShardID,
}

if req.IsNewBranch {
// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: request.BranchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
req.TreeInfo = treeInfoBlob
}

if nodeID < GetBeginNodeID(branch) {
return nil, &InvalidPersistenceRequestError{
Msg: "cannot append to ancestors' nodes",
Expand All @@ -532,16 +528,23 @@ func (m *executionManagerImpl) AppendHistoryNodes(
request *AppendHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {

req, err := m.serializeAppendHistoryNodesRequest(ctx, request)

nodeReq, err := m.serializeAppendHistoryNodesRequest(request)
if err != nil {
return nil, err
}

err = m.persistence.AppendHistoryNodes(ctx, req)
err = m.persistence.AppendHistoryNodes(ctx, nodeReq)
if err == nil && request.IsNewBranch {
var treeReq *InternalInsertHistoryTreeRequest
treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken)
if err == nil {
// Only insert history tree if first history node append succeeds
err = m.persistence.InsertHistoryTree(ctx, treeReq)
}
}

return &AppendHistoryNodesResponse{
Size: len(req.Node.Events.Data),
Size: len(nodeReq.Node.Events.Data),
}, err
}

Expand All @@ -551,12 +554,21 @@ func (m *executionManagerImpl) AppendRawHistoryNodes(
request *AppendRawHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {

req, err := m.serializeAppendRawHistoryNodesRequest(ctx, request)
nodeReq, err := m.serializeAppendRawHistoryNodesRequest(request)
if err != nil {
return nil, err
}

err = m.persistence.AppendHistoryNodes(ctx, req)
err = m.persistence.AppendHistoryNodes(ctx, nodeReq)
if err == nil && request.IsNewBranch {
var treeReq *InternalInsertHistoryTreeRequest
treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken)
if err == nil {
// Only insert history tree if first history node append succeeds
err = m.persistence.InsertHistoryTree(ctx, treeReq)
}
}

return &AppendHistoryNodesResponse{
Size: len(request.History.Data),
}, err
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/convert"
p "go.temporal.io/server/common/persistence"
)

Expand Down Expand Up @@ -777,7 +776,7 @@ func (s *HistoryV2PersistenceSuite) deleteHistoryBranch(branch []byte) error {
func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*persistencespb.HistoryBranch {
resp, err := s.ExecutionManager.GetHistoryTree(s.ctx, &p.GetHistoryTreeRequest{
TreeID: treeID,
ShardID: convert.Int32Ptr(s.ShardInfo.GetShardId()),
ShardID: s.ShardInfo.GetShardId(),
})
s.Nil(err)
branches, err := s.toHistoryBranches(resp.BranchTokens)
Expand Down
14 changes: 10 additions & 4 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type (
// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts

InsertHistoryTree(ctx context.Context, request *InternalInsertHistoryTreeRequest) error
// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
// DeleteHistoryNodes delete a node from history node table
Expand Down Expand Up @@ -446,6 +447,15 @@ type (
ExecutionState *persistencespb.WorkflowExecutionState
}

InternalInsertHistoryTreeRequest struct {
// The branch to be appended
BranchInfo *persistencespb.HistoryBranch
// Serialized TreeInfo
TreeInfo *commonpb.DataBlob
// Used in sharded data stores to identify which shard to use
ShardID int32
}

// InternalHistoryNode represent a history node metadata
InternalHistoryNode struct {
// The first eventID becomes the nodeID to be appended
Expand All @@ -462,14 +472,10 @@ type (
InternalAppendHistoryNodesRequest struct {
// The raw branch token
BranchToken []byte
// True if it is the first append request to the branch
IsNewBranch bool
// The info for clean up data in background
Info string
// The branch to be appended
BranchInfo *persistencespb.HistoryBranch
// Serialized TreeInfo
TreeInfo *commonpb.DataBlob
// The history node
Node InternalHistoryNode
// Used in sharded data stores to identify which shard to use
Expand Down
Loading

0 comments on commit cdd1a21

Please sign in to comment.