Skip to content

Commit

Permalink
Merge history tree insert with history node append
Browse files Browse the repository at this point in the history
This is primarily a revert from cdd1a21
  • Loading branch information
norberthu committed Jun 9, 2023
1 parent 543e698 commit 1709b3c
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 174 deletions.
64 changes: 37 additions & 27 deletions common/persistence/cassandra/history_store.go
Expand Up @@ -85,39 +85,49 @@ func NewHistoryStore(
}
}

func (h *HistoryStore) InsertHistoryTree(
ctx context.Context,
request *p.InternalInsertHistoryTreeRequest,
) error {
query := h.Session.Query(v2templateInsertTree,
request.BranchInfo.TreeId,
request.BranchInfo.BranchId,
request.TreeInfo.Data,
request.TreeInfo.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
return convertTimeoutError(gocql.ConvertError("InsertHistoryTree", err))
}
return nil

}

// AppendHistoryNodes upsert a batch of events as a single node to a history branch
// Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (h *HistoryStore) AppendHistoryNodes(
ctx context.Context,
request *p.InternalAppendHistoryNodesRequest,
) error {
query := h.Session.Query(v2templateUpsertHistoryNode,
request.BranchInfo.TreeId,
request.BranchInfo.BranchId,
request.Node.NodeID,
request.Node.PrevTransactionID,
request.Node.TransactionID,
request.Node.Events.Data,
request.Node.Events.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
branchInfo := request.BranchInfo
node := request.Node

if !request.IsNewBranch {
query := h.Session.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
).WithContext(ctx)
if err := query.Exec(); err != nil {
return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err))
}
return nil
}

treeInfoDataBlob := request.TreeInfo
batch := h.Session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
branchInfo.TreeId,
branchInfo.BranchId,
treeInfoDataBlob.Data,
treeInfoDataBlob.EncodingType.String(),
)
batch.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
)
if err := h.Session.ExecuteBatch(batch); err != nil {
return convertTimeoutError(gocql.ConvertError("AppendHistoryNodes", err))
}
return nil
Expand Down
10 changes: 0 additions & 10 deletions common/persistence/client/fault_injection.go
Expand Up @@ -655,16 +655,6 @@ func (e *FaultInjectionExecutionStore) IsReplicationDLQEmpty(
return e.baseExecutionStore.IsReplicationDLQEmpty(ctx, request)
}

func (e *FaultInjectionExecutionStore) InsertHistoryTree(
ctx context.Context,
request *persistence.InternalInsertHistoryTreeRequest,
) error {
if err := e.ErrorGenerator.Generate(); err != nil {
return err
}
return e.baseExecutionStore.InsertHistoryTree(ctx, request)
}

func (e *FaultInjectionExecutionStore) AppendHistoryNodes(
ctx context.Context,
request *persistence.InternalAppendHistoryNodesRequest,
Expand Down
11 changes: 1 addition & 10 deletions common/persistence/dataInterfaces.go
Expand Up @@ -114,12 +114,7 @@ type (
Msg string
}

// InsertHistoryTimeoutError represents a failed insert to history tree request
InsertHistoryTimeoutError struct {
Msg string
}

// AppendHistoryTimeoutError represents a failed insert to history node request
// AppendHistoryTimeoutError represents a failed insert to history tree / node request
AppendHistoryTimeoutError struct {
Msg string
}
Expand Down Expand Up @@ -1215,10 +1210,6 @@ func (e *InvalidPersistenceRequestError) Error() string {
return e.Msg
}

func (e *InsertHistoryTimeoutError) Error() string {
return e.Msg
}

func (e *AppendHistoryTimeoutError) Error() string {
return e.Msg
}
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/execution_manager.go
Expand Up @@ -458,7 +458,7 @@ func (m *executionManagerImpl) serializeWorkflowEvents(
request.Info = BuildHistoryGarbageCleanupInfo(workflowEvents.NamespaceID, workflowEvents.WorkflowID, workflowEvents.RunID)
}

return m.serializeAppendHistoryNodesRequest(request)
return m.serializeAppendHistoryNodesRequest(ctx, request)
}

func (m *executionManagerImpl) SerializeWorkflowMutation( // unexport
Expand Down
88 changes: 38 additions & 50 deletions common/persistence/history_manager.go
Expand Up @@ -368,35 +368,8 @@ func ToHistoryTreeInfo(serializer serialization.Serializer, blob *commonpb.DataB
return treeInfo, nil
}

func (m *executionManagerImpl) serializeInsertHistoryTreeRequest(
shardID int32,
info string,
branchToken []byte,
) (*InternalInsertHistoryTreeRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(branchToken)
if err != nil {
return nil, err
}

// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: branchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}

return &InternalInsertHistoryTreeRequest{
BranchInfo: branch,
TreeInfo: treeInfoBlob,
ShardID: shardID,
}, nil
}

func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
ctx context.Context,
request *AppendHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
Expand Down Expand Up @@ -449,6 +422,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(

req := &InternalAppendHistoryNodesRequest{
BranchToken: request.BranchToken,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
Expand All @@ -460,6 +434,20 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
ShardID: request.ShardID,
}

if req.IsNewBranch {
// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: request.BranchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
req.TreeInfo = treeInfoBlob
}

if nodeID < GetBeginNodeID(branch) {
return nil, &InvalidPersistenceRequestError{
Msg: "cannot append to ancestors' nodes",
Expand All @@ -470,6 +458,7 @@ func (m *executionManagerImpl) serializeAppendHistoryNodesRequest(
}

func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
ctx context.Context,
request *AppendRawHistoryNodesRequest,
) (*InternalAppendHistoryNodesRequest, error) {
branch, err := m.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
Expand Down Expand Up @@ -501,6 +490,7 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(

req := &InternalAppendHistoryNodesRequest{
BranchToken: request.BranchToken,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
Expand All @@ -512,6 +502,20 @@ func (m *executionManagerImpl) serializeAppendRawHistoryNodesRequest(
ShardID: request.ShardID,
}

if req.IsNewBranch {
// TreeInfo is only needed for new branch
treeInfoBlob, err := m.serializer.HistoryTreeInfoToBlob(&persistencespb.HistoryTreeInfo{
BranchToken: request.BranchToken,
BranchInfo: branch,
ForkTime: timestamp.TimeNowPtrUtc(),
Info: request.Info,
}, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return nil, err
}
req.TreeInfo = treeInfoBlob
}

if nodeID < GetBeginNodeID(branch) {
return nil, &InvalidPersistenceRequestError{
Msg: "cannot append to ancestors' nodes",
Expand All @@ -527,23 +531,16 @@ func (m *executionManagerImpl) AppendHistoryNodes(
request *AppendHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {

nodeReq, err := m.serializeAppendHistoryNodesRequest(request)
req, err := m.serializeAppendHistoryNodesRequest(ctx, request)

if err != nil {
return nil, err
}

err = m.persistence.AppendHistoryNodes(ctx, nodeReq)
if err == nil && request.IsNewBranch {
var treeReq *InternalInsertHistoryTreeRequest
treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken)
if err == nil {
// Only insert history tree if first history node append succeeds
err = m.persistence.InsertHistoryTree(ctx, treeReq)
}
}
err = m.persistence.AppendHistoryNodes(ctx, req)

return &AppendHistoryNodesResponse{
Size: len(nodeReq.Node.Events.Data),
Size: len(req.Node.Events.Data),
}, err
}

Expand All @@ -553,21 +550,12 @@ func (m *executionManagerImpl) AppendRawHistoryNodes(
request *AppendRawHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {

nodeReq, err := m.serializeAppendRawHistoryNodesRequest(request)
req, err := m.serializeAppendRawHistoryNodesRequest(ctx, request)
if err != nil {
return nil, err
}

err = m.persistence.AppendHistoryNodes(ctx, nodeReq)
if err == nil && request.IsNewBranch {
var treeReq *InternalInsertHistoryTreeRequest
treeReq, err = m.serializeInsertHistoryTreeRequest(request.ShardID, request.Info, request.BranchToken)
if err == nil {
// Only insert history tree if first history node append succeeds
err = m.persistence.InsertHistoryTree(ctx, treeReq)
}
}

err = m.persistence.AppendHistoryNodes(ctx, req)
return &AppendHistoryNodesResponse{
Size: len(request.History.Data),
}, err
Expand Down
14 changes: 0 additions & 14 deletions common/persistence/mock/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 4 additions & 10 deletions common/persistence/persistenceInterface.go
Expand Up @@ -149,7 +149,6 @@ type (
// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts

InsertHistoryTree(ctx context.Context, request *InternalInsertHistoryTreeRequest) error
// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
// DeleteHistoryNodes delete a node from history node table
Expand Down Expand Up @@ -478,15 +477,6 @@ type (
ExecutionState *persistencespb.WorkflowExecutionState
}

InternalInsertHistoryTreeRequest struct {
// The branch to be appended
BranchInfo *persistencespb.HistoryBranch
// Serialized TreeInfo
TreeInfo *commonpb.DataBlob
// Used in sharded data stores to identify which shard to use
ShardID int32
}

// InternalHistoryNode represent a history node metadata
InternalHistoryNode struct {
// The first eventID becomes the nodeID to be appended
Expand All @@ -503,10 +493,14 @@ type (
InternalAppendHistoryNodesRequest struct {
// The raw branch token
BranchToken []byte
// True if it is the first append request to the branch
IsNewBranch bool
// The info for clean up data in background
Info string
// The branch to be appended
BranchInfo *persistencespb.HistoryBranch
// Serialized TreeInfo
TreeInfo *commonpb.DataBlob
// The history node
Node InternalHistoryNode
// Used in sharded data stores to identify which shard to use
Expand Down

0 comments on commit 1709b3c

Please sign in to comment.