Skip to content

Commit

Permalink
Reduce redundant history tree creation for workflow reset (#825)
Browse files Browse the repository at this point in the history
* When performing workflow reset & terminating current workflow, one unnecessary create history tree API is called, this PR remove the redundant call to improve performance
  • Loading branch information
wxing1292 committed Oct 13, 2020
1 parent 490d2fa commit abf5779
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 32 deletions.
8 changes: 3 additions & 5 deletions common/persistence/sql/sqlplugin/mysql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ const (
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? `

// below are templates for history_tree table
upsertHistoryTreeQuery = `INSERT INTO history_tree (` +
addHistoryTreeQuery = `INSERT INTO history_tree (` +
`shard_id, tree_id, branch_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :data, :data_encoding) ` +
`ON DUPLICATE KEY UPDATE ` +
`data=VALUES(data), data_encoding=VALUES(data_encoding)`
`VALUES (:shard_id, :tree_id, :branch_id, :data, :data_encoding) `

getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = ? AND tree_id = ? `

Expand Down Expand Up @@ -83,7 +81,7 @@ func (mdb *db) DeleteFromHistoryNode(filter sqlplugin.HistoryNodeDeleteFilter) (

// InsertIntoHistoryTree inserts a row into history_tree table
func (mdb *db) InsertIntoHistoryTree(row *sqlplugin.HistoryTreeRow) (sql.Result, error) {
return mdb.conn.NamedExec(upsertHistoryTreeQuery, row)
return mdb.conn.NamedExec(addHistoryTreeQuery, row)
}

// SelectFromHistoryTree reads one or more rows from history_tree table
Expand Down
7 changes: 3 additions & 4 deletions common/persistence/sql/sqlplugin/postgresql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ const (
deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 `

// below are templates for history_tree table
upsertHistoryTreeQuery = `INSERT INTO history_tree (` +
addHistoryTreeQuery = `INSERT INTO history_tree (` +
`shard_id, tree_id, branch_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :data, :data_encoding) ` +
`ON CONFLICT (shard_id, tree_id, branch_id) DO UPDATE SET data = excluded.data, data_encoding = excluded.data_encoding`
`VALUES (:shard_id, :tree_id, :branch_id, :data, :data_encoding) `

getHistoryTreeQuery = `SELECT branch_id, data, data_encoding FROM history_tree WHERE shard_id = $1 AND tree_id = $2 `

Expand Down Expand Up @@ -82,7 +81,7 @@ func (pdb *db) DeleteFromHistoryNode(filter sqlplugin.HistoryNodeDeleteFilter) (

// InsertIntoHistoryTree inserts a row into history_tree table
func (pdb *db) InsertIntoHistoryTree(row *sqlplugin.HistoryTreeRow) (sql.Result, error) {
return pdb.conn.NamedExec(upsertHistoryTreeQuery, row)
return pdb.conn.NamedExec(addHistoryTreeQuery, row)
}

// SelectFromHistoryTree reads one or more rows from history_tree table
Expand Down
34 changes: 16 additions & 18 deletions common/persistence/sql/sqlplugin/tests/history_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,22 @@ func (s *historyTreeSuite) TestInsert_Success() {
s.Equal(1, int(rowsAffected))
}

// TODO: when issue https://github.com/temporalio/temporal/issues/818
// is resolved, uncomment this test
//func (s *historyTreeSuite) TestInsert_Fail_Duplicate() {
// shardID := rand.Int31()
// treeID := primitives.NewUUID()
// branchID := primitives.NewUUID()
//
// node := s.newRandomTreeRow(shardID, treeID, branchID)
// result, err := s.store.InsertIntoHistoryTree(&node)
// s.NoError(err)
// rowsAffected, err := result.RowsAffected()
// s.NoError(err)
// s.Equal(1, int(rowsAffected))
//
// node = s.newRandomTreeRow(shardID, treeID, branchID)
// _, err = s.store.InsertIntoHistoryTree(&node)
// s.Error(err) // TODO persistence layer should do proper error translation
//}
func (s *historyTreeSuite) TestInsert_Fail_Duplicate() {
shardID := rand.Int31()
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()

node := s.newRandomTreeRow(shardID, treeID, branchID)
result, err := s.store.InsertIntoHistoryTree(&node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
s.NoError(err)
s.Equal(1, int(rowsAffected))

node = s.newRandomTreeRow(shardID, treeID, branchID)
_, err = s.store.InsertIntoHistoryTree(&node)
s.Error(err) // TODO persistence layer should do proper error translation
}

func (s *historyTreeSuite) TestInsertSelect() {
shardID := rand.Int31()
Expand Down
20 changes: 15 additions & 5 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,12 +699,22 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNew(
}
newWorkflowSizeSize := newContext.getHistorySize()
startEvents := newWorkflowEventsSeq[0]
eventsSize, err := c.persistFirstWorkflowEvents(startEvents)
if err != nil {
return err
firstEventID := startEvents.Events[0].EventId
if firstEventID == common.FirstEventID {
eventsSize, err := c.persistFirstWorkflowEvents(startEvents)
if err != nil {
return err
}
newWorkflowSizeSize += eventsSize
newContext.setHistorySize(newWorkflowSizeSize)
} else {
eventsSize, err := c.persistNonFirstWorkflowEvents(startEvents)
if err != nil {
return err
}
newWorkflowSizeSize += eventsSize
newContext.setHistorySize(newWorkflowSizeSize)
}
newWorkflowSizeSize += eventsSize
newContext.setHistorySize(newWorkflowSizeSize)
newWorkflow.ExecutionStats = &persistenceblobs.ExecutionStats{
HistorySize: newWorkflowSizeSize,
}
Expand Down

0 comments on commit abf5779

Please sign in to comment.