Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement history node deletion logic #1443

Merged
merged 1 commit into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ const (

// PersistenceAppendHistoryNodesScope tracks AppendHistoryNodes calls made by service to persistence layer
PersistenceAppendHistoryNodesScope
// PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer
PersistenceDeleteHistoryNodesScope
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
PersistenceReadHistoryBranchScope
// PersistenceForkHistoryBranchScope tracks ForkHistoryBranch calls made by service to persistence layer
Expand Down Expand Up @@ -1176,6 +1178,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
PersistenceCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"},
PersistenceDeleteHistoryNodesScope: {operation: "DeleteHistoryNodes"},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch"},
PersistenceForkHistoryBranchScope: {operation: "ForkHistoryBranch"},
PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch"},
Expand Down
35 changes: 32 additions & 3 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
v2templateReadHistoryNodeMetadata = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `

v2templateDeleteHistoryNode = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id = ? AND txn_id = ? `

v2templateRangeDeleteHistoryNode = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `

// below are templates for history_tree table
Expand Down Expand Up @@ -95,17 +97,16 @@ func newHistoryPersistence(
}, nil
}

// AppendHistoryNodes upsert a batch of events as a single node to a history branch
// AppendHistoryNode upsert a batch of events as a single node to a history branch
// Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
request *p.InternalAppendHistoryNodesRequest,
) error {

branchInfo := request.BranchInfo
beginNodeID := p.GetBeginNodeID(branchInfo)
node := request.Node

if node.NodeID < beginNodeID {
if node.NodeID < p.GetBeginNodeID(branchInfo) {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
Expand Down Expand Up @@ -161,6 +162,34 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
return nil
}

// DeleteHistoryNode delete a history node
func (h *cassandraHistoryV2Persistence) DeleteHistoryNodes(
request *p.InternalDeleteHistoryNodesRequest,
) error {
branchInfo := request.BranchInfo
treeID := branchInfo.TreeId
branchID := branchInfo.BranchId
nodeID := request.NodeID
txnID := request.TransactionID

if nodeID < p.GetBeginNodeID(branchInfo) {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot delete from ancestors' nodes"),
}
}

query := h.session.Query(v2templateDeleteHistoryNode,
treeID,
branchID,
nodeID,
txnID,
)
if err := query.Exec(); err != nil {
return gocql.ConvertError("DeleteHistoryNodes", err)
}
return nil
}

// ReadHistoryBranch returns history node data for a branch
// NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ type (
// V2 regards history events growing as a tree, decoupled from workflow concepts
// For Temporal, treeID is new runID, except for fork(reset), treeID will be the runID that it forks from.

// AppendHistoryNodes add(or override) a batch of nodes to a history branch
// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (m *historyV2ManagerImpl) GetHistoryTree(
return m.persistence.GetHistoryTree(request)
}

// AppendHistoryNodes add(or override) a node to a history branch
// AppendHistoryNodes add a node to history node table
func (m *historyV2ManagerImpl) AppendHistoryNodes(
request *AppendHistoryNodesRequest,
) (*AppendHistoryNodesResponse, error) {
Expand Down
16 changes: 15 additions & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ type (
// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts

// AppendHistoryNodes add(or override) a node to a history branch
// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(request *InternalAppendHistoryNodesRequest) error
// DeleteHistoryNodes delete a node from history node table
DeleteHistoryNodes(request *InternalDeleteHistoryNodesRequest) error
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from a old branch
Expand Down Expand Up @@ -374,6 +376,18 @@ type (
NewBranchInfo *persistencespb.HistoryBranch
}

// InternalDeleteHistoryNodesRequest is used to remove a history node
InternalDeleteHistoryNodesRequest struct {
// Used in sharded data stores to identify which shard to use
ShardID int32
// The branch to be appended
BranchInfo *persistencespb.HistoryBranch
// node ID of the history node
NodeID int64
// transaction ID of the history node
TransactionID int64
}

// InternalDeleteHistoryBranchRequest is used to remove a history branch
InternalDeleteHistoryBranchRequest struct {
// branch to be deleted
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ func (p *historyV2PersistenceClient) Close() {
p.persistence.Close()
}

// AppendHistoryNodes add(or override) a node to a history branch
// AppendHistoryNodes add a node to history node table
func (p *historyV2PersistenceClient) AppendHistoryNodes(request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceAppendHistoryNodesScope, metrics.PersistenceRequests)
sw := p.metricClient.StartTimer(metrics.PersistenceAppendHistoryNodesScope, metrics.PersistenceLatency)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func (p *historyV2RateLimitedPersistenceClient) Close() {
p.persistence.Close()
}

// AppendHistoryNodes add(or override) a node to a history branch
// AppendHistoryNodes add a node to history node table
func (p *historyV2RateLimitedPersistenceClient) AppendHistoryNodes(request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,28 @@ func newHistoryV2Persistence(
}, nil
}

// AppendHistoryNodes add(or override) a node to a history branch
// AppendHistoryNode add(or override) a node to a history branch
func (m *sqlHistoryV2Manager) AppendHistoryNodes(
request *p.InternalAppendHistoryNodesRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
branchInfo := request.BranchInfo
beginNodeID := p.GetBeginNodeID(branchInfo)
node := request.Node

branchIDBytes, err := primitives.ParseUUID(branchInfo.GetBranchId())
if err != nil {
return err
if node.NodeID < p.GetBeginNodeID(branchInfo) {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
}

treeIDBytes, err := primitives.ParseUUID(branchInfo.GetTreeId())
if err != nil {
return err
}

if node.NodeID < beginNodeID {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
branchIDBytes, err := primitives.ParseUUID(branchInfo.GetBranchId())
if err != nil {
return err
}

nodeRow := &sqlplugin.HistoryNodeRow{
Expand Down Expand Up @@ -155,7 +154,47 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes(
if m.db.IsDupEntryError(err) {
return &p.ConditionFailedError{Msg: fmt.Sprintf("AppendHistoryNodes: row already exist: %v", err)}
}
return serviceerror.NewInternal(fmt.Sprintf("AppendHistoryEvents: %v", err))
return serviceerror.NewInternal(fmt.Sprintf("AppendHistoryNodes: %v", err))
}
return nil
}

func (m *sqlHistoryV2Manager) DeleteHistoryNodes(
request *p.InternalDeleteHistoryNodesRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
branchInfo := request.BranchInfo
nodeID := request.NodeID
txnID := request.TransactionID
shardID := request.ShardID

if nodeID < p.GetBeginNodeID(branchInfo) {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
}

treeIDBytes, err := primitives.ParseUUID(branchInfo.GetTreeId())
if err != nil {
return err
}
branchIDBytes, err := primitives.ParseUUID(branchInfo.GetBranchId())
if err != nil {
return err
}

nodeRow := &sqlplugin.HistoryNodeRow{
TreeID: treeIDBytes,
BranchID: branchIDBytes,
NodeID: nodeID,
TxnID: txnID,
ShardID: shardID,
}

_, err = m.db.DeleteFromHistoryNode(ctx, nodeRow)
if err != nil {
return serviceerror.NewInternal(fmt.Sprintf("DeleteHistoryNodes: %v", err))
}
return nil
}
Expand Down Expand Up @@ -193,7 +232,7 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch(
}
}

rows, err := m.db.SelectFromHistoryNode(ctx, sqlplugin.HistoryNodeSelectFilter{
rows, err := m.db.RangeSelectFromHistoryNode(ctx, sqlplugin.HistoryNodeSelectFilter{
ShardID: request.ShardID,
TreeID: treeIDBytes,
BranchID: branchIDBytes,
Expand Down Expand Up @@ -440,7 +479,7 @@ func (m *sqlHistoryV2Manager) DeleteHistoryBranch(
// No any branch is using this range, we can delete all of it
deleteFilter.MinNodeID = br.BeginNodeId
}
_, err := tx.DeleteFromHistoryNode(ctx, deleteFilter)
_, err := tx.RangeDeleteFromHistoryNode(ctx, deleteFilter)
if err != nil {
return err
}
Expand Down
File renamed without changes.
5 changes: 3 additions & 2 deletions common/persistence/sql/sqlplugin/history_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type (
// HistoryNode is the SQL persistence interface for history nodes
HistoryNode interface {
InsertIntoHistoryNode(ctx context.Context, row *HistoryNodeRow) (sql.Result, error)
SelectFromHistoryNode(ctx context.Context, filter HistoryNodeSelectFilter) ([]HistoryNodeRow, error)
DeleteFromHistoryNode(ctx context.Context, filter HistoryNodeDeleteFilter) (sql.Result, error)
DeleteFromHistoryNode(ctx context.Context, row *HistoryNodeRow) (sql.Result, error)
RangeSelectFromHistoryNode(ctx context.Context, filter HistoryNodeSelectFilter) ([]HistoryNodeRow, error)
RangeDeleteFromHistoryNode(ctx context.Context, filter HistoryNodeDeleteFilter) (sql.Result, error)
}
)
25 changes: 22 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
`WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND ((node_id = ? AND txn_id > ?) OR node_id > ?) AND node_id < ? ` +
`ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `

deleteHistoryNodeQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id = ? AND txn_id = ? `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? `

// below are templates for history_tree table
Expand All @@ -65,16 +67,33 @@ func (mdb *db) InsertIntoHistoryNode(
ctx context.Context,
row *sqlplugin.HistoryNodeRow,
) (sql.Result, error) {
// NOTE: Query 5.6 doesn't support clustering order, to workaround, we let txn_id multiple by -1
// NOTE: txn_id is *= -1 within DB
row.TxnID = -row.TxnID
return mdb.conn.NamedExecContext(ctx,
addHistoryNodesQuery,
row,
)
}

// DeleteFromHistoryNode delete a row from history_node table
func (mdb *db) DeleteFromHistoryNode(
ctx context.Context,
row *sqlplugin.HistoryNodeRow,
) (sql.Result, error) {
// NOTE: txn_id is *= -1 within DB
row.TxnID = -row.TxnID
return mdb.conn.ExecContext(ctx,
deleteHistoryNodeQuery,
row.ShardID,
row.TreeID,
row.BranchID,
row.NodeID,
row.TxnID,
)
}

// SelectFromHistoryNode reads one or more rows from history_node table
func (mdb *db) SelectFromHistoryNode(
func (mdb *db) RangeSelectFromHistoryNode(
ctx context.Context,
filter sqlplugin.HistoryNodeSelectFilter,
) ([]sqlplugin.HistoryNodeRow, error) {
Expand Down Expand Up @@ -108,7 +127,7 @@ func (mdb *db) SelectFromHistoryNode(
}

// DeleteFromHistoryNode deletes one or more rows from history_node table
func (mdb *db) DeleteFromHistoryNode(
func (mdb *db) RangeDeleteFromHistoryNode(
ctx context.Context,
filter sqlplugin.HistoryNodeDeleteFilter,
) (sql.Result, error) {
Expand Down
25 changes: 22 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
`WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND ((node_id = $4 AND txn_id > $5) OR node_id > $6) AND node_id < $7 ` +
`ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $8 `

deleteHistoryNodeQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id = $4 AND txn_id = $5 `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 `

// below are templates for history_tree table
Expand All @@ -66,16 +68,33 @@ func (pdb *db) InsertIntoHistoryNode(
ctx context.Context,
row *sqlplugin.HistoryNodeRow,
) (sql.Result, error) {
// NOTE: Query 5.6 doesn't support clustering order, to workaround, we let txn_id multiple by -1
// NOTE: txn_id is *= -1 within DB
row.TxnID = -row.TxnID
return pdb.conn.NamedExecContext(ctx,
addHistoryNodesQuery,
row,
)
}

// DeleteFromHistoryNode delete a row from history_node table
func (pdb *db) DeleteFromHistoryNode(
ctx context.Context,
row *sqlplugin.HistoryNodeRow,
) (sql.Result, error) {
// NOTE: txn_id is *= -1 within DB
row.TxnID = -row.TxnID
return pdb.conn.ExecContext(ctx,
deleteHistoryNodeQuery,
row.ShardID,
row.TreeID,
row.BranchID,
row.NodeID,
row.TxnID,
)
}

// SelectFromHistoryNode reads one or more rows from history_node table
func (pdb *db) SelectFromHistoryNode(
func (pdb *db) RangeSelectFromHistoryNode(
ctx context.Context,
filter sqlplugin.HistoryNodeSelectFilter,
) ([]sqlplugin.HistoryNodeRow, error) {
Expand Down Expand Up @@ -110,7 +129,7 @@ func (pdb *db) SelectFromHistoryNode(
}

// DeleteFromHistoryNode deletes one or more rows from history_node table
func (pdb *db) DeleteFromHistoryNode(
func (pdb *db) RangeDeleteFromHistoryNode(
ctx context.Context,
filter sqlplugin.HistoryNodeDeleteFilter,
) (sql.Result, error) {
Expand Down