Skip to content

Commit

Permalink
Multi-cursor: shrink slice predicate (#3291)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Sep 2, 2022
1 parent 3315733 commit e32488e
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 25 deletions.
13 changes: 3 additions & 10 deletions service/history/queues/executable.go
Expand Up @@ -388,16 +388,6 @@ func (e *executableImpl) GetPriority() ctasks.Priority {
return e.priority
}

// SetPriority records priority as the executable's current priority while
// holding the executable's lock. If the new value compares greater than the
// recorded lowestPriority, lowestPriority is raised to match.
func (e *executableImpl) SetPriority(priority ctasks.Priority) {
	e.Lock()
	defer e.Unlock()

	// Track the high-water mark first; the comparison only needs the argument.
	if priority > e.lowestPriority {
		e.lowestPriority = priority
	}
	e.priority = priority
}

func (e *executableImpl) Attempt() int {
e.Lock()
defer e.Unlock()
Expand Down Expand Up @@ -456,4 +446,7 @@ func (e *executableImpl) updatePriority() {
e.Lock()
defer e.Unlock()
e.priority = newPriority
if e.priority > e.lowestPriority {
e.lowestPriority = e.priority
}
}
19 changes: 19 additions & 0 deletions service/history/queues/queue_base.go
Expand Up @@ -53,6 +53,18 @@ const (
nonDefaultReaderMaxPendingTaskCoefficient = 0.8

queueIOTimeout = 5 * time.Second

// Force creating new slice every forceNewSliceDuration
// so that the last slice in the default reader won't grow
// infinitely.
// The benefit of forcing new slice is:
// 1. As long as the last slice won't grow infinitely, task loading
// for that slice will complete and its scope (both range and
// predicate) is able to shrink
// 2. Current task loading implementation can only unload the entire
// slice. If there's only one slice, we may unload all tasks for a
// given namespace.
forceNewSliceDuration = 5 * time.Minute
)

type (
Expand Down Expand Up @@ -85,6 +97,7 @@ type (
nonReadableScope Scope
readerGroup *ReaderGroup
lastPollTime time.Time
nextForceNewSliceTime time.Time

checkpointRetrier backoff.Retrier
checkpointTimer *time.Timer
Expand Down Expand Up @@ -321,6 +334,12 @@ func (p *queueBase) processNewRange() {
reader, ok := p.readerGroup.ReaderByID(DefaultReaderId)
if !ok {
p.readerGroup.NewReader(DefaultReaderId, newSlice)
return
}

if now := p.timeSource.Now(); now.After(p.nextForceNewSliceTime) {
reader.AppendSlices(newSlice)
p.nextForceNewSliceTime = now.Add(forceNewSliceDuration)
} else {
reader.MergeSlices(newSlice)
}
Expand Down
42 changes: 36 additions & 6 deletions service/history/queues/reader.go
Expand Up @@ -55,6 +55,7 @@ type (
WalkSlices(SliceIterator)
SplitSlices(SliceSplitter)
MergeSlices(...Slice)
AppendSlices(...Slice)
ClearSlices(SlicePredicate)
CompactSlices(SlicePredicate)
ShrinkSlices()
Expand Down Expand Up @@ -245,19 +246,19 @@ func (r *ReaderImpl) MergeSlices(incomingSlices ...Slice) {
incomingSlice := incomingSlices[incomingSliceIdx]

if currentSlice.Scope().Range.InclusiveMin.CompareTo(incomingSlice.Scope().Range.InclusiveMin) < 0 {
appendSlice(mergedSlices, currentSlice)
mergeOrAppendSlice(mergedSlices, currentSlice)
currentSliceElement = currentSliceElement.Next()
} else {
appendSlice(mergedSlices, incomingSlice)
mergeOrAppendSlice(mergedSlices, incomingSlice)
incomingSliceIdx++
}
}

for ; currentSliceElement != nil; currentSliceElement = currentSliceElement.Next() {
appendSlice(mergedSlices, currentSliceElement.Value.(Slice))
mergeOrAppendSlice(mergedSlices, currentSliceElement.Value.(Slice))
}
for _, slice := range incomingSlices[incomingSliceIdx:] {
appendSlice(mergedSlices, slice)
mergeOrAppendSlice(mergedSlices, slice)
}

// clear existing list
Expand All @@ -268,6 +269,35 @@ func (r *ReaderImpl) MergeSlices(incomingSlices ...Slice) {
r.monitor.SetSliceCount(r.readerID, r.slices.Len())
}

// AppendSlices pushes incomingSlices onto the back of the reader's slice list
// without merging them into the existing slices.
//
// The incoming slices are validated by validateSlicesOrderedDisjoint. If the
// reader already holds slices, the first incoming slice must not start before
// the exclusive max of the current last slice; otherwise AppendSlices panics.
// Calling it with no slices is a no-op.
func (r *ReaderImpl) AppendSlices(incomingSlices ...Slice) {
	if len(incomingSlices) == 0 {
		return
	}

	// This check only looks at the incoming slices, so it does not need the
	// reader lock.
	validateSlicesOrderedDisjoint(incomingSlices)

	// Take the lock before touching r.slices: the tail of the list must be
	// inspected and extended atomically, otherwise a concurrent mutation of
	// the list could race with the range check below.
	r.Lock()
	defer r.Unlock()

	if back := r.slices.Back(); back != nil {
		lastSliceRange := back.Value.(Slice).Scope().Range
		firstIncomingRange := incomingSlices[0].Scope().Range
		if lastSliceRange.ExclusiveMax.CompareTo(firstIncomingRange.InclusiveMin) > 0 {
			// The deferred Unlock still runs while the panic unwinds.
			panic(fmt.Sprintf(
				"Can not append slice to existing list of slices, incoming slice range: %v, existing slice range: %v ",
				firstIncomingRange,
				lastSliceRange,
			))
		}
	}

	for _, incomingSlice := range incomingSlices {
		r.slices.PushBack(incomingSlice)
	}

	r.resetNextReadSliceLocked()
	r.monitor.SetSliceCount(r.readerID, r.slices.Len())
}

func (r *ReaderImpl) ClearSlices(predicate SlicePredicate) {
r.Lock()
defer r.Unlock()
Expand Down Expand Up @@ -321,7 +351,7 @@ func (r *ReaderImpl) ShrinkSlices() {
next = element.Next()

slice := element.Value.(Slice)
slice.ShrinkRange()
slice.ShrinkScope()
if scope := slice.Scope(); scope.IsEmpty() {
r.slices.Remove(element)
}
Expand Down Expand Up @@ -455,7 +485,7 @@ func (r *ReaderImpl) verifyPendingTaskSize() bool {
return r.monitor.GetTotalPendingTaskCount() < r.options.MaxPendingTasksCount()
}

func appendSlice(
func mergeOrAppendSlice(
slices *list.List,
incomingSlice Slice,
) {
Expand Down
1 change: 1 addition & 0 deletions service/history/queues/reader_group_test.go
Expand Up @@ -126,6 +126,7 @@ func (r *testReader) Scopes() []Scope { panic("not implemented") }
// The methods below exist only so that testReader provides these Reader
// methods; each one panics if it is ever called.

// WalkSlices is not implemented by testReader and always panics.
func (r *testReader) WalkSlices(SliceIterator) {
	panic("not implemented")
}

// SplitSlices is not implemented by testReader and always panics.
func (r *testReader) SplitSlices(SliceSplitter) {
	panic("not implemented")
}

// MergeSlices is not implemented by testReader and always panics.
func (r *testReader) MergeSlices(...Slice) {
	panic("not implemented")
}

// AppendSlices is not implemented by testReader and always panics.
func (r *testReader) AppendSlices(...Slice) {
	panic("not implemented")
}

// ClearSlices is not implemented by testReader and always panics.
func (r *testReader) ClearSlices(SlicePredicate) {
	panic("not implemented")
}

// CompactSlices is not implemented by testReader and always panics.
func (r *testReader) CompactSlices(SlicePredicate) {
	panic("not implemented")
}

// ShrinkSlices is not implemented by testReader and always panics.
func (r *testReader) ShrinkSlices() {
	panic("not implemented")
}
Expand Down
28 changes: 28 additions & 0 deletions service/history/queues/reader_test.go
Expand Up @@ -195,6 +195,34 @@ func (s *readerSuite) TestMergeSlices() {
}
}

// TestAppendSlices seeds a reader with the first half of a set of random
// scopes, appends slices built from the second half, and then checks that the
// reader reports every scope and that no two neighbouring scopes overlap.
func (s *readerSuite) TestAppendSlices() {
	totalScopes := 10
	allScopes := NewRandomScopes(totalScopes)
	half := totalScopes / 2

	// The first half seeds the reader; the second half is appended afterwards.
	reader := s.newTestReader(allScopes[:half], nil)

	toAppend := allScopes[half:]
	incomingSlices := make([]Slice, 0, len(toAppend))
	for i := range toAppend {
		incomingSlices = append(
			incomingSlices,
			NewSlice(nil, s.executableInitializer, s.monitor, toAppend[i]),
		)
	}

	reader.AppendSlices(incomingSlices...)

	resultScopes := reader.Scopes()
	s.Len(resultScopes, totalScopes)

	// Compare each scope with its right-hand neighbour.
	for idx, left := range resultScopes[:len(resultScopes)-1] {
		right := resultScopes[idx+1]
		if left.Range.ExclusiveMax.CompareTo(right.Range.InclusiveMin) <= 0 {
			continue
		}
		panic(fmt.Sprintf(
			"Found overlapping scope in appended slices, left: %v, right: %v",
			left,
			right,
		))
	}
}

func (s *readerSuite) TestShrinkSlices() {
numScopes := 10
scopes := NewRandomScopes(numScopes)
Expand Down
38 changes: 34 additions & 4 deletions service/history/queues/slice.go
Expand Up @@ -31,6 +31,10 @@ import (
"go.temporal.io/server/service/history/tasks"
)

// shrinkPredicateMaxPendingNamespaces is the largest number of namespaces with
// pending tasks a slice may track for shrinkPredicate to still narrow the
// slice's predicate; above this count the predicate is left untouched.
const shrinkPredicateMaxPendingNamespaces = 3

type (

// Slice manages the loading and status tracking of all
Expand All @@ -45,7 +49,7 @@ type (
CanMergeWithSlice(Slice) bool
MergeWithSlice(Slice) []Slice
CompactWithSlice(Slice) Slice
ShrinkRange()
ShrinkScope()
SelectTasks(readerID int32, batchSize int) ([]Executable, error)
MoreTasks() bool
TaskStats() TaskStats
Expand Down Expand Up @@ -316,9 +320,16 @@ func (s *SliceImpl) CompactWithSlice(slice Slice) Slice {
)
}

func (s *SliceImpl) ShrinkRange() {
// ShrinkScope narrows the slice's scope by shrinking first its range and then
// its predicate, and finally reports the number of pending executables still
// tracked by the slice to the monitor.
func (s *SliceImpl) ShrinkScope() {
s.stateSanityCheck()

// Order matters: shrinkRange calls executableTracker.shrink(), which drops
// namespaces with zero pending tasks from the per-namespace pending map
// that shrinkPredicate reads afterwards.
s.shrinkRange()
s.shrinkPredicate()

s.monitor.SetSlicePendingTaskCount(s, len(s.executableTracker.pendingExecutables))
}

func (s *SliceImpl) shrinkRange() {
minPendingTaskKey := s.executableTracker.shrink()

minIteratorKey := tasks.MaximumKey
Expand All @@ -335,8 +346,27 @@ func (s *SliceImpl) ShrinkRange() {
}

s.scope.Range.InclusiveMin = newRangeMin
}

s.monitor.SetSlicePendingTaskCount(s, len(s.executableTracker.pendingExecutables))
// shrinkPredicate narrows the slice's predicate to the namespaces that still
// have entries in the executable tracker's per-namespace pending map, by
// AND-ing the current predicate with a namespace predicate over those IDs.
//
// The predicate is left unchanged when the slice still has iterators, or when
// more than shrinkPredicateMaxPendingNamespaces namespaces are pending.
func (s *SliceImpl) shrinkPredicate() {
	pendingByNamespace := s.executableTracker.pendingPerNamesapce

	switch {
	case len(s.iterators) != 0:
		// Tasks may still be sitting in persistence; we can't know whether a
		// tighter predicate would filter them out, so keep the current one.
		return
	case len(pendingByNamespace) > shrinkPredicateMaxPendingNamespaces:
		// Shrinking is only done once few namespaces remain.
		return
	}

	ids := make([]string, 0, len(pendingByNamespace))
	for id := range pendingByNamespace {
		ids = append(ids, id.String())
	}

	remaining := tasks.NewNamespacePredicate(ids)
	s.scope.Predicate = tasks.AndPredicates(s.scope.Predicate, remaining)
}

func (s *SliceImpl) SelectTasks(readerID int32, batchSize int) ([]Executable, error) {
Expand Down Expand Up @@ -400,7 +430,7 @@ func (s *SliceImpl) TaskStats() TaskStats {
func (s *SliceImpl) Clear() {
s.stateSanityCheck()

s.ShrinkRange()
s.ShrinkScope()

s.iterators = []Iterator{
NewIterator(s.paginationFnProvider, s.scope.Range),
Expand Down
45 changes: 43 additions & 2 deletions service/history/queues/slice_test.go
Expand Up @@ -329,7 +329,7 @@ func (s *sliceSuite) TestCompactWithSlice() {
s.Panics(func() { slice2.stateSanityCheck() })
}

func (s *sliceSuite) TestShrinkRange() {
func (s *sliceSuite) TestShrinkScope_ShrinkRange() {
r := NewRandomRange()
predicate := predicates.Universal[tasks.Task]()

Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *sliceSuite) TestShrinkRange() {
slice.pendingExecutables[executable.GetKey()] = executable
}

slice.ShrinkRange()
slice.ShrinkScope()
s.Len(slice.pendingExecutables, len(executables)-numAcked)
s.validateSliceState(slice)

Expand All @@ -378,6 +378,47 @@ func (s *sliceSuite) TestShrinkRange() {
s.Equal(NewRange(newInclusiveMin, r.ExclusiveMax), slice.Scope().Range)
}

// TestShrinkScope_ShrinkPredicate fills a slice that has no iterators with
// mock executables, most of them acked under throwaway namespace IDs and the
// rest pending under a small fixed set of namespace IDs. After ShrinkScope it
// expects the scope's predicate to be a namespace predicate that only names
// namespaces from that fixed set.
func (s *sliceSuite) TestShrinkScope_ShrinkPredicate() {
	taskRange := NewRandomRange()
	slice := NewSlice(
		nil,
		s.executableInitializer,
		s.monitor,
		NewScope(taskRange, predicates.Universal[tasks.Task]()),
	)
	// No iterators left, so the predicate is eligible for shrinking.
	slice.iterators = []Iterator{}

	executables := s.randomExecutablesInRange(taskRange, 100)
	slices.SortFunc(executables, func(left, right Executable) bool {
		return left.GetKey().CompareTo(right.GetKey()) < 0
	})

	pendingIDs := []string{uuid.New(), uuid.New()}
	s.True(len(pendingIDs) <= shrinkPredicateMaxPendingNamespaces)

	for _, executable := range executables {
		mock := executable.(*MockExecutable)
		mock.EXPECT().GetTask().Return(mock).AnyTimes()

		// Eight draws out of ten are acked; the remainder stay pending.
		if stillPending := rand.Intn(10) >= 8; stillPending {
			mock.EXPECT().GetNamespaceID().Return(pendingIDs[rand.Intn(len(pendingIDs))]).AnyTimes()
			mock.EXPECT().State().Return(ctasks.TaskStatePending).MaxTimes(1)
		} else {
			mock.EXPECT().GetNamespaceID().Return(uuid.New()).AnyTimes()
			mock.EXPECT().State().Return(ctasks.TaskStateAcked).MaxTimes(1)
		}

		slice.executableTracker.add(executable)
	}

	slice.ShrinkScope()
	s.validateSliceState(slice)

	shrunk, ok := slice.Scope().Predicate.(*tasks.NamespacePredicate)
	s.True(ok)
	for id := range shrunk.NamespaceIDs {
		s.True(slices.Index(pendingIDs, id) >= 0)
	}
}

func (s *sliceSuite) TestSelectTasks_NoError() {
r := NewRandomRange()
namespaceIDs := []string{uuid.New(), uuid.New(), uuid.New(), uuid.New()}
Expand Down
6 changes: 6 additions & 0 deletions service/history/queues/tracker.go
Expand Up @@ -117,6 +117,12 @@ func (t *executableTracker) shrink() tasks.Key {
minPendingTaskKey = tasks.MinKey(minPendingTaskKey, key)
}

for namespaceID, numPending := range t.pendingPerNamesapce {
if numPending == 0 {
delete(t.pendingPerNamesapce, namespaceID)
}
}

return minPendingTaskKey
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueFactory.go
Expand Up @@ -73,7 +73,7 @@ func NewTimerQueueFactory(
},
params.NamespaceRegistry,
params.TimeSource,
params.MetricsHandler,
params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationTimerQueueProcessor)),
params.Logger,
)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueFactory.go
Expand Up @@ -76,7 +76,7 @@ func NewTransferQueueFactory(
},
params.NamespaceRegistry,
params.TimeSource,
params.MetricsHandler,
params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationTransferQueueProcessor)),
params.Logger,
)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/visibilityQueueFactory.go
Expand Up @@ -66,7 +66,7 @@ func NewVisibilityQueueFactory(
},
params.NamespaceRegistry,
params.TimeSource,
params.MetricsHandler,
params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationVisibilityQueueProcessor)),
params.Logger,
)
}
Expand Down

0 comments on commit e32488e

Please sign in to comment.