Simplify history engine task read ID logic (#2724)
* Do not update the max readable task ID if the DB transaction is rejected
* Remove the redundant logic that tries to update the max readable task ID after the shard range ID increases
wxing1292 committed Apr 14, 2022
1 parent d541fc0 commit 6409090
Showing 1 changed file with 10 additions and 17 deletions.
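Both bullets rest on how task IDs relate to the shard's range ID: if task IDs carry the range ID in their high bits, then renewing the range during re-acquisition automatically moves every future task ID (and hence the max read level) past anything allocated under the old range, so no separate update is needed. A minimal sketch of that relationship; the constant and names are illustrative, not the production layout:

package main

import "fmt"

// rangeSizeBits is an assumed counter width; the real value is a shard
// configuration detail that is not visible in this diff.
const rangeSizeBits = 20

// firstTaskID returns the first task ID available under a given rangeID.
// Because the rangeID occupies the high bits, every ID allocated under
// rangeID+1 is strictly greater than any ID allocated under rangeID.
func firstTaskID(rangeID int64) int64 {
	return rangeID << rangeSizeBits
}

func main() {
	oldRange := int64(7)
	newRange := oldRange + 1
	maxOldID := firstTaskID(newRange) - 1         // largest ID the old range can ever produce
	fmt.Println(firstTaskID(newRange) > maxOldID) // true: the new range starts past it
}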
27 changes: 10 additions & 17 deletions service/history/shard/context_impl.go
@@ -139,7 +139,7 @@ type (

 	contextRequestAcquire struct{}
 	contextRequestAcquired struct{}
-	contextRequestLost struct{ newMaxReadLevel int64 }
+	contextRequestLost struct{}
 	contextRequestStop struct{}
 	contextRequestFinishStop struct{}
 )
@@ -1305,9 +1305,6 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 		*serviceerror.ResourceExhausted:
 		// Persistence failure that means the write was definitely not committed:
 		// No special handling required for these errors.
-		// Update max read level here anyway because we already allocated the
-		// task ids and will not reuse them.
-		s.updateMaxReadLevelLocked(newMaxReadLevel)
 		return err

 	case *persistence.ShardOwnershipLostError:
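The hunk above relies on classifying persistence errors into "definitely not committed" versus "outcome unknown"; only the indeterminate cases need recovery, which is why the rejected-write branch can now simply return. A hedged sketch of that classification pattern, using stand-in error types rather than the real persistence and serviceerror packages:

package main

import "fmt"

// Stand-in error types; the real code switches on concrete types from the
// persistence and serviceerror packages.
type ConditionFailedError struct{}
type TimeoutError struct{}

func (*ConditionFailedError) Error() string { return "condition failed" }
func (*TimeoutError) Error() string         { return "timeout" }

// maybeCommitted reports whether a failed write might still have been
// applied. Explicit rejections are definitely-not-committed; timeouts and
// anything unrecognized are indeterminate and need recovery.
func maybeCommitted(err error) bool {
	switch err.(type) {
	case *ConditionFailedError:
		return false // the store rejected the write outright
	case *TimeoutError:
		return true // the write may have landed after the deadline
	default:
		return true // be conservative about unknown failures
	}
}

func main() {
	fmt.Println(maybeCommitted(&ConditionFailedError{})) // false
	fmt.Println(maybeCommitted(&TimeoutError{}))         // true
}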
@@ -1321,9 +1318,9 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
 		// the shard in the background. If successful, we'll get a new RangeID, to guarantee that subsequent
 		// reads will either see that write, or know for certain that it failed. This allows the callers to
 		// reliably check the outcome by performing a read. If we fail, we'll shut down the shard.
-		// We only want to update the max read level _after_ the re-acquire succeeds, not right now, otherwise
-		// a write that gets applied after we see a timeout could cause us to lose tasks.
-		s.transitionLocked(contextRequestLost{newMaxReadLevel: newMaxReadLevel})
+		// Note that reacquiring the shard will cause the max read level to be updated
+		// to the new range (i.e. past newMaxReadLevel).
+		s.transitionLocked(contextRequestLost{})
 		return err
 	}
 }
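The comment in this hunk describes the guarantee that makes the simplification safe: once the shard is re-acquired under a new RangeID, a write issued under the old RangeID has either already been applied or can never be applied, so a caller can settle an indeterminate outcome with a single read. A sketch of that caller-side check against a toy in-memory store; all names here are illustrative:

package main

import "fmt"

// resolveIndeterminateWrite settles the outcome of a write that timed out.
// Precondition: the shard was re-acquired under a new RangeID, so the old
// write has either landed already or can never land now.
func resolveIndeterminateWrite(read func(key string) bool, key string) bool {
	// With the RangeID bumped, absence is definitive: the old write cannot
	// sneak in later, so "not found" means "never committed".
	return read(key)
}

func main() {
	store := map[string]string{"task-1": "payload"} // pretend persistence
	read := func(key string) bool { _, ok := store[key]; return ok }
	fmt.Println(resolveIndeterminateWrite(read, "task-1")) // true: committed
	fmt.Println(resolveIndeterminateWrite(read, "task-2")) // false: definitely failed
}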
@@ -1483,9 +1480,9 @@ func (s *ContextImpl) transitionLocked(request contextRequest) {
 	*/

-	setStateAcquiring := func(newMaxReadLevel int64) {
+	setStateAcquiring := func() {
 		s.state = contextStateAcquiring
-		go s.acquireShard(newMaxReadLevel)
+		go s.acquireShard()
 	}

 	setStateStopping := func() {
@@ -1510,7 +1507,7 @@ func (s *ContextImpl) transitionLocked(request contextRequest) {
 	case contextStateInitialized:
 		switch request.(type) {
 		case contextRequestAcquire:
-			setStateAcquiring(0)
+			setStateAcquiring()
 			return
 		case contextRequestStop:
 			setStateStopping()
@@ -1536,11 +1533,11 @@ func (s *ContextImpl) transitionLocked(request contextRequest) {
 			return
 		}
 	case contextStateAcquired:
-		switch request := request.(type) {
+		switch request.(type) {
 		case contextRequestAcquire:
 			return // nothing to do, already acquired
 		case contextRequestLost:
-			setStateAcquiring(request.newMaxReadLevel)
+			setStateAcquiring()
 			return
 		case contextRequestStop:
 			setStateStopping()
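transitionLocked is a small state machine: the current contextState and the type of the incoming request together select the next state, and after this commit the contextRequestLost request carries no payload, since re-acquisition alone advances the max read level. A stripped-down toy model of that shape, covering only the states and requests touched by this diff, not the full transition table:

package main

import "fmt"

type contextState int

const (
	stateInitialized contextState = iota
	stateAcquiring
	stateAcquired
)

// Request markers, mirroring the empty structs in the diff.
type (
	requestAcquire struct{}
	requestLost    struct{}
)

// transition returns the next state for a (state, request) pair;
// unlisted pairs leave the state unchanged.
func transition(state contextState, request interface{}) contextState {
	switch state {
	case stateInitialized:
		if _, ok := request.(requestAcquire); ok {
			return stateAcquiring // kicks off acquireShard in the real code
		}
	case stateAcquired:
		switch request.(type) {
		case requestAcquire:
			return stateAcquired // nothing to do, already acquired
		case requestLost:
			return stateAcquiring // re-acquire; no read level to carry over
		}
	}
	return state
}

func main() {
	fmt.Println(transition(stateInitialized, requestAcquire{}) == stateAcquiring) // true
	fmt.Println(transition(stateAcquired, requestLost{}) == stateAcquiring)       // true
}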
@@ -1699,7 +1696,7 @@ func (s *ContextImpl) getRemoteClusterInfoLocked(clusterName string) *remoteClus
 	return info
 }

-func (s *ContextImpl) acquireShard(newMaxReadLevel int64) {
+func (s *ContextImpl) acquireShard() {
 	// Retry for 5m, with interval up to 10s (default)
 	policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
 	policy.SetExpirationInterval(5 * time.Minute)
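acquireShard wraps its work in Temporal's internal backoff helper, retrying for up to five minutes with exponentially growing intervals. Below is a hand-rolled equivalent of that loop under the parameters the diff's comment states (50ms initial interval, 10s cap, 5m expiration); it is a sketch of the idea, not the helper's actual implementation:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls op until it succeeds or the expiration window closes,
// sleeping an exponentially growing (capped) interval between attempts.
func retry(op func() error, initial, maxInterval, expiration time.Duration) error {
	deadline := time.Now().Add(expiration)
	interval := initial
	for {
		err := op()
		if err == nil {
			return nil
		}
		if time.Now().Add(interval).After(deadline) {
			return err // out of time; the real caller shuts the shard down
		}
		time.Sleep(interval)
		interval *= 2
		if interval > maxInterval {
			interval = maxInterval
		}
	}
}

func main() {
	attempts := 0
	err := retry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient")
		}
		return nil
	}, 50*time.Millisecond, 10*time.Second, 5*time.Minute)
	fmt.Println(err, attempts) // <nil> 3
}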
@@ -1753,10 +1750,6 @@ func (s *ContextImpl) acquireShard(newMaxReadLevel int64) {
 			s.engine = engine
 		}

-		// Set max read level after a re-acquisition (if this is the first
-		// acquisition, newMaxReadLevel will be zero so it's a no-op)
-		s.updateMaxReadLevelLocked(newMaxReadLevel)
-
 		s.transitionLocked(contextRequestAcquired{})
 		return nil
 	}