Skip to content

Commit

Permalink
Fix unable to reset workflow issue (#1673)
Browse files Browse the repository at this point in the history
* Properly calculate lower / higher bound when reading each history branch, otherwise history event branch after fork operation can return more data than requested.
  • Loading branch information
wxing1292 committed Jun 23, 2021
1 parent 6038900 commit 35c5ff4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
4 changes: 3 additions & 1 deletion common/persistence/history_store.go
Expand Up @@ -411,7 +411,9 @@ func (m *historyV2ManagerImpl) readRawHistoryBranch(
currentBranch := branchAncestors[token.CurrentRangeIndex]
// minNodeID remains the same, since caller can read from the middle
// maxNodeID need to be shortened since this branch can contain additional history nodes
maxNodeID = currentBranch.GetEndNodeId()
if currentBranch.GetEndNodeId() < maxNodeID {
maxNodeID = currentBranch.GetEndNodeId()
}
branchID := currentBranch.GetBranchId()
resp, err := m.persistence.ReadHistoryBranch(&InternalReadHistoryBranchRequest{
ShardID: shardID,
Expand Down
43 changes: 43 additions & 0 deletions common/persistence/tests/history_store_test.go
Expand Up @@ -109,6 +109,7 @@ func (s *historyEventsSuite) TestAppendSelect_First() {
)
s.appendHistoryEvents(shardID, branchToken, eventsPacket)

s.Equal(eventsPacket.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4))
s.Equal(eventsPacket.events, s.listAllHistoryEvents(shardID, branchToken))
}

Expand Down Expand Up @@ -136,6 +137,8 @@ func (s *historyEventsSuite) TestAppendSelect_NonShadowing() {
s.appendHistoryEvents(shardID, branchToken, eventsPacket1)
events = append(events, eventsPacket1.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4))
s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6))
s.Equal(events, s.listAllHistoryEvents(shardID, branchToken))
}

Expand Down Expand Up @@ -175,6 +178,8 @@ func (s *historyEventsSuite) TestAppendSelect_Shadowing() {
s.appendHistoryEvents(shardID, branchToken, eventsPacket11)
events1 = append(events1, eventsPacket11.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4))
s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, branchToken, 4, 6))
s.Equal(events1, s.listAllHistoryEvents(shardID, branchToken))
}

Expand Down Expand Up @@ -213,6 +218,10 @@ func (s *historyEventsSuite) TestAppendForkSelect_NoShadowing() {
s.appendHistoryEvents(shardID, newBranchToken, eventsPacket11)
events1 = append(events1, eventsPacket11.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4))
s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4))
s.Equal(eventsPacket10.events, s.listHistoryEvents(shardID, branchToken, 4, 6))
s.Equal(eventsPacket11.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6))
s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken))
s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken))
}
Expand Down Expand Up @@ -267,6 +276,12 @@ func (s *historyEventsSuite) TestAppendForkSelect_Shadowing_NonLastBranch() {
s.appendHistoryEvents(shardID, newBranchToken, eventsPacket21)
events1 = append(events1, eventsPacket21.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, branchToken, common.FirstEventID, 4))
s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4))
s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, branchToken, 4, 6))
s.Equal(eventsPacket1.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6))
s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, branchToken, 6, 7))
s.Equal(eventsPacket21.events, s.listHistoryEvents(shardID, newBranchToken, 6, 7))
s.Equal(events0, s.listAllHistoryEvents(shardID, branchToken))
s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken))
}
Expand Down Expand Up @@ -304,6 +319,8 @@ func (s *historyEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() {
s.appendHistoryEvents(shardID, newBranchToken, eventsPacket20)
events0 = append(events0, eventsPacket20.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4))
s.Equal(eventsPacket20.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6))
s.Equal(events0, s.listAllHistoryEvents(shardID, newBranchToken))

eventsPacket21 := s.newHistoryEvents(
Expand All @@ -314,6 +331,8 @@ func (s *historyEventsSuite) TestAppendForkSelect_Shadowing_LastBranch() {
s.appendHistoryEvents(shardID, newBranchToken, eventsPacket21)
events1 = append(events1, eventsPacket21.events...)

s.Equal(eventsPacket0.events, s.listHistoryEvents(shardID, newBranchToken, common.FirstEventID, 4))
s.Equal(eventsPacket21.events, s.listHistoryEvents(shardID, newBranchToken, 4, 6))
s.Equal(events1, s.listAllHistoryEvents(shardID, newBranchToken))
}

Expand Down Expand Up @@ -501,6 +520,30 @@ func (s *historyEventsSuite) trimHistoryBranch(
s.NoError(err)
}

func (s *historyEventsSuite) listHistoryEvents(
shardID int32,
branchToken []byte,
startEventID int64,
endEventID int64,
) []*historypb.HistoryEvent {
var token []byte
var events []*historypb.HistoryEvent
for doContinue := true; doContinue; doContinue = len(token) > 0 {
resp, err := s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{
ShardID: shardID,
BranchToken: branchToken,
MinEventID: startEventID,
MaxEventID: endEventID,
PageSize: 1, // use 1 here for better testing exp
NextPageToken: token,
})
s.NoError(err)
token = resp.NextPageToken
events = append(events, resp.HistoryEvents...)
}
return events
}

func (s *historyEventsSuite) listAllHistoryEvents(
shardID int32,
branchToken []byte,
Expand Down

0 comments on commit 35c5ff4

Please sign in to comment.